Loading src/kpi_manager/service/KpiValueComposer.py +3 −18 Original line number Original line Diff line number Diff line Loading @@ -47,10 +47,6 @@ class KpiValueComposer: def compose_kpi(): def compose_kpi(): threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() # @staticmethod # def run_kafka_listener(): # threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() @staticmethod @staticmethod def kafka_listener(): def kafka_listener(): """ """ Loading @@ -71,15 +67,13 @@ class KpiValueComposer: continue continue try: try: new_event = receive_msg.value().decode('utf-8') new_event = receive_msg.value().decode('utf-8') # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) KpiValueComposer.process_event_and_label_kpi(new_event) # LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) KpiValueComposer.extract_kpi_values(new_event) except Exception as e: except Exception as e: print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}") print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}") continue continue @staticmethod @staticmethod def extract_kpi_values(event): def process_event_and_label_kpi(event): pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) lines = event.split('\n') lines = event.split('\n') # matching_rows = [] # matching_rows = [] Loading @@ -90,7 +84,7 @@ class KpiValueComposer: (kpi_name, kpi_value) = line.split(" ") (kpi_name, kpi_value) = line.split(" ") if kpi_name.endswith('}'): if kpi_name.endswith('}'): (kpi_name, sub_names) = kpi_name.replace('}','').split('{') (kpi_name, sub_names) = kpi_name.replace('}','').split('{') print("Extracted row that match the KPI {:}".format((kpi_name, sub_names, kpi_value))) print("Received KPI from raw topic: {:}".format((kpi_name, sub_names, kpi_value))) kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name) kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name) if kpi_descriptor is not None: if kpi_descriptor is not None: kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) Loading Loading @@ -130,15 +124,6 @@ class KpiValueComposer: } } return kpi_dict return kpi_dict @staticmethod def delete_kpi_by_id(): col_name = "link_id" kpi_name = None kpiDBobj = Kpi_DB() row = kpiDBobj.delete_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) if row is not None: LOGGER.info("Deleted Row: {:}".format(row)) @staticmethod @staticmethod def delivery_callback( err, msg): def delivery_callback( err, msg): """ """ Loading Loading
src/kpi_manager/service/KpiValueComposer.py +3 −18 Original line number Original line Diff line number Diff line Loading @@ -47,10 +47,6 @@ class KpiValueComposer: def compose_kpi(): def compose_kpi(): threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() # @staticmethod # def run_kafka_listener(): # threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() @staticmethod @staticmethod def kafka_listener(): def kafka_listener(): """ """ Loading @@ -71,15 +67,13 @@ class KpiValueComposer: continue continue try: try: new_event = receive_msg.value().decode('utf-8') new_event = receive_msg.value().decode('utf-8') # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) KpiValueComposer.process_event_and_label_kpi(new_event) # LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) KpiValueComposer.extract_kpi_values(new_event) except Exception as e: except Exception as e: print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}") print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}") continue continue @staticmethod @staticmethod def extract_kpi_values(event): def process_event_and_label_kpi(event): pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) lines = event.split('\n') lines = event.split('\n') # matching_rows = [] # matching_rows = [] Loading @@ -90,7 +84,7 @@ class KpiValueComposer: (kpi_name, kpi_value) = line.split(" ") (kpi_name, kpi_value) = line.split(" ") if kpi_name.endswith('}'): if kpi_name.endswith('}'): (kpi_name, sub_names) = kpi_name.replace('}','').split('{') (kpi_name, sub_names) = kpi_name.replace('}','').split('{') print("Extracted row that match the KPI {:}".format((kpi_name, sub_names, kpi_value))) print("Received KPI from raw topic: {:}".format((kpi_name, sub_names, kpi_value))) kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name) kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name) if kpi_descriptor is not None: if kpi_descriptor is not None: kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) Loading Loading @@ -130,15 +124,6 @@ class KpiValueComposer: } } return kpi_dict return kpi_dict @staticmethod def delete_kpi_by_id(): col_name = "link_id" kpi_name = None kpiDBobj = Kpi_DB() row = kpiDBobj.delete_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) if row is not None: LOGGER.info("Deleted Row: {:}".format(row)) @staticmethod @staticmethod def delivery_callback( err, msg): def delivery_callback( err, msg): """ """ Loading