Commit 8ce11c89 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

improvements in Telemetry backend and Frontend service functionalities.

parent 178b72d0
Loading
Loading
Loading
Loading
+39 −86
Original line number Diff line number Diff line
@@ -34,6 +34,9 @@ METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP    = '127.0.0.1:9092'
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
ACTIVE_COLLECTORS  = []
KAFKA_TOPICS       = {'request' : 'topic_request', 
                      'response': 'topic_response'}
EXPORTER_ENDPOINT  = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics"

class TelemetryBackendService:
    """
@@ -43,7 +46,7 @@ class TelemetryBackendService:
    def __init__(self):
        LOGGER.info('Init TelemetryBackendService')
    
    def run_kafka_listener(self)->bool: # type: ignore
    def run_kafka_listener(self)->bool:
        threading.Thread(target=self.kafka_listener).start()
        return True        
    
@@ -56,15 +59,16 @@ class TelemetryBackendService:
            'group.id'          : 'backend',
            'auto.offset.reset' : 'latest'
        }
        topic_request = "topic_request"
        if (self.create_topic_if_not_exists([topic_request])):
        # topic_request = "topic_request"
        consumerObj = KafkaConsumer(conusmer_configs)
            consumerObj.subscribe([topic_request])
        # consumerObj.subscribe([topic_request])
        consumerObj.subscribe([KAFKA_TOPICS['request']])

        while True:
            receive_msg = consumerObj.poll(2.0)
            if receive_msg is None:
                    print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request)     # added for debugging purposes
                # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request)     # added for debugging purposes
                print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request'])     # added for debugging purposes
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
@@ -77,7 +81,6 @@ class TelemetryBackendService:
            self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval)



    def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int):
        threading.Thread(target=self.initiate_collector_backend, args=(collector_id, kpi_id, duration, interval)).start()

@@ -89,8 +92,10 @@ class TelemetryBackendService:
        start_time = time.time()
        while True:
            ACTIVE_COLLECTORS.append(collector_id)
            if time.time() - start_time >= duration: # type: ignore
                print("Requested Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", time.time() - start_time)
            if time.time() - start_time >= duration:            # condition to terminate backend
                print("Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", time.time() - start_time)
                self.generate_kafka_response(collector_id, "NULL", False)
                # write to Kafka
                break
            # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval)
            self.extract_kpi_value(collector_id, kpi_id)
@@ -101,7 +106,8 @@ class TelemetryBackendService:
        """
        Method to extract kpi value.
        """
        measured_kpi_value = random.randint(1,100)
        measured_kpi_value = random.randint(1,100)                  # Should be extracted from exporter/stream
        # measured_kpi_value = self.fetch_node_exporter_metrics()     # exporter extracted metric value against default KPI
        self.generate_kafka_response(collector_id, kpi_id , measured_kpi_value)

    def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
@@ -111,14 +117,15 @@ class TelemetryBackendService:
        producer_configs = {
            'bootstrap.servers': KAFKA_SERVER_IP,
        }
        topic_response = "topic_response"
        # topic_response = "topic_response"
        msg_value : Tuple [str, Any] = (kpi_id, kpi_value)
        msg_key    = collector_id
        producerObj = KafkaProducer(producer_configs)
        producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        # producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        producerObj.flush()

    def create_topic_if_not_exists(self, new_topics: list):
    def create_topic_if_not_exists(self, new_topics: list) -> bool:
        """
        Method to create Kafka topic if it does not exist.
        Args:
@@ -132,12 +139,10 @@ class TelemetryBackendService:
                    print(f"Topic '{topic}' does not exist. Creating...")
                    new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
                    ADMIN_KAFKA_CLIENT.create_topics([new_topic])
                return True
            except KafkaException as e:
                print(f"Failed to create topic: {e}")
                return False
        
        self.verify_required_kafka_topics()
        return True

    def delivery_callback(self, err, msg):
        """
@@ -151,22 +156,6 @@ class TelemetryBackendService:
        else:
            print(f'Message delivered to topic {msg.topic()}')

        # Function to create a list of topics

    # Function to list all topics in the Kafka cluster
    def verify_required_kafka_topics(self) -> list:
        """List all topics in the Kafka cluster."""
        try:
            # Fetch metadata from the broker
            metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=10)
            topics = list(metadata.topics.keys())
            print("Topics in the cluster:", topics)
            return topics
        except Exception as e:
            print(f"Failed to list topics: {e}")
            return []
    

# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------

    def fetch_node_exporter_metrics(self):
@@ -176,7 +165,6 @@ class TelemetryBackendService:
            str: Metrics fetched from Node Exporter.
        """
        KPI = "node_network_receive_packets_total"
        EXPORTER_ENDPOINT = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics"
        try:
            response = requests.get(EXPORTER_ENDPOINT) # type: ignore
            if response.status_code == 200:
