diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index d81be79dbe410ccbf2781816f34735f6bfe5639d..937409d15010b9d4afde186ec4a3235c3333d1b6 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -12,64 +12,52 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ast +import json import time import random import logging -import requests import threading -from typing import Any, Tuple +from typing import Any, Dict from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer -from confluent_kafka import KafkaException from confluent_kafka import KafkaError -from confluent_kafka.admin import AdminClient, NewTopic -from common.proto.telemetry_frontend_pb2 import Collector, CollectorId -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from common.method_wrappers.Decorator import MetricsPool + LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') -KAFKA_SERVER_IP = '127.0.0.1:9092' -# KAFKA_SERVER_IP = '10.152.183.175:30092' -ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) -KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', - 'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'} -EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics" -PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} +METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') +# EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics" class TelemetryBackendService: """ - Class to listens for request on Kafka topic, fetches metrics and produces measured values to another Kafka topic. + Class listens for request on Kafka topic, fetches requested metrics from device. + Produces metrics on both RESPONSE and VALUE kafka topics. """ def __init__(self): LOGGER.info('Init TelemetryBackendService') + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value, + 'group.id' : 'backend', + 'auto.offset.reset' : 'latest'}) self.running_threads = {} - - def run_kafka_listener(self)->bool: - threading.Thread(target=self.kafka_listener).start() - return True - - def kafka_listener(self): + + def RunRequestListener(self)->bool: + threading.Thread(target=self.RequestListener).start() + return True + + def RequestListener(self): """ listener for requests on Kafka topic. """ - conusmer_configs = { - 'bootstrap.servers' : KAFKA_SERVER_IP, - 'group.id' : 'backend', - 'auto.offset.reset' : 'latest' - } - # topic_request = "topic_request" - consumerObj = KafkaConsumer(conusmer_configs) - # consumerObj.subscribe([topic_request]) - consumerObj.subscribe([KAFKA_TOPICS['request']]) - + consumer = self.kafka_consumer + consumer.subscribe([KafkaTopic.REQUEST.value]) while True: - receive_msg = consumerObj.poll(2.0) + receive_msg = consumer.poll(2.0) if receive_msg is None: - # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request']) # added for debugging purposes continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: @@ -77,177 +65,159 @@ class TelemetryBackendService: else: print("Consumer error: {}".format(receive_msg.error())) break - (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8')) + + collector = json.loads(receive_msg.value().decode('utf-8')) collector_id = receive_msg.key().decode('utf-8') - if duration == -1 and interval == -1: - self.terminate_collector_backend(collector_id) - # threading.Thread(target=self.terminate_collector_backend, args=(collector_id)) + LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + + if collector['duration'] == -1 and collector['interval'] == -1: + self.TerminateCollectorBackend(collector_id) else: - self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval) + self.RunInitiateCollectorBackend(collector_id, collector) + def TerminateCollectorBackend(self, collector_id): + if collector_id in self.running_threads: + thread, stop_event = self.running_threads[collector_id] + stop_event.set() + thread.join() + print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) + del self.running_threads[collector_id] + self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. + else: + print ('Backend collector {:} not found'.format(collector_id)) - def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int): + def RunInitiateCollectorBackend(self, collector_id: str, collector: str): stop_event = threading.Event() - thread = threading.Thread(target=self.initiate_collector_backend, - args=(collector_id, kpi_id, duration, interval, stop_event)) + thread = threading.Thread(target=self.InitiateCollectorBackend, + args=(collector_id, collector, stop_event)) self.running_threads[collector_id] = (thread, stop_event) thread.start() - def initiate_collector_backend(self, collector_id, kpi_id, duration, interval, stop_event - ): # type: ignore + def InitiateCollectorBackend(self, collector_id, collector, stop_event): """ - Method to receive collector request attribues and initiates collecter backend. + Method receives collector request and initiates collecter backend. """ print("Initiating backend for collector: ", collector_id) start_time = time.time() while not stop_event.is_set(): - if time.time() - start_time >= duration: # condition to terminate backend + if time.time() - start_time >= collector['duration']: # condition to terminate backend print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) - self.generate_kafka_response(collector_id, "-1", -1) - # write to Kafka to send the termination confirmation. + self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend. break - # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval) - self.extract_kpi_value(collector_id, kpi_id) - # print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval) - time.sleep(interval) + self.ExtractKpiValue(collector_id, collector['kpi_id']) + time.sleep(collector['interval']) - def extract_kpi_value(self, collector_id: str, kpi_id: str): + def ExtractKpiValue(self, collector_id: str, kpi_id: str): """ Method to extract kpi value. """ - measured_kpi_value = random.randint(1,100) # Should be extracted from exporter/stream - # measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI - self.generate_kafka_response(collector_id, kpi_id , measured_kpi_value) + measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device + print ("Measured Kpi value: {:}".format(measured_kpi_value)) + # measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI + self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value) - def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any): + def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ Method to write response on Kafka topic """ - # topic_response = "topic_response" - msg_value : Tuple [str, Any] = (kpi_id, kpi_value) - msg_key = collector_id - producerObj = KafkaProducer(PRODUCER_CONFIG) - # producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback) - producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=TelemetryBackendService.delivery_callback) - producerObj.flush() - - def terminate_collector_backend(self, collector_id): - if collector_id in self.running_threads: - thread, stop_event = self.running_threads[collector_id] - stop_event.set() - thread.join() - print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) - del self.running_threads[collector_id] - self.generate_kafka_response(collector_id, "-1", -1) - - def create_topic_if_not_exists(self, new_topics: list) -> bool: - """ - Method to create Kafka topic if it does not exist. - Args: - admin_client (AdminClient): Kafka admin client. - """ - for topic in new_topics: - try: - topic_metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=5) - if topic not in topic_metadata.topics: - # If the topic does not exist, create a new topic - print(f"Topic '{topic}' does not exist. Creating...") - LOGGER.warning("Topic {:} does not exist. Creating...".format(topic)) - new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) - ADMIN_KAFKA_CLIENT.create_topics([new_topic]) - except KafkaException as e: - print(f"Failed to create topic: {e}") - return False - return True + producer = self.kafka_producer + kpi_value : Dict = { + "kpi_id" : kpi_id, + "kpi_value" : measured_kpi_value + } + producer.produce( + KafkaTopic.RESPONSE.value, + key = collector_id, + value = json.dumps(kpi_value), + callback = self.delivery_callback + ) + producer.flush() - @staticmethod - def delivery_callback( err, msg): + def delivery_callback(self, err, msg): """ Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ - if err: - print(f'Message delivery failed: {err}') - else: - print(f'Message delivered to topic {msg.topic()}') - -# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- - @staticmethod - def fetch_single_node_exporter_metric(): - """ - Method to fetch metrics from Node Exporter. - Returns: - str: Metrics fetched from Node Exporter. - """ - KPI = "node_network_receive_packets_total" - try: - response = requests.get(EXPORTER_ENDPOINT) # type: ignore - LOGGER.info("Request status {:}".format(response)) - if response.status_code == 200: - # print(f"Metrics fetched sucessfully...") - metrics = response.text - # Check if the desired metric is available in the response - if KPI in metrics: - KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI) - # Extract the metric value - if KPI_VALUE is not None: - LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE)) - print(f"Extracted value of {KPI} is: {KPI_VALUE}") - return KPI_VALUE - else: - LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code)) - # print(f"Failed to fetch metrics. Status code: {response.status_code}") - return None - except Exception as e: - LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) - # print(f"Failed to fetch metrics: {str(e)}") - return None - - @staticmethod - def extract_metric_value(metrics, metric_name): - """ - Method to extract the value of a metric from the metrics string. - Args: - metrics (str): Metrics string fetched from Exporter. - metric_name (str): Name of the metric to extract. - Returns: - float: Value of the extracted metric, or None if not found. - """ - try: - # Find the metric line containing the desired metric name - metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) - # Split the line to extract the metric value - metric_value = float(metric_line.split()[1]) - return metric_value - except StopIteration: - print(f"Metric '{metric_name}' not found in the metrics.") - return None - - @staticmethod - def stream_node_export_metrics_to_raw_topic(): - try: - while True: - response = requests.get(EXPORTER_ENDPOINT) - # print("Response Status {:} ".format(response)) - # LOGGER.info("Response Status {:} ".format(response)) - try: - if response.status_code == 200: - producerObj = KafkaProducer(PRODUCER_CONFIG) - producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback) - producerObj.flush() - LOGGER.info("Produce to topic") - else: - LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code)) - print(f"Didn't received expected response. Status code: {response.status_code}") - return None - time.sleep(15) - except Exception as e: - LOGGER.info("Failed to process response. Status code: {:}".format(e)) - return None - except Exception as e: - LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) - print(f"Failed to fetch metrics: {str(e)}") - return None -# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file + Args: err (KafkaError): Kafka error object. + msg (Message): Kafka message object. + """ + if err: print(f'Message delivery failed: {err}') + # else: print(f'Message delivered to topic {msg.topic()}') + +# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- +# @staticmethod +# def fetch_single_node_exporter_metric(): +# """ +# Method to fetch metrics from Node Exporter. +# Returns: +# str: Metrics fetched from Node Exporter. +# """ +# KPI = "node_network_receive_packets_total" +# try: +# response = requests.get(EXPORTER_ENDPOINT) # type: ignore +# LOGGER.info("Request status {:}".format(response)) +# if response.status_code == 200: +# # print(f"Metrics fetched sucessfully...") +# metrics = response.text +# # Check if the desired metric is available in the response +# if KPI in metrics: +# KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI) +# # Extract the metric value +# if KPI_VALUE is not None: +# LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE)) +# print(f"Extracted value of {KPI} is: {KPI_VALUE}") +# return KPI_VALUE +# else: +# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code)) +# # print(f"Failed to fetch metrics. Status code: {response.status_code}") +# return None +# except Exception as e: +# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) +# # print(f"Failed to fetch metrics: {str(e)}") +# return None + +# @staticmethod +# def extract_metric_value(metrics, metric_name): +# """ +# Method to extract the value of a metric from the metrics string. +# Args: +# metrics (str): Metrics string fetched from Exporter. +# metric_name (str): Name of the metric to extract. +# Returns: +# float: Value of the extracted metric, or None if not found. +# """ +# try: +# # Find the metric line containing the desired metric name +# metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) +# # Split the line to extract the metric value +# metric_value = float(metric_line.split()[1]) +# return metric_value +# except StopIteration: +# print(f"Metric '{metric_name}' not found in the metrics.") +# return None + +# @staticmethod +# def stream_node_export_metrics_to_raw_topic(): +# try: +# while True: +# response = requests.get(EXPORTER_ENDPOINT) +# # print("Response Status {:} ".format(response)) +# # LOGGER.info("Response Status {:} ".format(response)) +# try: +# if response.status_code == 200: +# producerObj = KafkaProducer(PRODUCER_CONFIG) +# producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback) +# producerObj.flush() +# LOGGER.info("Produce to topic") +# else: +# LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code)) +# print(f"Didn't received expected response. Status code: {response.status_code}") +# return None +# time.sleep(15) +# except Exception as e: +# LOGGER.info("Failed to process response. Status code: {:}".format(e)) +# return None +# except Exception as e: +# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e)) +# print(f"Failed to fetch metrics: {str(e)}") +# return None +# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py index d832e54e77589ca677682760d19e68b1bd09b1f7..3d7ec82ac30d042f278df03db0b3f24e9b0604da 100644 --- a/src/telemetry/backend/tests/testTelemetryBackend.py +++ b/src/telemetry/backend/tests/testTelemetryBackend.py @@ -12,15 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys -print (sys.path) -sys.path.append('/home/tfs/tfs-ctrl') -import threading import logging -from typing import Tuple -# from common.proto.context_pb2 import Empty from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService + LOGGER = logging.getLogger(__name__) @@ -28,26 +23,9 @@ LOGGER = logging.getLogger(__name__) # Tests Implementation of Telemetry Backend ########################### -def test_verify_kafka_topics(): - LOGGER.info('test_verify_kafka_topics requesting') +def test_RunRequestListener(): + LOGGER.info('test_RunRequestListener') TelemetryBackendServiceObj = TelemetryBackendService() - KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled'] - response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics) + response = TelemetryBackendServiceObj.RunRequestListener() LOGGER.debug(str(response)) assert isinstance(response, bool) - -# def test_run_kafka_listener(): -# LOGGER.info('test_receive_kafka_request requesting') -# TelemetryBackendServiceObj = TelemetryBackendService() -# response = TelemetryBackendServiceObj.run_kafka_listener() -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) - -# def test_fetch_node_exporter_metrics(): -# LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ') -# TelemetryBackendService.fetch_single_node_exporter_metric() - -def test_stream_node_export_metrics_to_raw_topic(): - LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ') - threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start() - diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 29c192bdf819df0c3865c4917598b36e76095fef..e6a6d0cd54c9fa664539c336da5f76d176b4c637 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -12,25 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ast +import json import threading -from typing import Tuple, Any +from typing import Any, Dict import grpc import logging from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from confluent_kafka import Consumer as KafkaConsumer -from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaError - from common.proto.context_pb2 import Empty - from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer + from telemetry.database.TelemetryModel import Collector as CollectorModel from telemetry.database.Telemetry_DB import TelemetryDB +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaError + LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemetryFrontend', 'NBIgRPC') @@ -46,8 +46,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): 'group.id' : 'frontend', 'auto.offset.reset' : 'latest'}) - # --->>> SECTION: StartCollector with all helper methods <<<--- - + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore @@ -55,101 +54,35 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): LOGGER.info ("gRPC message: {:}".format(request)) response = CollectorId() - # TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists. + # TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists? self.tele_db_obj.add_row_to_db( CollectorModel.ConvertCollectorToRow(request) ) - self.PublishRequestOnKafka(request) + self.PublishStartRequestOnKafka(request) - response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore + response.collector_id.uuid = request.collector_id.collector_id.uuid return response - def PublishRequestOnKafka(self, collector_obj): + def PublishStartRequestOnKafka(self, collector_obj): """ Method to generate collector request on Kafka. """ collector_uuid = collector_obj.collector_id.collector_id.uuid - collector_to_generate : Tuple [str, int, int] = ( - collector_obj.kpi_id.kpi_id.uuid, - collector_obj.duration_s, - collector_obj.interval_s - ) + collector_to_generate : Dict = { + "kpi_id" : collector_obj.kpi_id.kpi_id.uuid, + "duration": collector_obj.duration_s, + "interval": collector_obj.interval_s + } self.kafka_producer.produce( KafkaTopic.REQUEST.value, key = collector_uuid, - value = str(collector_to_generate), + value = json.dumps(collector_to_generate), callback = self.delivery_callback ) LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_generate)) ACTIVE_COLLECTORS.append(collector_uuid) self.kafka_producer.flush() - - def run_kafka_listener(self): - # print ("--- STARTED: run_kafka_listener ---") - threading.Thread(target=self.kafka_listener).start() - return True - - def kafka_listener(self): - """ - listener for response on Kafka topic. - """ - # # print ("--- STARTED: kafka_listener ---") - # conusmer_configs = { - # 'bootstrap.servers' : KAFKA_SERVER_IP, - # 'group.id' : 'frontend', - # 'auto.offset.reset' : 'latest' - # } - # # topic_response = "topic_response" - - # consumerObj = KafkaConsumer(conusmer_configs) - self.kafka_consumer.subscribe([KAFKA_TOPICS['response']]) - # print (time.time()) - while True: - receive_msg = self.kafka_consumer.poll(2.0) - if receive_msg is None: - # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes - continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - print("Consumer error: {}".format(receive_msg.error())) - break - try: - collector_id = receive_msg.key().decode('utf-8') - if collector_id in ACTIVE_COLLECTORS: - (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) - self.process_response(collector_id, kpi_id, kpi_value) - else: - print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") - except Exception as e: - print(f"No message key found: {str(e)}") - continue - # return None - def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): - if kpi_id == "-1" and kpi_value == -1: - # LOGGER.info("Sucessfully terminated Collector: {:}".format(collector_id)) - print ("Sucessfully terminated Collector: ", collector_id) - else: - print ("Frontend-Received values Collector Id:", collector_id, "-KPI:", kpi_id, "-VALUE:", kpi_value) - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def delivery_callback(self, err, msg): - """ - Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ - if err: - LOGGER.debug('Message delivery failed: {:}'.format(err)) - print('Message delivery failed: {:}'.format(err)) - else: - LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - print('Message delivered to topic {:}'.format(msg.topic())) - - # <<<--- SECTION: StopCollector with all helper methods --->>> @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopCollector(self, @@ -164,13 +97,15 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): Method to generate stop collector request on Kafka. """ collector_uuid = collector_id.collector_id.uuid - collector_to_stop : Tuple [str, int, int] = ( - collector_uuid , -1, -1 - ) + collector_to_stop : Dict = { + "kpi_id" : collector_uuid, + "duration": -1, + "interval": -1 + } self.kafka_producer.produce( KafkaTopic.REQUEST.value, key = collector_uuid, - value = str(collector_to_stop), + value = json.dumps(collector_to_stop), callback = self.delivery_callback ) LOGGER.info("Collector Stop Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_stop)) @@ -180,6 +115,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): LOGGER.warning('Collector ID {:} not found in active collector list'.format(collector_uuid)) self.kafka_producer.flush() + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore @@ -199,3 +135,57 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): except Exception as e: LOGGER.info('Unable to process filter response {:}'.format(e)) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def delivery_callback(self, err, msg): + """ + Callback function to handle message delivery status. + Args: + err (KafkaError): Kafka error object. + msg (Message): Kafka message object. + """ + if err: + LOGGER.debug('Message delivery failed: {:}'.format(err)) + print('Message delivery failed: {:}'.format(err)) + # else: + # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + # print('Message delivered to topic {:}'.format(msg.topic())) + + # ---------- Independent Method --------------- + # Listener method is independent of any method (same lifetime as service) + # continously listens for responses + def RunResponseListener(self): + threading.Thread(target=self.ResponseListener).start() + return True + + def ResponseListener(self): + """ + listener for response on Kafka topic. + """ + self.kafka_consumer.subscribe([KafkaTopic.RESPONSE.value]) + while True: + receive_msg = self.kafka_consumer.poll(2.0) + if receive_msg is None: + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print("Consumer error: {}".format(receive_msg.error())) + break + try: + collector_id = receive_msg.key().decode('utf-8') + if collector_id in ACTIVE_COLLECTORS: + kpi_value = json.loads(receive_msg.value().decode('utf-8')) + self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value']) + else: + print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + except Exception as e: + print(f"Error extarcting msg key or value: {str(e)}") + continue + + def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): + if kpi_id == "-1" and kpi_value == -1: + print ("Backend termination confirmation for collector id: ", collector_id) + else: + print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) diff --git a/src/telemetry/frontend/tests/__init__.py b/src/telemetry/frontend/tests/__init__.py deleted file mode 100644 index 3ee6f7071f145e06c3aeaefc09a43ccd88e619e3..0000000000000000000000000000000000000000 --- a/src/telemetry/frontend/tests/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index d967e306ac9be501b80f7ec7b83cd14329d0f793..3f8f3ebc805e721908a333228170c0007ce2bce9 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -28,6 +28,7 @@ from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendC from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService from telemetry.frontend.tests.Messages import ( create_collector_request, create_collector_id, create_collector_filter) +from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl ########################### @@ -98,6 +99,13 @@ def test_SelectCollectors(telemetryFrontend_client): LOGGER.debug(str(response)) assert isinstance(response, CollectorList) +def test_RunResponseListener(): + LOGGER.info(' >>> test_RunResponseListener START: <<< ') + TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() + response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto + LOGGER.debug(str(response)) + assert isinstance(response, bool) + # ------- previous test ---------------- # def test_verify_db_and_table():