Commit 45f62a56 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

basic defination is added into both "export_collector_value" and "write_to_kafka" function

parent d2df9b75
Loading
Loading
Loading
Loading
+24 −11
Original line number Diff line number Diff line
@@ -16,16 +16,17 @@
import time
import logging
import requests
import threading
from typing import Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer, KafkaException
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method

LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
ACTIVE_KAFKA_PRODUCERS = []     # list of active kafka producers   

class TelemetryBackendServiceImpl:
    """
@@ -51,15 +52,27 @@ class TelemetryBackendServiceImpl:
        self.fetch_interval    = fetch_interval
    
    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def export_collector_value(request : Collector) -> Tuple[str, str]: # type: ignore
    def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore
        response        =  Tuple[str, str]
        response = ('test collector Id', 'test collected value')      # Metric to be fetched from endpoint based on Collector message
        collector_id    = str('test collector Id')
        collected_Value = str('test collected value')        # Metric to be fetched from endpoint based on Collector message
        response        = (collector_id, collected_Value)      
        return response

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def write_to_kafka(request: Tuple[str, str]) -> Empty: # type: ignore
    def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer:
        (collector_id, collector_value) = request
        response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers})
        # _collector_id, _collector_id_value = request
        # write collector_id and collector_id value on the Kafka topic

        # get kafka bootstrap server and topic name
        # write to kafka topic
        
        return response

    def stop_producer(self, request: KafkaProducer) -> Empty:  # type: ignore
        # flush and close kafka producer object
        return Empty()

# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
@@ -72,7 +85,7 @@ class TelemetryBackendServiceImpl:
        """
        KPI = "node_network_receive_packets_total"
        try:
            response = requests.get(self.exporter_endpoint)
            response = requests.get(self.exporter_endpoint) # type: ignore
            if response.status_code == 200:
                # print(f"Metrics fetched sucessfully...")
                metrics = response.text
@@ -148,7 +161,7 @@ class TelemetryBackendServiceImpl:
        admin_client = AdminClient(conf)
        self.create_topic_if_not_exists(admin_client)

        kafka_producer = Producer(conf)
        kafka_producer = KafkaProducer(conf)

        try:
            start_time = time.time()
@@ -161,11 +174,11 @@ class TelemetryBackendServiceImpl:
                    # print("Metrics produced to Kafka topic")

                # Check if the specified run duration has elapsed
                if time.time() - start_time >= self.run_duration:
                if time.time() - start_time >= self.run_duration: # type: ignore
                    break

                # waiting time until next fetch 
                time.sleep(self.fetch_interval)
                time.sleep(self.fetch_interval) # type: ignore
        except KeyboardInterrupt:
            print("Keyboard interrupt detected. Exiting...")
        finally: