diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index d5ba6ced4693a73ff716ad8ed050e2f910483b8f..6cc3aab5f726dce5a55570c04d6e3ded8bc28d76 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -19,7 +19,7 @@ import random import logging import requests import threading -from typing import Tuple +from typing import Any, Tuple from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer @@ -32,6 +32,8 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') KAFKA_SERVER_IP = '127.0.0.1:9092' +ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) +ACTIVE_COLLECTORS = [] class TelemetryBackendService: """ @@ -41,6 +43,10 @@ class TelemetryBackendService: def __init__(self): LOGGER.info('Init TelemetryBackendService') + def run_kafka_listener(self)->bool: # type: ignore + threading.Thread(target=self.kafka_listener).start() + return True + def kafka_listener(self): """ listener for requests on Kafka topic. @@ -51,14 +57,14 @@ class TelemetryBackendService: 'auto.offset.reset' : 'latest' } topic_request = "topic_request" - if (self.create_topic_if_not_exists(topic_request)): + if (self.create_topic_if_not_exists([topic_request])): consumerObj = KafkaConsumer(conusmer_configs) consumerObj.subscribe([topic_request]) while True: receive_msg = consumerObj.poll(2.0) if receive_msg is None: - print ("Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes + print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: @@ -67,40 +73,38 @@ class TelemetryBackendService: print("Consumer error: {}".format(receive_msg.error())) break (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8')) - self.execute_initiate_collector_backend(kpi_id, duration, interval) + collector_id = receive_msg.key().decode('utf-8') + self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval) - def run_kafka_listener(self)->bool: # type: ignore - threading.Thread(target=self.kafka_listener).start() - return True - def initiate_collector_backend(self, kpi_id, duration, interval + + def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int): + threading.Thread(target=self.initiate_collector_backend, args=(collector_id, kpi_id, duration, interval)).start() + + def initiate_collector_backend(self, collector_id, kpi_id, duration, interval ): # type: ignore """ Method to receive collector request attribues and initiates collecter backend. """ start_time = time.time() while True: + ACTIVE_COLLECTORS.append(collector_id) if time.time() - start_time >= duration: # type: ignore print("Requested Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", time.time() - start_time) break # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval) - self.extract_kpi_value(kpi_id) + self.extract_kpi_value(collector_id, kpi_id) # print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval) time.sleep(interval) - - def execute_initiate_collector_backend(self, kpi_id: str, duration: int, interval: int): - threading.Thread(target=self.initiate_collector_backend, args=(kpi_id, duration, interval)).start() - - - def extract_kpi_value(self, kpi_id: str): + def extract_kpi_value(self, collector_id: str, kpi_id: str): """ Method to extract kpi value. """ measured_kpi_value = random.randint(1,100) - self.generate_kafka_reply(kpi_id , measured_kpi_value) + self.generate_kafka_response(collector_id, kpi_id , measured_kpi_value) - def generate_kafka_reply(self, kpi_id: str, kpi_value: any): + def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any): """ Method to write response on Kafka topic """ @@ -108,33 +112,32 @@ class TelemetryBackendService: 'bootstrap.servers': KAFKA_SERVER_IP, } topic_response = "topic_response" - if (self.create_topic_if_not_exists(topic_response)): - msg_value = Tuple [str, any] - msg_value = (kpi_id, kpi_value) - msg_key = "111" # to be fetch from db??? + msg_value : Tuple [str, Any] = (kpi_id, kpi_value) + msg_key = collector_id + producerObj = KafkaProducer(producer_configs) + producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback) + producerObj.flush() - producerObj = KafkaProducer(producer_configs) - producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback) - producerObj.flush() - - def create_topic_if_not_exists(self, new_topic_name: str): + def create_topic_if_not_exists(self, new_topics: list): """ Method to create Kafka topic if it does not exist. Args: admin_client (AdminClient): Kafka admin client. """ - admin_kafka_client = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) - try: - topic_metadata = admin_kafka_client.list_topics(timeout=5) - if new_topic_name not in topic_metadata.topics: - # If the topic does not exist, create a new topic - print(f"Topic '{new_topic_name}' does not exist. Creating...") - new_topic = NewTopic(new_topic_name, num_partitions=1, replication_factor=1) - admin_kafka_client.create_topics([new_topic]) - return True - except KafkaException as e: - print(f"Failed to create topic: {e}") - return False + for topic in new_topics: + try: + topic_metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=5) + if topic not in topic_metadata.topics: + # If the topic does not exist, create a new topic + print(f"Topic '{topic}' does not exist. Creating...") + new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) + ADMIN_KAFKA_CLIENT.create_topics([new_topic]) + return True + except KafkaException as e: + print(f"Failed to create topic: {e}") + return False + + self.verify_required_kafka_topics() def delivery_callback(self, err, msg): """ @@ -148,7 +151,21 @@ class TelemetryBackendService: else: print(f'Message delivered to topic {msg.topic()}') + # Function to create a list of topics + # Function to list all topics in the Kafka cluster + def verify_required_kafka_topics(self) -> list: + """List all topics in the Kafka cluster.""" + try: + # Fetch metadata from the broker + metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=10) + topics = list(metadata.topics.keys()) + print("Topics in the cluster:", topics) + return topics + except Exception as e: + print(f"Failed to list topics: {e}") + return [] + # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- @@ -159,8 +176,9 @@ class TelemetryBackendService: str: Metrics fetched from Node Exporter. """ KPI = "node_network_receive_packets_total" + EXPORTER_ENDPOINT = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics" try: - response = requests.get(self.exporter_endpoint) # type: ignore + response = requests.get(EXPORTER_ENDPOINT) # type: ignore if response.status_code == 200: # print(f"Metrics fetched sucessfully...") metrics = response.text @@ -202,7 +220,7 @@ class TelemetryBackendService: Method to produce metrics to Kafka topic as per Kafka configs. """ conf = { - 'bootstrap.servers': self.bootstrap_servers, + 'bootstrap.servers': KAFKA_SERVER_IP, } admin_client = AdminClient(conf) @@ -216,7 +234,7 @@ class TelemetryBackendService: metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements if metrics: - kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback) + kafka_producer.produce("topic_raw", str(metrics), callback=self.delivery_callback) kafka_producer.flush() # print("Metrics produced to Kafka topic") diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 045c56d5bd87dd0b3e7c9bf4b42c57507cfa52ef..ebd0db4ac1d2574720057d357bac1e5bf565510a 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -15,7 +15,7 @@ import ast import threading import time -from typing import Tuple +from typing import Tuple, Any import grpc import logging @@ -30,9 +30,10 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer -LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') -KAFKA_SERVER_IP = '127.0.0.1:9092' +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') +KAFKA_SERVER_IP = '127.0.0.1:9092' +ACTIVE_COLLECTORS = [] class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self, name_mapping : NameMapping): @@ -50,7 +51,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): _collector_interval = int(request.interval_s) self.generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) # self.run_generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) - response.collector_id.uuid = request.collector_id.collector_id.uuid + response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore return response def run_generate_kafka_request(self, msg_key: str, kpi: str, duration : int, interval: int): @@ -74,6 +75,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval) producerObj = KafkaProducer(producer_configs) producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback) + ACTIVE_COLLECTORS.append(msg_key) producerObj.flush() return producerObj @@ -108,11 +110,19 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): else: print("Consumer error: {}".format(receive_msg.error())) break - (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) - self.process_response(kpi_id, kpi_value) - # threading.Thread(target=self.process_response, args=(kpi_id, kpi_value)).start() + try: + collector_id = receive_msg.key().decode('utf-8') + if collector_id in ACTIVE_COLLECTORS: + (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) + self.process_response(kpi_id, kpi_value) + else: + print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + except Exception as e: + print(f"No message key found: {str(e)}") + continue + # return None - def process_response(self, kpi_id: str, kpi_value: any): + def process_response(self, kpi_id: str, kpi_value: Any): print ("Frontend - KPI: ", kpi_id, ", VALUE: ", kpi_value) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 93a6066ee92d82cae3c8877d3238fe5dbbc74f92..2dea48c8826284856f8d50b30589d8e8e018e633 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -22,7 +22,7 @@ def create_collector_id(): _collector_id.collector_id.uuid = uuid.uuid4() return _collector_id -def create_collector_id(coll_id_str : str): +def create_collector_id_a(coll_id_str : str): _collector_id = telemetry_frontend_pb2.CollectorId() _collector_id.collector_id.uuid = str(coll_id_str) return _collector_id @@ -32,7 +32,7 @@ def create_collector_request(): _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_collector_request.duration_s = float(random.randint(8, 16)) - _create_collector_request.interval_s = float(random.randint(2, 3)) + _create_collector_request.interval_s = float(random.randint(2, 4)) return _create_collector_request def create_collector_request_a(): diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 4f59630d40dca3860162b3e35416c46c56e44d92..a531ed61727fcbc607927711add0dcdc89be379b 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -13,6 +13,7 @@ # limitations under the License. import os +import time import pytest import logging from typing import Union @@ -50,7 +51,7 @@ from device.service.drivers import DRIVERS LOCAL_HOST = '127.0.0.1' MOCKSERVICE_PORT = 10000 -TELEMETRY_FRONTEND_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) +TELEMETRY_FRONTEND_PORT = str(MOCKSERVICE_PORT) + str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND)) os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT) @@ -172,6 +173,18 @@ def test_start_collector(telemetryFrontend_client): LOGGER.debug(str(response)) assert isinstance(response, CollectorId) +def test_start_collector_a(telemetryFrontend_client): + LOGGER.warning('test_start_collector requesting') + response = telemetryFrontend_client.StartCollector(create_collector_request()) + LOGGER.debug(str(response)) + assert isinstance(response, CollectorId) + +def test_start_collector_b(telemetryFrontend_client): + LOGGER.warning('test_start_collector requesting') + response = telemetryFrontend_client.StartCollector(create_collector_request()) + LOGGER.debug(str(response)) + assert isinstance(response, CollectorId) + def test_run_kafka_listener(): LOGGER.warning('test_receive_kafka_request requesting') name_mapping = NameMapping() @@ -180,11 +193,11 @@ def test_run_kafka_listener(): LOGGER.debug(str(response)) assert isinstance(response, bool) -def test_stop_collector(telemetryFrontend_client): - LOGGER.warning('test_stop_collector requesting') - response = telemetryFrontend_client.StopCollector(create_collector_id("1")) - LOGGER.debug(str(response)) - assert isinstance(response, Empty) +# def test_stop_collector(telemetryFrontend_client): +# LOGGER.warning('test_stop_collector requesting') +# response = telemetryFrontend_client.StopCollector(create_collector_id("1")) +# LOGGER.debug(str(response)) +# assert isinstance(response, Empty) # def test_select_collectors(telemetryFrontend_client): # LOGGER.warning('test_select_collector requesting')