From 45f62a560db3115e343fbd04cae7ac8617a8d184 Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Thu, 9 May 2024 11:07:55 +0000
Subject: [PATCH] basic definitions added to both "export_collector_value" and
 "write_to_kafka" functions

---
 .../service/TelemetryBackendServiceImpl.py    | 35 +++++++++++++------
 1 file changed, 24 insertions(+), 11 deletions(-)

diff --git a/src/telemetry/backend/service/TelemetryBackendServiceImpl.py b/src/telemetry/backend/service/TelemetryBackendServiceImpl.py
index ea57f6167..abcc30baf 100755
--- a/src/telemetry/backend/service/TelemetryBackendServiceImpl.py
+++ b/src/telemetry/backend/service/TelemetryBackendServiceImpl.py
@@ -16,16 +16,17 @@
 import time
 import logging
 import requests
-import threading
 from typing import Tuple
 from common.proto.context_pb2 import Empty
-from confluent_kafka import Producer, KafkaException
+from confluent_kafka import Producer as KafkaProducer
+from confluent_kafka import KafkaException
 from confluent_kafka.admin import AdminClient, NewTopic
 from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 
 LOGGER = logging.getLogger(__name__)
 METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
+ACTIVE_KAFKA_PRODUCERS = []     # list of active Kafka producers
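+# (sketch) intended bookkeeping, not wired up in this patch: write_to_kafka() is
+# expected to append its producer here and stop_producer() to flush/remove it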
 
 class TelemetryBackendServiceImpl:
     """
@@ -51,17 +52,29 @@ class TelemetryBackendServiceImpl:
         self.fetch_interval    = fetch_interval
     
     # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def export_collector_value(request : Collector) -> Tuple[str, str]: # type: ignore
-        response =  Tuple[str, str]
-        response = ('test collector Id', 'test collected value')      # Metric to be fetched from endpoint based on Collector message
+    def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore
+        # Metric to be fetched from the exporter endpoint based on the Collector message;
+        # placeholder values are returned for now.
+        collector_id    = 'test collector Id'
+        collected_value = 'test collected value'
+        response : Tuple[str, str] = (collector_id, collected_value)
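+        # (sketch) the real implementation is expected to obtain the value via
+        # self.fetch_node_exporter_metrics() below instead of this placeholder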
         return response
 
     # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def write_to_kafka(request: Tuple[str, str]) -> Empty: # type: ignore
+    def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer:
+        (collector_id, collector_value) = request
+        producer = KafkaProducer({'bootstrap.servers' : self.bootstrap_servers})
         # _collector_id, _collector_id_value = request
         # write collector_id and collector_id value on the Kafka topic
-        return Empty()
+
+        # TODO: resolve the target Kafka topic and produce (collector_id, collector_value) to it
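+        # A minimal sketch of that produce step, assuming a topic named
+        # 'node_exporter_metrics' (the actual topic name is not defined in this patch):
+        #   producer.produce('node_exporter_metrics', key=collector_id, value=collector_value)
+        #   producer.poll(0)   # serve delivery report callbacks without blocking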
         
+        return producer
+
+    def stop_producer(self, request: KafkaProducer) -> Empty:  # type: ignore
+        # flush any pending messages before discarding the producer object
+        request.flush()
+        return Empty()
+
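+    # Intended flow of the three stubs above (a sketch; this call order is assumed,
+    # it is not enforced anywhere in this patch):
+    #   value    = self.export_collector_value(collector)
+    #   producer = self.write_to_kafka(value)
+    #   self.stop_producer(producer)
+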
 # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
 
     def fetch_node_exporter_metrics(self):
@@ -72,7 +85,7 @@ class TelemetryBackendServiceImpl:
         """
         KPI = "node_network_receive_packets_total"
         try:
-            response = requests.get(self.exporter_endpoint)
+            response = requests.get(self.exporter_endpoint) # type: ignore
             if response.status_code == 200:
                 # print(f"Metrics fetched sucessfully...")
                 metrics = response.text
@@ -148,7 +161,7 @@ class TelemetryBackendServiceImpl:
         admin_client = AdminClient(conf)
         self.create_topic_if_not_exists(admin_client)
 
-        kafka_producer = Producer(conf)
+        kafka_producer = KafkaProducer(conf)
 
         try:
             start_time = time.time()
@@ -161,11 +174,11 @@ class TelemetryBackendServiceImpl:
                     # print("Metrics produced to Kafka topic")
 
                 # Check if the specified run duration has elapsed
-                if time.time() - start_time >= self.run_duration:
+                if time.time() - start_time >= self.run_duration: # type: ignore
                     break
 
                 # waiting time until next fetch 
-                time.sleep(self.fetch_interval)
+                time.sleep(self.fetch_interval) # type: ignore
         except KeyboardInterrupt:
             print("Keyboard interrupt detected. Exiting...")
         finally:
-- 
GitLab