Skip to content
Snippets Groups Projects
TelemetryBackendService.py 10.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import json
    
    import logging
    import threading
    
    from typing import Any, Dict
    
    # from common.proto.context_pb2 import Empty
    
    from confluent_kafka import Producer as KafkaProducer
    from confluent_kafka import Consumer as KafkaConsumer
    from confluent_kafka import KafkaError
    
    from common.Constants import ServiceNameEnum
    from common.Settings import get_service_port_grpc
    
    from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
    from common.method_wrappers.Decorator import MetricsPool
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    from common.tools.service.GenericGrpcService import GenericGrpcService
    
    
    LOGGER             = logging.getLogger(__name__)
    
    METRICS_POOL       = MetricsPool('TelemetryBackend', 'backendService')
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    class TelemetryBackendService(GenericGrpcService):
    
        Class listens for request on Kafka topic, fetches requested metrics from device.
        Produces metrics on both RESPONSE and VALUE kafka topics.
    
        def __init__(self, cls_name : str = __name__) -> None:
    
            LOGGER.info('Init TelemetryBackendService')
    
            port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
            super().__init__(port, cls_name=cls_name)
    
            self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
            self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
    
                                                'group.id'           : 'backend',
                                                'auto.offset.reset'  : 'latest'})
    
            threading.Thread(target=self.RequestListener).start()
    
        def RequestListener(self):
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            listener for requests on Kafka topic.
    
            consumer = self.kafka_consumer
            consumer.subscribe([KafkaTopic.REQUEST.value])
    
                receive_msg = consumer.poll(2.0)
    
                if receive_msg is None:
                    continue
                elif receive_msg.error():
                    if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        print("Consumer error: {}".format(receive_msg.error()))
                        break
    
                
                collector = json.loads(receive_msg.value().decode('utf-8'))
    
                collector_id = receive_msg.key().decode('utf-8')
    
                LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
                print('Recevied Collector: {:} - {:}'.format(collector_id, collector))
    
                if collector['duration'] == -1 and collector['interval'] == -1:
                    self.TerminateCollectorBackend(collector_id)
    
                    self.RunInitiateCollectorBackend(collector_id, collector)
    
        def TerminateCollectorBackend(self, collector_id):
            if collector_id in self.running_threads:
                thread, stop_event = self.running_threads[collector_id]
                stop_event.set()
                thread.join()
                print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
                del self.running_threads[collector_id]
                self.GenerateCollectorResponse(collector_id, "-1", -1)          # Termination confirmation to frontend.
            else:
                print ('Backend collector {:} not found'.format(collector_id))
    
        def RunInitiateCollectorBackend(self, collector_id: str, collector: str):
    
            stop_event = threading.Event()
    
            thread = threading.Thread(target=self.InitiateCollectorBackend, 
                                      args=(collector_id, collector, stop_event))
    
            self.running_threads[collector_id] = (thread, stop_event)
            thread.start()
    
        def InitiateCollectorBackend(self, collector_id, collector, stop_event):
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            """
    
            Method receives collector request and initiates collecter backend.
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            """
    
            print("Initiating backend for collector: ", collector_id)
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            start_time = time.time()
    
            while not stop_event.is_set():
    
                if time.time() - start_time >= collector['duration']:            # condition to terminate backend
    
                    print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
    
                    self.GenerateCollectorResponse(collector_id, "-1", -1)       # Termination confirmation to frontend.
    
    Waleed Akbar's avatar
    Waleed Akbar committed
                    break
    
                self.ExtractKpiValue(collector_id, collector['kpi_id'])
                time.sleep(collector['interval'])
    
        def ExtractKpiValue(self, collector_id: str, kpi_id: str):
    
            measured_kpi_value = random.randint(1,100)                      # TODO: To be extracted from a device
            print ("Measured Kpi value: {:}".format(measured_kpi_value))
            # measured_kpi_value = self.fetch_node_exporter_metrics()       # exporter extracted metric value against default KPI
            self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value)
    
        def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            Method to write kpi value on RESPONSE Kafka topic
    
            producer = self.kafka_producer
            kpi_value : Dict = {
                "kpi_id"    : kpi_id,
                "kpi_value" : measured_kpi_value
            }
            producer.produce(
                KafkaTopic.RESPONSE.value,
                key      = collector_id,
                value    = json.dumps(kpi_value),
                callback = self.delivery_callback
            )
            producer.flush()
    
    Waleed Akbar's avatar
    Waleed Akbar committed
        def GenerateRawMetric(self, metrics: Any):
            """
            Method writes raw metrics on VALUE Kafka topic
            """
            producer = self.kafka_producer
            some_metric : Dict = {
                "some_id"    : metrics
            }
            producer.produce(
                KafkaTopic.VALUE.value,
                key      = 'raw',
                value    = json.dumps(some_metric),
                callback = self.delivery_callback
            )
            producer.flush()
    
    
        def delivery_callback(self, err, msg):
    
            """
            Callback function to handle message delivery status.
    
            Args: err (KafkaError): Kafka error object.
                  msg (Message): Kafka message object.
            """
            if err: print(f'Message delivery failed: {err}')
            # else:   print(f'Message delivered to topic {msg.topic()}')
    
    # # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
    #     @staticmethod
    #     def fetch_single_node_exporter_metric():
    #         """
    #         Method to fetch metrics from Node Exporter.
    #         Returns:
    #             str: Metrics fetched from Node Exporter.
    #         """
    #         KPI = "node_network_receive_packets_total"
    #         try:
    #             response = requests.get(EXPORTER_ENDPOINT) # type: ignore
    #             LOGGER.info("Request status {:}".format(response))
    #             if response.status_code == 200:
    #                 # print(f"Metrics fetched sucessfully...")
    #                 metrics = response.text
    #                 # Check if the desired metric is available in the response
    #                 if KPI in metrics:
    #                     KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
    #                     # Extract the metric value
    #                     if KPI_VALUE is not None:
    #                         LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
    #                         print(f"Extracted value of {KPI} is: {KPI_VALUE}")
    #                         return KPI_VALUE
    #             else:
    #                 LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
    #                 # print(f"Failed to fetch metrics. Status code: {response.status_code}")
    #                 return None
    #         except Exception as e:
    #             LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
    #             # print(f"Failed to fetch metrics: {str(e)}")
    #             return None
    
    #     @staticmethod
    #     def extract_metric_value(metrics, metric_name):
    #         """
    #         Method to extract the value of a metric from the metrics string.
    #         Args:
    #             metrics (str): Metrics string fetched from Exporter.
    #             metric_name (str): Name of the metric to extract.
    #         Returns:
    #             float: Value of the extracted metric, or None if not found.
    #         """
    #         try:
    #             # Find the metric line containing the desired metric name
    #             metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
    #             # Split the line to extract the metric value
    #             metric_value = float(metric_line.split()[1])
    #             return metric_value
    #         except StopIteration:
    #             print(f"Metric '{metric_name}' not found in the metrics.")
    #             return None
    
    #     @staticmethod
    #     def stream_node_export_metrics_to_raw_topic():
    #         try:
    #             while True:
    #                 response = requests.get(EXPORTER_ENDPOINT)
    #                 # print("Response Status {:} ".format(response))
    #                 # LOGGER.info("Response Status {:} ".format(response))
    #                 try: 
    #                     if response.status_code == 200:
    #                         producerObj = KafkaProducer(PRODUCER_CONFIG)
    #                         producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback)
    #                         producerObj.flush()
    #                         LOGGER.info("Produce to topic")
    #                     else:
    #                         LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
    #                         print(f"Didn't received expected response. Status code: {response.status_code}")
    #                         return None
    #                     time.sleep(15)
    #                 except Exception as e:
    #                     LOGGER.info("Failed to process response. Status code: {:}".format(e))
    #                     return None
    #         except Exception as e:
    #             LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
    #             print(f"Failed to fetch metrics: {str(e)}")
    #             return None
    # # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------