diff --git a/src/kpi_manager/service/KpiValueComposer.py b/src/kpi_manager/service/KpiValueComposer.py index 8274c9fd147a32c944cccf79c0e864f5a86cff31..9d703b233a008aac0df6870601d758d41d870cdc 100644 --- a/src/kpi_manager/service/KpiValueComposer.py +++ b/src/kpi_manager/service/KpiValueComposer.py @@ -28,7 +28,7 @@ LOGGER = logging.getLogger(__name__) KAFKA_SERVER_IP = '127.0.0.1:9092' # ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', - 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} + 'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'} PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP, 'group.id' : 'kpi_composer', @@ -45,12 +45,12 @@ class KpiValueComposer: @staticmethod def compose_kpi(): - KpiValueComposer.run_kafka_listener() - - @staticmethod - def run_kafka_listener(): threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() + # @staticmethod + # def run_kafka_listener(): + # threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() + @staticmethod def kafka_listener(): """ @@ -91,17 +91,19 @@ class KpiValueComposer: if kpi_name.endswith('}'): (kpi_name, sub_names) = kpi_name.replace('}','').split('{') print("Extracted row that match the KPI {:}".format((kpi_name, sub_names, kpi_value))) - kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db() + kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name) if kpi_descriptor is not None: kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) producerObj = KafkaProducer(PRODUCER_CONFIG) producerObj.produce(KAFKA_TOPICS['labeled'], key="labeled", value= str(kpi_to_produce), callback=KpiValueComposer.delivery_callback) producerObj.flush() + else: + print ("No matching of KPI ({:}) found in db".format(kpi_name)) except Exception as e: print("Unable to extract kpi name and value from raw data: ERROR Info: {:}".format(e)) @staticmethod - def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]): + def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]): # = KPIs_TO_SEARCH[0] is added for testing col_name = "kpi_description" kpiDBobj = Kpi_DB() row = kpiDBobj.search_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 90394501876513a2151ecec61e41c4a992215e4f..ad0132e47b75c89698030d7146c5f53a0ed80145 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -34,7 +34,7 @@ KAFKA_SERVER_IP = '127.0.0.1:9092' # KAFKA_SERVER_IP = '10.152.183.175:30092' ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', - 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} + 'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'} EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics" PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,}