@@ -200,7 +188,7 @@ class TelemetryBackendService:
        """
        Method to extract the value of a metric from the metrics string.
        Args:
            metrics (str): Metrics string fetched from Node Exporter.
            metrics (str): Metrics string fetched from Exporter.
            metric_name (str): Name of the metric to extract.
        Returns:
            float: Value of the extracted metric, or None if not found.
@@ -215,39 +203,4 @@ class TelemetryBackendService:
            print(f"Metric '{metric_name}' not found in the metrics.")
            return None

    def produce_metrics(self):
        """
        Method to produce metrics to Kafka topic as per Kafka configs.
        """
        conf = {
            'bootstrap.servers': KAFKA_SERVER_IP,
        }

        admin_client = AdminClient(conf)
        self.create_topic_if_not_exists(admin_client)

        kafka_producer = KafkaProducer(conf)

        try:
            start_time = time.time()
            while True:
                metrics = self.fetch_node_exporter_metrics()  # select the function name based on the provided requirements

                if metrics:
                    kafka_producer.produce("topic_raw", str(metrics), callback=self.delivery_callback)
                    kafka_producer.flush()
                    # print("Metrics produced to Kafka topic")

                # Check if the specified run duration has elapsed
                if time.time() - start_time >= self.run_duration: # type: ignore
                    break

                # waiting time until next fetch 
                time.sleep(self.fetch_interval) # type: ignore
        except KeyboardInterrupt:
            print("Keyboard interrupt detected. Exiting...")
        finally:
            kafka_producer.flush()
            # kafka_producer.close()        # this command generates ERROR

# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
 No newline at end of file
+8 −0
Original line number Diff line number Diff line
@@ -29,6 +29,14 @@ LOGGER = logging.getLogger(__name__)
# Tests Implementation of Telemetry Backend
###########################

def test_verify_kafka_topics():
    LOGGER.warning('test_receive_kafka_request requesting')
    TelemetryBackendServiceObj = TelemetryBackendService()
    KafkaTopics = ['topic_request', 'topic_response']
    response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
    LOGGER.debug(str(response))
    assert isinstance(response, bool)

def test_run_kafka_listener():
    LOGGER.warning('test_receive_kafka_request requesting')
    TelemetryBackendServiceObj = TelemetryBackendService()
+10 −6
Original line number Diff line number Diff line
@@ -34,6 +34,9 @@ LOGGER = logging.getLogger(__name__)
METRICS_POOL      = MetricsPool('Monitoring', 'TelemetryFrontend')
KAFKA_SERVER_IP   = '127.0.0.1:9092'
ACTIVE_COLLECTORS = []
KAFKA_TOPICS      = {'request' : 'topic_request', 
                     'response': 'topic_response'}


class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
    def __init__(self, name_mapping : NameMapping):
@@ -68,13 +71,14 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
        producer_configs = {
            'bootstrap.servers': KAFKA_SERVER_IP,
        }
        topic_request = "topic_request"
        # topic_request = "topic_request"
        msg_value   = Tuple [str, int, int]
        msg_value   = (kpi, duration, interval)
        print ("Request generated: ", "Colletcor Id: ", msg_key, \
                ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval)
        producerObj = KafkaProducer(producer_configs)
        producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        producerObj.produce(KAFKA_TOPICS['request'], key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        # producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        ACTIVE_COLLECTORS.append(msg_key)
        producerObj.flush()
        return producerObj
@@ -94,15 +98,15 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
                'group.id'          : 'frontend',
                'auto.offset.reset' : 'latest'
            }
            topic_response = "topic_response"
            # topic_response = "topic_response"

            consumerObj = KafkaConsumer(conusmer_configs)
            consumerObj.subscribe([topic_response])
            consumerObj.subscribe([KAFKA_TOPICS['response']])
            # print (time.time())
            while True:
                receive_msg = consumerObj.poll(2.0)
                if receive_msg is None:
                    print (" - Telemetry frontend listening on Kafka Topic: ", topic_response)     # added for debugging purposes
                    print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response'])     # added for debugging purposes
                    continue
                elif receive_msg.error():
                    if receive_msg.error().code() == KafkaError._PARTITION_EOF: