# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import time
import random
import logging
import threading
from typing import Any, Dict
# from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError

from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.method_wrappers.Decorator import MetricsPool
from common.tools.service.GenericGrpcService import GenericGrpcService

LOGGER       = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
# EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics"

class TelemetryBackendService(GenericGrpcService):
    """
    Listens for collector requests on the Kafka REQUEST topic, fetches the requested
    metrics from the device, and produces them on the RESPONSE and VALUE Kafka topics.
    """
    def __init__(self, cls_name : str = __name__) -> None:
        LOGGER.info('Init TelemetryBackendService')
        port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
        super().__init__(port, cls_name=cls_name)
        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                             'group.id'          : 'backend',
                                             'auto.offset.reset' : 'latest'})
        self.running_threads = {}

    def install_servicers(self):
        threading.Thread(target=self.RequestListener).start()

    def RequestListener(self):
        """
        Listener for requests on the Kafka REQUEST topic.
        """
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.REQUEST.value])
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
                    break
            collector    = json.loads(receive_msg.value().decode('utf-8'))
            collector_id = receive_msg.key().decode('utf-8')
            LOGGER.debug('Received Collector: {:} - {:}'.format(collector_id, collector))

            # A duration and interval of -1 signals termination of the collector.
            if collector['duration'] == -1 and collector['interval'] == -1:
                self.TerminateCollectorBackend(collector_id)
            else:
                self.RunInitiateCollectorBackend(collector_id, collector)
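
    # Example (illustrative sketch): the REQUEST messages consumed above carry a JSON
    # payload with 'kpi_id', 'duration' and 'interval' fields, keyed by the collector_id.
    # A frontend could publish one as follows; the field names come from this file,
    # while the key and values are hypothetical:
    #
    #   producer  = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
    #   collector = {'kpi_id'   : '123e4567-e89b-12d3-a456-426614174000',  # hypothetical KPI UUID
    #                'duration' : 60,   # collect for 60 seconds ...
    #                'interval' : 5}    # ... sampling every 5 seconds
    #   producer.produce(KafkaTopic.REQUEST.value, key='collector-1', value=json.dumps(collector))
    #   producer.flush()
    #
    # Publishing duration == -1 and interval == -1 under the same key requests termination.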

    def TerminateCollectorBackend(self, collector_id):
        if collector_id in self.running_threads:
            thread, stop_event = self.running_threads[collector_id]
            stop_event.set()
            thread.join()
            LOGGER.debug("Terminating backend (by StopCollector): Collector Id: {:}".format(collector_id))
            del self.running_threads[collector_id]
            self.GenerateCollectorResponse(collector_id, "-1", -1)      # Termination confirmation to frontend.
        else:
            LOGGER.warning('Backend collector {:} not found'.format(collector_id))

    def RunInitiateCollectorBackend(self, collector_id: str, collector: dict):
        stop_event = threading.Event()
        thread = threading.Thread(target=self.InitiateCollectorBackend,
                                  args=(collector_id, collector, stop_event))
        self.running_threads[collector_id] = (thread, stop_event)
        thread.start()

    def InitiateCollectorBackend(self, collector_id, collector, stop_event):
        """
        Method receives collector request and initiates collector backend.
        """
        LOGGER.debug("Initiating backend for collector: {:}".format(collector_id))
        start_time = time.time()
        while not stop_event.is_set():
            if time.time() - start_time >= collector['duration']:       # condition to terminate backend
                LOGGER.debug("Execution duration completed: Terminating backend: Collector Id: {:} - {:}".format(
                    collector_id, time.time() - start_time))
                self.GenerateCollectorResponse(collector_id, "-1", -1)  # Termination confirmation to frontend.
                break
            self.ExtractKpiValue(collector_id, collector['kpi_id'])
            time.sleep(collector['interval'])

    def ExtractKpiValue(self, collector_id: str, kpi_id: str):
        """
        Method to extract kpi value.
        """
        measured_kpi_value = random.randint(1, 100)     # TODO: To be extracted from a device
        LOGGER.debug("Measured Kpi value: {:}".format(measured_kpi_value))
        # measured_kpi_value = self.fetch_single_node_exporter_metric()  # exporter extracted metric value against default KPI
        self.GenerateCollectorResponse(collector_id, kpi_id, measured_kpi_value)

    def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
        Method to write kpi value on RESPONSE Kafka topic
        """
        producer = self.kafka_producer
        kpi_value : Dict = {
            "kpi_id"    : kpi_id,
            "kpi_value" : measured_kpi_value
        }
        producer.produce(
            KafkaTopic.RESPONSE.value,
            key      = collector_id,
            value    = json.dumps(kpi_value),
            callback = self.delivery_callback
        )
        producer.flush()

    def GenerateRawMetric(self, metrics: Any):
        """
        Method writes raw metrics on VALUE Kafka topic
        """
        producer = self.kafka_producer
        some_metric : Dict = {
            "some_id" : metrics
        }
        producer.produce(
            KafkaTopic.VALUE.value,
            key      = 'raw',
            value    = json.dumps(some_metric),
            callback = self.delivery_callback
        )
        producer.flush()

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            LOGGER.error('Message delivery failed: {:}'.format(err))
        # else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
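
    # Example (illustrative sketch): a frontend consuming the responses produced above.
    # The payload shape ({'kpi_id', 'kpi_value'} keyed by collector_id) and the "-1"/-1
    # termination sentinel come from this file; the 'frontend' group.id is an assumption:
    #
    #   consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
    #                             'group.id'          : 'frontend',
    #                             'auto.offset.reset' : 'latest'})
    #   consumer.subscribe([KafkaTopic.RESPONSE.value])
    #   msg = consumer.poll(2.0)
    #   if msg is not None and not msg.error():
    #       collector_id = msg.key().decode('utf-8')
    #       kpi_value    = json.loads(msg.value().decode('utf-8'))
    #       if kpi_value['kpi_id'] == "-1" and kpi_value['kpi_value'] == -1:
    #           pass    # collector terminated; stop tracking this collector_id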

    # # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
    # # NOTE: reinstating this code requires 'import requests' and the EXPORTER_ENDPOINT
    # # constant above; PRODUCER_CONFIG and KAFKA_TOPICS refer to an older configuration,
    # # whose current equivalents are {'bootstrap.servers': KafkaConfig.get_kafka_address()}
    # # and KafkaTopic.VALUE.value.
    # @staticmethod
    # def fetch_single_node_exporter_metric():
    #     """
    #     Method to fetch metrics from Node Exporter.
    #     Returns:
    #         str: Metrics fetched from Node Exporter.
    #     """
    #     KPI = "node_network_receive_packets_total"
    #     try:
    #         response = requests.get(EXPORTER_ENDPOINT) # type: ignore
    #         LOGGER.info("Request status {:}".format(response))
    #         if response.status_code == 200:
    #             # print(f"Metrics fetched successfully...")
    #             metrics = response.text
    #             # Check if the desired metric is available in the response
    #             if KPI in metrics:
    #                 KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
    #                 # Extract the metric value
    #                 if KPI_VALUE is not None:
    #                     LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
    #                     print(f"Extracted value of {KPI} is: {KPI_VALUE}")
    #                     return KPI_VALUE
    #         else:
    #             LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
    #             # print(f"Failed to fetch metrics. Status code: {response.status_code}")
    #             return None
    #     except Exception as e:
    #         LOGGER.info("Failed to fetch metrics: {:}".format(e))
    #         # print(f"Failed to fetch metrics: {str(e)}")
    #         return None

    # @staticmethod
    # def extract_metric_value(metrics, metric_name):
    #     """
    #     Method to extract the value of a metric from the metrics string.
    #     Args:
    #         metrics (str): Metrics string fetched from Exporter.
    #         metric_name (str): Name of the metric to extract.
    #     Returns:
    #         float: Value of the extracted metric, or None if not found.
    #     """
    #     try:
    #         # Find the metric line containing the desired metric name
    #         metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
    #         # Split the line to extract the metric value
    #         metric_value = float(metric_line.split()[1])
    #         return metric_value
    #     except StopIteration:
    #         print(f"Metric '{metric_name}' not found in the metrics.")
    #         return None

    # @staticmethod
    # def stream_node_export_metrics_to_raw_topic():
    #     try:
    #         while True:
    #             response = requests.get(EXPORTER_ENDPOINT)
    #             # print("Response Status {:} ".format(response))
    #             # LOGGER.info("Response Status {:} ".format(response))
    #             try:
    #                 if response.status_code == 200:
    #                     producerObj = KafkaProducer(PRODUCER_CONFIG)
    #                     producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value=str(response.text), callback=TelemetryBackendService.delivery_callback)
    #                     producerObj.flush()
    #                     LOGGER.info("Produce to topic")
    #                 else:
    #                     LOGGER.info("Didn't receive expected response. Status code: {:}".format(response.status_code))
    #                     print(f"Didn't receive expected response. Status code: {response.status_code}")
    #                     return None
    #                 time.sleep(15)
    #             except Exception as e:
    #                 LOGGER.info("Failed to process response: {:}".format(e))
    #                 return None
    #     except Exception as e:
    #         LOGGER.info("Failed to fetch metrics: {:}".format(e))
    #         print(f"Failed to fetch metrics: {str(e)}")
    #         return None
    # # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
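
# Example (illustrative sketch): what the commented-out extract_metric_value() above does
# to a Prometheus exposition-format payload. The sample text and values are assumptions
# for illustration only:
#
#   sample = (
#       "# HELP node_network_receive_packets_total Network device statistic.\n"
#       "# TYPE node_network_receive_packets_total counter\n"
#       'node_network_receive_packets_total{device="eth0"} 1.234e+05\n'
#   )
#   line  = next(l for l in sample.split('\n')
#                if l.startswith("node_network_receive_packets_total"))
#   value = float(line.split()[1])   # -> 123400.0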