diff --git a/src/kpi_manager/service/KpiValueComposer.py b/src/kpi_manager/service/KpiValueComposer.py index 31da6c5db783320a48e02fb3e9222adab8f9b622..bb2b6ebf3494130bf86d4f5d415c643c35fb20e5 100644 --- a/src/kpi_manager/service/KpiValueComposer.py +++ b/src/kpi_manager/service/KpiValueComposer.py @@ -47,10 +47,6 @@ class KpiValueComposer: def compose_kpi(): threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() - # @staticmethod - # def run_kafka_listener(): - # threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() - @staticmethod def kafka_listener(): """ @@ -71,15 +67,13 @@ class KpiValueComposer: continue try: new_event = receive_msg.value().decode('utf-8') - # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) - # LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) - KpiValueComposer.extract_kpi_values(new_event) + KpiValueComposer.process_event_and_label_kpi(new_event) except Exception as e: print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}") continue @staticmethod - def extract_kpi_values(event): + def process_event_and_label_kpi(event): pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) lines = event.split('\n') # matching_rows = [] @@ -90,7 +84,7 @@ class KpiValueComposer: (kpi_name, kpi_value) = line.split(" ") if kpi_name.endswith('}'): (kpi_name, sub_names) = kpi_name.replace('}','').split('{') - print("Extracted row that match the KPI {:}".format((kpi_name, sub_names, kpi_value))) + print("Received KPI from raw topic: {:}".format((kpi_name, sub_names, kpi_value))) kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name) if kpi_descriptor is not None: kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) @@ -130,15 +124,6 @@ class KpiValueComposer: } return kpi_dict - @staticmethod - def delete_kpi_by_id(): - col_name = "link_id" - kpi_name = None - kpiDBobj = Kpi_DB() - row = kpiDBobj.delete_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) - if row is not None: - LOGGER.info("Deleted Row: {:}".format(row)) - @staticmethod def delivery_callback( err, msg): """