Commit c213a7bc authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Added the implementation of "Kafka_listener", "Initiate_collector_backend" and...

Added the implementation of "Kafka_listener", "Initiate_collector_backend" and "generate_kafka_reply"
parent ce225041
Loading
Loading
Loading
Loading
+85 −53
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@

import ast
import time
import random
import logging
import requests
import threading
@@ -46,18 +47,18 @@ class TelemetryBackendService:
        """
        conusmer_configs = {
            'bootstrap.servers' : KAFKA_SERVER_IP,
            'group.id'          : 'consumer',
            'auto.offset.reset' : 'earliest'
            'group.id'          : 'backend',
            'auto.offset.reset' : 'latest'
        }
        topic_request = "topic_request"

        if (self.create_topic_if_not_exists(topic_request)):
            consumerObj = KafkaConsumer(conusmer_configs)
            consumerObj.subscribe([topic_request])

            while True:
                receive_msg = consumerObj.poll(2.0)
                if receive_msg is None:
                print ("Telemetry backend listener is active:  Kafka Topic: ", topic_request)     # added for debugging purposes
                    print ("Telemetry backend is listening on Kafka Topic: ", topic_request)     # added for debugging purposes
                    continue
                elif receive_msg.error():
                    if receive_msg.error().code() == KafkaError._PARTITION_EOF:
@@ -66,13 +67,13 @@ class TelemetryBackendService:
                        print("Consumer error: {}".format(receive_msg.error()))
                        break
                (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
            self.execute_process_kafka_request(kpi_id, duration, interval)
                self.execute_initiate_collector_backend(kpi_id, duration, interval)

    def run_kafka_listener(self)->Empty: # type: ignore
    def run_kafka_listener(self)->bool: # type: ignore
        threading.Thread(target=self.kafka_listener).start()
        return True        

    def process_kafka_request(self, kpi_id, duration, interval
    def initiate_collector_backend(self, kpi_id, duration, interval
                        ): # type: ignore
        """
        Method to receive collector request attribues and initiates collecter backend.
@@ -80,14 +81,73 @@ class TelemetryBackendService:
        start_time = time.time()
        while True:
            if time.time() - start_time >= duration: # type: ignore
                print("Timeout: consumer terminated", time.time() - start_time)
                print("Requested Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", time.time() - start_time)
                break
            # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval)
            print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval)
            self.extract_kpi_value(kpi_id)
            # print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval)
            time.sleep(interval)
    
    def execute_process_kafka_request(self, kpi_id: str, duration: int, interval: int):
        threading.Thread(target=self.process_kafka_request, args=(kpi_id, duration, interval)).start()
    def execute_initiate_collector_backend(self, kpi_id: str, duration: int, interval: int):
        threading.Thread(target=self.initiate_collector_backend, args=(kpi_id, duration, interval)).start()
    


    def extract_kpi_value(self, kpi_id: str):
        """
        Method to extract kpi value.
        """
        measured_kpi_value = random.randint(1,100)
        self.generate_kafka_reply(kpi_id , measured_kpi_value)

    def generate_kafka_reply(self, kpi_id: str, kpi_value: any):
        """
        Method to write response on Kafka topic
        """
        producer_configs = {
            'bootstrap.servers': KAFKA_SERVER_IP,
        }
        topic_response = "topic_response"
        if (self.create_topic_if_not_exists(topic_response)):
            msg_value  = Tuple [str, any]
            msg_value  = (kpi_id, kpi_value)
            msg_key    = "111"                                        # to be fetch from db???

            producerObj = KafkaProducer(producer_configs)
            producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
            producerObj.flush()

    def create_topic_if_not_exists(self, new_topic_name: str):
        """
        Method to create Kafka topic if it does not exist.
        Args:
            admin_client (AdminClient): Kafka admin client.
        """
        admin_kafka_client = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
        try:
            topic_metadata = admin_kafka_client.list_topics(timeout=5)
            if new_topic_name not in topic_metadata.topics:
                # If the topic does not exist, create a new topic
                print(f"Topic '{new_topic_name}' does not exist. Creating...")
                new_topic = NewTopic(new_topic_name, num_partitions=1, replication_factor=1)
                admin_kafka_client.create_topics([new_topic])
            return True
        except KafkaException as e:
            print(f"Failed to create topic: {e}")
            return False

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')



# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
@@ -137,34 +197,6 @@ class TelemetryBackendService:
            print(f"Metric '{metric_name}' not found in the metrics.")
            return None

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')

    def create_topic_if_not_exists(self, admin_client):
        """
        Method to create Kafka topic if it does not exist.
        Args:
            admin_client (AdminClient): Kafka admin client.
        """
        try:
            topic_metadata = admin_client.list_topics(timeout=5)
            if self.kafka_topic not in topic_metadata.topics:
                # If the topic does not exist, create a new topic
                print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
                new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
                admin_client.create_topics([new_topic])
        except KafkaException as e:
            print(f"Failed to create topic: {e}")

    def produce_metrics(self):
        """
        Method to produce metrics to Kafka topic as per Kafka configs.