def kafka_listener(self):
    """
    Listen on the Kafka "topic_request" topic for collector requests and
    dispatch each request to a dedicated backend collector thread.

    Blocks forever (run via run_kafka_listener() to keep the caller free).
    Each message payload is a Python-literal tuple string
    "(kpi_id, duration, interval)" produced by the telemetry frontend.
    """
    # BUGFIX: local was misspelled "conusmer_configs".
    consumer_configs = {
        'bootstrap.servers' : KAFKA_SERVER_IP,
        'group.id'          : 'backend',
        'auto.offset.reset' : 'latest'
    }
    topic_request = "topic_request"
    if self.create_topic_if_not_exists(topic_request):
        consumer_obj = KafkaConsumer(consumer_configs)
        consumer_obj.subscribe([topic_request])
        try:
            while True:
                receive_msg = consumer_obj.poll(2.0)
                if receive_msg is None:
                    # Heartbeat for debugging: no message within the poll timeout.
                    print("Telemetry backend is listening on Kafka Topic: ", topic_request)
                    continue
                elif receive_msg.error():
                    if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        print("Consumer error: {}".format(receive_msg.error()))
                        break
                # literal_eval (not eval) safely parses the "(kpi_id, duration, interval)" tuple.
                (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
                self.execute_initiate_collector_backend(kpi_id, duration, interval)
        finally:
            # BUGFIX: release the consumer's broker connection even when the
            # loop breaks on error or the thread is torn down.
            consumer_obj.close()
    else:
        # BUGFIX: previously the method returned silently when the topic could
        # not be verified/created, leaving the backend dead with no diagnostic.
        print("Telemetry backend: Kafka topic unavailable, listener not started: ", topic_request)

def run_kafka_listener(self) -> bool:
    """
    Start kafka_listener() on a background thread.

    Returns:
        bool: True immediately after the thread is started.
    """
    threading.Thread(target=self.kafka_listener).start()
    return True

def initiate_collector_backend(self, kpi_id, duration, interval):
    """
    Receive collector request attributes and run the collector backend:
    sample the KPI every `interval` seconds until `duration` seconds elapse.

    Args:
        kpi_id: identifier of the KPI to sample.
        duration: total sampling window in seconds.
        interval: delay between consecutive samples in seconds.
    """
    start_time = time.time()
    while True:
        if time.time() - start_time >= duration:  # requested window exhausted
            print("Requested Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", time.time() - start_time)
            break
        self.extract_kpi_value(kpi_id)
        time.sleep(interval)

def execute_initiate_collector_backend(self, kpi_id: str, duration: int, interval: int):
    """Spawn a dedicated thread per request so one slow collector does not block others."""
    threading.Thread(target=self.initiate_collector_backend, args=(kpi_id, duration, interval)).start()

def extract_kpi_value(self, kpi_id: str):
    """
    Measure one KPI sample and publish it as a reply on Kafka.

    NOTE: random placeholder value until real metric extraction is wired in.
    """
    measured_kpi_value = random.randint(1, 100)
    self.generate_kafka_reply(kpi_id, measured_kpi_value)
def generate_kafka_reply(self, kpi_id: str, kpi_value: any):
    """
    Write the measured (kpi_id, kpi_value) pair as a response on the
    "topic_response" Kafka topic.

    Args:
        kpi_id: identifier of the sampled KPI.
        kpi_value: the measured value to report.
    """
    producer_configs = {
        'bootstrap.servers': KAFKA_SERVER_IP,
    }
    topic_response = "topic_response"
    if self.create_topic_if_not_exists(topic_response):
        # BUGFIX: dropped dead line "msg_value = Tuple[str, any]" — it was
        # immediately overwritten and "any" is the builtin, not a type.
        msg_value = (kpi_id, kpi_value)
        msg_key = "111"  # TODO: to be fetched from DB
        producer_obj = KafkaProducer(producer_configs)
        producer_obj.produce(topic_response, key=msg_key, value=str(msg_value), callback=self.delivery_callback)
        producer_obj.flush()

def create_topic_if_not_exists(self, new_topic_name: str) -> bool:
    """
    Ensure `new_topic_name` exists on the Kafka broker, creating it if missing.

    Args:
        new_topic_name: name of the topic to verify/create.
    Returns:
        bool: True when the topic exists or was created, False on Kafka failure.
    """
    admin_kafka_client = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
    try:
        topic_metadata = admin_kafka_client.list_topics(timeout=5)
        if new_topic_name not in topic_metadata.topics:
            # Topic absent: create it with minimal partitioning.
            print(f"Topic '{new_topic_name}' does not exist. Creating...")
            new_topic = NewTopic(new_topic_name, num_partitions=1, replication_factor=1)
            admin_kafka_client.create_topics([new_topic])
        # BUGFIX: previously only the create path returned True; when the topic
        # ALREADY existed the method fell through returning None (falsy), so
        # callers guarded by "if self.create_topic_if_not_exists(...)" never ran.
        return True
    except KafkaException as e:
        print(f"Failed to create topic: {e}")
        return False

def delivery_callback(self, err, msg):
    """
    Kafka producer delivery-report callback: log delivery success or failure.

    Args:
        err (KafkaError): error object, or None on success.
        msg (Message): the delivered Kafka message.
    """
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to topic {msg.topic()}')