From 3b8e35eea744878b51f8e3e8e7265c0088a1db23 Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Mon, 3 Jun 2024 10:02:14 +0000
Subject: [PATCH] Modification made for tests (fetch_node_exporter_metrics and
 stream_node_export_metrics_to_raw_topic)

---
 .../service/TelemetryBackendService.py        | 61 ++++++++++++++-----
 .../backend/tests/testTelemetryBackend.py     | 39 +++++++-----
 2 files changed, 70 insertions(+), 30 deletions(-)

diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py
index f2e5ff3ac..2ce8ebf70 100755
--- a/src/telemetry/backend/service/TelemetryBackendService.py
+++ b/src/telemetry/backend/service/TelemetryBackendService.py
@@ -33,9 +33,11 @@ LOGGER             = logging.getLogger(__name__)
 METRICS_POOL       = MetricsPool('Telemetry', 'TelemetryBackend')
 KAFKA_SERVER_IP    = '127.0.0.1:9092'
 ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
-KAFKA_TOPICS       = {'request' : 'topic_request', 
-                      'response': 'topic_response'}
-EXPORTER_ENDPOINT  = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics"
+KAFKA_TOPICS       = {'request' : 'topic_request', 'response': 'topic_response',
+                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labeled'}
+EXPORTER_ENDPOINT  = "http://127.0.0.1:9100/metrics"
+PRODUCER_CONFIG    = {'bootstrap.servers': KAFKA_SERVER_IP}
+
 
 class TelemetryBackendService:
     """
@@ -122,15 +124,12 @@ class TelemetryBackendService:
         """
         Method to write response on Kafka topic
         """
-        producer_configs = {
-            'bootstrap.servers': KAFKA_SERVER_IP,
-        }
         # topic_response = "topic_response"
         msg_value : Tuple [str, Any] = (kpi_id, kpi_value)
         msg_key    = collector_id
-        producerObj = KafkaProducer(producer_configs)
+        producerObj = KafkaProducer(PRODUCER_CONFIG)
         # producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
-        producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=self.delivery_callback)
+        producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value=str(msg_value), callback=TelemetryBackendService.delivery_callback)
         producerObj.flush()
 
     def terminate_collector_backend(self, collector_id):
@@ -161,7 +160,8 @@ class TelemetryBackendService:
                 return False
         return True
 
-    def delivery_callback(self, err, msg):
+    @staticmethod
+    def delivery_callback(err, msg):
         """
         Callback function to handle message delivery status.
         Args:
@@ -174,8 +174,8 @@ class TelemetryBackendService:
             print(f'Message delivered to topic {msg.topic()}')
 
 # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
-
-    def fetch_node_exporter_metrics(self):
+    @staticmethod
+    def fetch_single_node_exporter_metric():
         """
         Method to fetch metrics from Node Exporter.
         Returns:
@@ -184,24 +184,29 @@ class TelemetryBackendService:
         KPI = "node_network_receive_packets_total"
         try:
             response = requests.get(EXPORTER_ENDPOINT) # type: ignore
+            LOGGER.info("Request status {:}".format(response))
             if response.status_code == 200:
                 # print(f"Metrics fetched sucessfully...")
                 metrics = response.text
                 # Check if the desired metric is available in the response
                 if KPI in metrics:
-                    KPI_VALUE = self.extract_metric_value(metrics, KPI)
+                    KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
                     # Extract the metric value
                     if KPI_VALUE is not None:
-                        print(f"KPI value: {KPI_VALUE}")
+                        LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
+                        print(f"Extracted value of {KPI} is: {KPI_VALUE}")
                         return KPI_VALUE
             else:
-                print(f"Failed to fetch metrics. Status code: {response.status_code}")
+                LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
+                # print(f"Failed to fetch metrics. Status code: {response.status_code}")
                 return None
         except Exception as e:
-            print(f"Failed to fetch metrics: {str(e)}")
+            LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
+            # print(f"Failed to fetch metrics: {str(e)}")
             return None
 
-    def extract_metric_value(self, metrics, metric_name):
+    @staticmethod
+    def extract_metric_value(metrics, metric_name):
         """
         Method to extract the value of a metric from the metrics string.
         Args:
@@ -220,4 +225,28 @@ class TelemetryBackendService:
             print(f"Metric '{metric_name}' not found in the metrics.")
             return None
 
+    @staticmethod
+    def stream_node_export_metrics_to_raw_topic():
+        try:
+            while True:
+                response = requests.get(EXPORTER_ENDPOINT)
+                LOGGER.info("Response Status {:} ".format(response))
+                try: 
+                    if response.status_code == 200:
+                        producerObj = KafkaProducer(PRODUCER_CONFIG)
+                        producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value=str(response.text), callback=TelemetryBackendService.delivery_callback)
+                        producerObj.flush()
+                        LOGGER.info("Produce to topic")
+                    else:
+                        LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
+                        print(f"Didn't received expected response. Status code: {response.status_code}")
+                        return None
+                    time.sleep(5)
+                except Exception as e:
+                    LOGGER.info("Failed to process response. Status code: {:}".format(e))
+                    return None
+        except Exception as e:
+            LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
+            print(f"Failed to fetch metrics: {str(e)}")
+            return None
 # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
\ No newline at end of file
diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py
index b8b29d04a..bc64c473e 100644
--- a/src/telemetry/backend/tests/testTelemetryBackend.py
+++ b/src/telemetry/backend/tests/testTelemetryBackend.py
@@ -15,6 +15,7 @@
 import sys
 print (sys.path)
 sys.path.append('/home/tfs/tfs-ctrl')
+import threading
 import logging
 from typing import Tuple
 from common.proto.context_pb2 import Empty
@@ -27,17 +28,27 @@ LOGGER = logging.getLogger(__name__)
 # Tests Implementation of Telemetry Backend
 ###########################
 
-def test_verify_kafka_topics():
-    LOGGER.info('test_verify_kafka_topics requesting')
-    TelemetryBackendServiceObj = TelemetryBackendService()
-    KafkaTopics = ['topic_request', 'topic_response']
-    response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
-    LOGGER.debug(str(response))
-    assert isinstance(response, bool)
-
-def test_run_kafka_listener():
-    LOGGER.info('test_receive_kafka_request requesting')
-    TelemetryBackendServiceObj = TelemetryBackendService()
-    response = TelemetryBackendServiceObj.run_kafka_listener()
-    LOGGER.debug(str(response))
-    assert isinstance(response, bool)
+# def test_verify_kafka_topics():
+#     LOGGER.info('test_verify_kafka_topics requesting')
+#     TelemetryBackendServiceObj = TelemetryBackendService()
+#     KafkaTopics = ['topic_request', 'topic_response']
+#     response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, bool)
+
+# def test_run_kafka_listener():
+#     LOGGER.info('test_receive_kafka_request requesting')
+#     TelemetryBackendServiceObj = TelemetryBackendService()
+#     response = TelemetryBackendServiceObj.run_kafka_listener()
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, bool)
+
+
+def test_fetch_node_exporter_metrics():
+    LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ')
+    TelemetryBackendService.fetch_single_node_exporter_metric()
+
+def test_stream_node_export_metrics_to_raw_topic():
+    LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ')
+    threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start()
+
-- 
GitLab