Skip to content
Snippets Groups Projects
TelemetryBackendService.py 7.03 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import queue
    
    import json
    
    import time
    import logging
    import threading
    
    from typing           import Any, Dict
    from datetime         import datetime, timezone
    from confluent_kafka  import Producer as KafkaProducer
    from confluent_kafka  import Consumer as KafkaConsumer
    from confluent_kafka  import KafkaError
    
    from common.Constants import ServiceNameEnum
    
    from common.Settings  import get_service_port_grpc
    from common.method_wrappers.Decorator        import MetricsPool
    from common.tools.kafka.Variables            import KafkaConfig, KafkaTopic
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    from common.tools.service.GenericGrpcService import GenericGrpcService
    
    
    LOGGER             = logging.getLogger(__name__)
    
    METRICS_POOL       = MetricsPool('TelemetryBackend', 'backendService')
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    class TelemetryBackendService(GenericGrpcService):
    
        Class listens for request on Kafka topic, fetches requested metrics from device.
    
        Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics.
    
        def __init__(self, cls_name : str = __name__) -> None:
    
            LOGGER.info('Init TelemetryBackendService')
    
            port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
            super().__init__(port, cls_name=cls_name)
    
            self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
            self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
    
                                                'group.id'           : 'backend',
                                                'auto.offset.reset'  : 'latest'})
    
            self.running_threads   = {}
            self.emulatorCollector = None
            self.metric_queue      = queue.Queue()
    
            threading.Thread(target=self.RequestListener).start()
    
        def RequestListener(self):
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            listener for requests on Kafka topic.
    
            LOGGER.info('Telemetry backend request listener is running ...')
    
            # print      ('Telemetry backend request listener is running ...')
    
            consumer = self.kafka_consumer
    
            consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
    
                receive_msg = consumer.poll(2.0)
    
                if receive_msg is None:
                    continue
                elif receive_msg.error():
                    if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
    
                        LOGGER.error("Consumer error: {}".format(receive_msg.error()))
    
                try: 
                    collector = json.loads(receive_msg.value().decode('utf-8'))
                    collector_id = receive_msg.key().decode('utf-8')
                    LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
    
                    if collector['duration'] == -1 and collector['interval'] == -1:
                        self.TerminateCollectorBackend(collector_id)
                    else:
    
                        threading.Thread(target=self.InitiateCollectorBackend, 
                                      args=(collector_id, collector)).start()
    
                    LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
    
        def InitiateCollectorBackend(self, collector_id, collector):
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            """
    
            Method receives collector request and initiates collecter backend.
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            """
    
            LOGGER.info("Initiating backend for collector: (Not Implemented... In progress ) {:s}".format(str(collector_id)))
            # start_time = time.time()
            # self.emulatorCollector = NetworkMetricsEmulator(
            #     duration           = collector['duration'],
            #     interval           = collector['interval'],
            #     metric_queue       = self.metric_queue
            # )
            # self.emulatorCollector.start()
            # self.running_threads[collector_id] = self.emulatorCollector
    
            # while self.emulatorCollector.is_alive():
            #     if not self.metric_queue.empty():
            #         metric_value = self.metric_queue.get()
            #         LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value))
            #         self.GenerateKpiValue(collector_id, collector['kpi_id'] , metric_value)
            #     time.sleep(1)
            # self.TerminateCollectorBackend(collector_id)
    
        def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            """
    
            Method to write kpi value on VALUE Kafka topic
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            """
            producer = self.kafka_producer
            kpi_value : Dict = {
    
                "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    
    Waleed Akbar's avatar
    Waleed Akbar committed
                "kpi_id"    : kpi_id,
    
                "kpi_value" : measured_kpi_value
    
    Waleed Akbar's avatar
    Waleed Akbar committed
            }
            producer.produce(
    
                KafkaTopic.VALUE.value,
    
    Waleed Akbar's avatar
    Waleed Akbar committed
                key      = collector_id,
                value    = json.dumps(kpi_value),
                callback = self.delivery_callback
            )
            producer.flush()
    
    
        def TerminateCollectorBackend(self, collector_id):
            LOGGER.debug("Terminating collector backend...")
            if collector_id in self.running_threads:
                thread = self.running_threads[collector_id]
                thread.stop()
                del self.running_threads[collector_id]
                LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
                self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)          # Termination confirmation to frontend.
            else:
                LOGGER.warning('Backend collector {:} not found'.format(collector_id))
    
        def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
    
            Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic
    
            producer = self.kafka_producer
            kpi_value : Dict = {
                "kpi_id"    : kpi_id,
    
                "kpi_value" : measured_kpi_value,
    
            }
            producer.produce(
    
                KafkaTopic.TELEMETRY_RESPONSE.value,
    
                key      = collector_id,
                value    = json.dumps(kpi_value),
                callback = self.delivery_callback
            )
            producer.flush()
    
        def delivery_callback(self, err, msg):
    
            if err: 
                LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
    
                # print(f'Message delivery failed: {err}')
    
            # else:
            #     LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
                # print(f'Message delivered to topic {msg.topic()}')