diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto index 8b5886a69667a58cb73f4a4df0e84060ab3710a5..4d3a1f216406841344d40712ea04ec82cedf04d0 100644 --- a/proto/kpi_value_api.proto +++ b/proto/kpi_value_api.proto @@ -57,4 +57,4 @@ message KpiAlarms { context.Timestamp end_timestamp = 2; kpi_manager.KpiId kpi_id = 3; map<string, bool> alarms = 4; -} \ No newline at end of file +} diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 6ab841238f446a2895cd163fab4b7eb05eaa3176..078fa5896d5fb5033833e0e2ef2248613ef80c18 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -83,7 +83,7 @@ class TelemetryBackendService(GenericGrpcService): thread.join() print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) del self.running_threads[collector_id] - self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. + self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. else: print ('Backend collector {:} not found'.format(collector_id)) @@ -103,11 +103,28 @@ class TelemetryBackendService(GenericGrpcService): while not stop_event.is_set(): if time.time() - start_time >= collector['duration']: # condition to terminate backend print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) - self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. + self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. break self.ExtractKpiValue(collector_id, collector['kpi_id']) time.sleep(collector['interval']) + def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + """ + Method to write kpi Termination signat on RESPONSE Kafka topic + """ + producer = self.kafka_producer + kpi_value : Dict = { + "kpi_id" : kpi_id, + "kpi_value" : measured_kpi_value, + } + producer.produce( + KafkaTopic.RESPONSE.value, # TODO: to the topic ... + key = collector_id, + value = json.dumps(kpi_value), + callback = self.delivery_callback + ) + producer.flush() + def ExtractKpiValue(self, collector_id: str, kpi_id: str): """ Method to extract kpi value. @@ -123,117 +140,27 @@ class TelemetryBackendService(GenericGrpcService): """ producer = self.kafka_producer kpi_value : Dict = { + "time_stamp": str(time.time()), "kpi_id" : kpi_id, "kpi_value" : measured_kpi_value } producer.produce( - KafkaTopic.RESPONSE.value, + KafkaTopic.VALUE.value, # TODO: to the topic ... key = collector_id, value = json.dumps(kpi_value), callback = self.delivery_callback ) producer.flush() - def GenerateRawMetric(self, metrics: Any): - """ - Method writes raw metrics on VALUE Kafka topic - """ - producer = self.kafka_producer - some_metric : Dict = { - "some_id" : metrics - } - producer.produce( - KafkaTopic.VALUE.value, - key = 'raw', - value = json.dumps(some_metric), - callback = self.delivery_callback - ) - producer.flush() - def delivery_callback(self, err, msg): """ Callback function to handle message delivery status. Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ - if err: print(f'Message delivery failed: {err}') - # else: print(f'Message delivered to topic {msg.topic()}') - -# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- -# @staticmethod -# def fetch_single_node_exporter_metric(): -# """ -# Method to fetch metrics from Node Exporter. -# Returns: -# str: Metrics fetched from Node Exporter. -# """ -# KPI = "node_network_receive_packets_total" -# try: -# response = requests.get(EXPORTER_ENDPOINT) # type: ignore -# LOGGER.info("Request status {:}".format(response)) -# if response.status_code == 200: -# # print(f"Metrics fetched sucessfully...") -# metrics = response.text -# # Check if the desired metric is available in the response -# if KPI in metrics: -# KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI) -# # Extract the metric value -# if KPI_VALUE is not None: -# LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE)) -# print(f"Extracted value of {KPI} is: {KPI_VALUE}") -# return KPI_VALUE -# else: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code)) -# # print(f"Failed to fetch metrics. Status code: {response.status_code}") -# return None -# except Exception as e: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) -# # print(f"Failed to fetch metrics: {str(e)}") -# return None - -# @staticmethod -# def extract_metric_value(metrics, metric_name): -# """ -# Method to extract the value of a metric from the metrics string. -# Args: -# metrics (str): Metrics string fetched from Exporter. -# metric_name (str): Name of the metric to extract. -# Returns: -# float: Value of the extracted metric, or None if not found. -# """ -# try: -# # Find the metric line containing the desired metric name -# metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) -# # Split the line to extract the metric value -# metric_value = float(metric_line.split()[1]) -# return metric_value -# except StopIteration: -# print(f"Metric '{metric_name}' not found in the metrics.") -# return None - -# @staticmethod -# def stream_node_export_metrics_to_raw_topic(): -# try: -# while True: -# response = requests.get(EXPORTER_ENDPOINT) -# # print("Response Status {:} ".format(response)) -# # LOGGER.info("Response Status {:} ".format(response)) -# try: -# if response.status_code == 200: -# producerObj = KafkaProducer(PRODUCER_CONFIG) -# producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback) -# producerObj.flush() -# LOGGER.info("Produce to topic") -# else: -# LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code)) -# print(f"Didn't received expected response. Status code: {response.status_code}") -# return None -# time.sleep(15) -# except Exception as e: -# LOGGER.info("Failed to process response. Status code: {:}".format(e)) -# return None -# except Exception as e: -# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) -# print(f"Failed to fetch metrics: {str(e)}") -# return None -# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file + if err: + LOGGER.debug('Message delivery failed: {:}'.format(err)) + print(f'Message delivery failed: {err}') + else: + LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) + print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_TelemetryBackend.py index a2bbee540c3ce348ef52eceb0e776f48a68d94b1..665fa825e3ee31b2e92351d9c5855f627ce40fa1 100644 --- a/src/telemetry/backend/tests/test_TelemetryBackend.py +++ b/src/telemetry/backend/tests/test_TelemetryBackend.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import threading from common.tools.kafka.Variables import KafkaTopic from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService @@ -25,14 +26,13 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) def test_RunRequestListener(): LOGGER.info('test_RunRequestListener') TelemetryBackendServiceObj = TelemetryBackendService() - response = TelemetryBackendServiceObj.RunRequestListener() + response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() LOGGER.debug(str(response)) - assert isinstance(response, bool) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index b73d9fa952ee42aeb7adb8f3c0b2e4a3ba7f3e09..746790bf68fd3d843850fc96db526dcefad59283 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -161,9 +161,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): # ---------- Independent Method --------------- # Listener method is independent of any method (same lifetime as service) # continously listens for responses - def RunResponseListener(self): + def install_servicers(self): threading.Thread(target=self.ResponseListener).start() - return True def ResponseListener(self): """