# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import time
import logging
import threading
from typing           import Any, Dict
from datetime         import datetime, timezone
from confluent_kafka  import Producer as KafkaProducer
from confluent_kafka  import Consumer as KafkaConsumer
from confluent_kafka  import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings  import get_service_port_grpc
from common.method_wrappers.Decorator        import MetricsPool
from common.tools.kafka.Variables            import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector

LOGGER             = logging.getLogger(__name__)
METRICS_POOL       = MetricsPool('TelemetryBackend', 'backendService')

class TelemetryBackendService(GenericGrpcService):
    """
    Class listens for requests on a Kafka topic and runs the requested collector jobs.

    Requests are consumed from ``KafkaTopic.TELEMETRY_REQUEST``. Each request either
    schedules a collector job (one worker thread per collector ID, tracked in
    ``self.active_jobs``) or terminates a running one. ``GenerateKpiValue`` publishes
    KPI samples on ``KafkaTopic.VALUE``. Publishing on TELEMETRY_RESPONSE is currently
    disabled (kept as commented-out code at the end of the class).
    """

    def __init__(self, cls_name : str = __name__) -> None:
        """
        Create the Kafka producer/consumer and start the request listener thread.

        Args:
            cls_name: Name forwarded to ``GenericGrpcService.__init__``.
        """
        LOGGER.info('Init TelemetryBackendService')
        port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
        super().__init__(port, cls_name=cls_name)
        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                            'group.id'           : 'backend',
                                            'auto.offset.reset'  : 'latest'})
        # Maps collector ID -> threading.Event used to ask the job's worker to stop.
        self.active_jobs : Dict[str, threading.Event] = {}
        threading.Thread(target=self.RequestListener).start()

    def RequestListener(self):
        """
        Listener for requests on Kafka topic.

        Polls ``KafkaTopic.TELEMETRY_REQUEST`` in an endless loop. Consumer errors are
        logged and the message is skipped; valid messages are handed to
        ``_handle_request``. Any exception raised while handling a message is logged
        as a warning so that one malformed request cannot stop the listener.
        """
        LOGGER.info('Telemetry backend request listener is running ...')
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
        while True:
            receive_msg = consumer.poll(1.0)
            if receive_msg is None:
                continue
            if receive_msg.error():
                error_code = receive_msg.error().code()
                if error_code == KafkaError._PARTITION_EOF:
                    pass    # end of partition is not an error condition
                elif error_code == KafkaError.UNKNOWN_TOPIC_OR_PART:
                    LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist. May be topic does not have any messages.")
                else:
                    LOGGER.error("Consumer error: {}".format(receive_msg.error()))
                # An error event is not a request: never try to decode it.
                continue
            try:
                self._handle_request(receive_msg)
            except Exception as e:
                LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))

    def _handle_request(self, receive_msg):
        """
        Decode one request message and schedule or terminate the collector it names.

        The message key is the collector ID; the value is a JSON object read for the
        keys 'duration' (optional, default -1), 'interval' and 'kpi_id'.
        'duration' == -1 together with 'interval' == -1 means "terminate".

        Raises:
            Exception: decoding errors and missing keys propagate to the caller.
        """
        collector = json.loads(
            receive_msg.value().decode('utf-8')
        )
        collector_id = receive_msg.key().decode('utf-8')
        LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))

        duration = collector.get('duration', -1)
        interval = collector['interval']
        if duration == -1 and interval == -1:
            self.TerminateCollector(collector_id)
            return

        LOGGER.info("Collector ID: {:} - Scheduling...".format(collector_id))
        if collector_id in self.active_jobs:
            LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id))
            return

        # Read every required field before registering the job, so that a malformed
        # request cannot leave a stale entry in active_jobs.
        kpi_id = collector['kpi_id']
        stop_event = threading.Event()
        self.active_jobs[collector_id] = stop_event
        threading.Thread(
            target = self.CollectorHandler,
            args   = (collector_id, kpi_id, duration, interval, stop_event)
        ).start()

        # Stop the Collector after the given duration
        if duration > 0:
            threading.Thread(
                target = self._stop_after_duration,
                args   = (collector_id, duration, stop_event),
                daemon = True,
                name   = f"stop_after_duration_{collector_id}"
            ).start()

    def _stop_after_duration(self, collector_id, duration, stop_event):
        """
        Terminate a collector once its requested duration has elapsed.

        The values are passed as arguments (not captured from the listener loop), so a
        later request cannot change which collector this watchdog terminates.
        """
        # Event.wait returns True when the event was set before the timeout, i.e. the
        # job was already terminated; in that case there is nothing left to stop.
        if stop_event.wait(timeout=duration):
            return
        LOGGER.warning(f"Execution duration ({duration}) completed of Collector: {collector_id}")
        self.TerminateCollector(collector_id)

    def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event):
        """
        Method to handle collector request.

        Resolves the endpoints and device type for ``kpi_id`` and dispatches to the
        matching collector handler. Only "EMU-Device" is supported.

        NOTE: when this method returns without running a collector, the entry for
        ``collector_id`` stays in ``self.active_jobs`` until ``TerminateCollector``
        is called for it.
        """
        end_points : list = self.get_endpoints_from_kpi_id(kpi_id)
        if not end_points:
            LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id))
            return

        device_type : str = self.get_device_type_from_kpi_id(kpi_id)
        if device_type == "Unknown":
            LOGGER.warning("KPI ID: {:} - Device Type not found. Skipping...".format(kpi_id))
            return

        if device_type == "EMU-Device":
            LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points))
            subscription = [collector_id, end_points, duration, interval]
            self.EmulatedCollectorHandler(subscription, kpi_id, stop_event)
        else:
            LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type))

    def EmulatedCollectorHandler(self, subscription, kpi_id, stop_event):
        """
        Run the emulated collector until ``stop_event`` is set.

        Sample retrieval and publishing are not wired in yet (see the commented-out
        lines); for now the loop only logs once per second.
        """
        # EmulatedCollector
        collector = EmulatedCollector(address="127.0.0.1", port=8000)
        collector.Connect()
        while not stop_event.is_set():
            # samples = collector.SubscribeState(subscription)
            # LOGGER.debug("KPI: {:} - Value: {:}".format(kpi_id, samples))
            # self.GenerateKpiValue(job_id, kpi_id, samples)
            LOGGER.info("Generating KPI Values ...")
            # Wait on the event rather than sleeping so termination takes effect at once.
            stop_event.wait(1)

    def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
        Method to write kpi value on VALUE Kafka topic

        The message key is ``collector_id``; the value is a JSON object with the keys
        'time_stamp' (current UTC time), 'kpi_id' and 'kpi_value'.
        """
        producer = self.kafka_producer
        kpi_value : Dict = {
            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "kpi_id"    : kpi_id,
            "kpi_value" : measured_kpi_value
        }
        producer.produce(
            KafkaTopic.VALUE.value,
            key      = collector_id,
            value    = json.dumps(kpi_value),
            callback = self.delivery_callback
        )
        producer.flush()

    def TerminateCollector(self, job_id):
        """
        Signal the job registered under ``job_id`` to stop and forget it.

        Unknown IDs are logged as a warning. Errors are logged, never raised.
        """
        LOGGER.debug("Terminating collector backend...")
        try:
            # A single pop avoids a gap between the membership check and the removal.
            stop_event = self.active_jobs.pop(job_id, None)
            if stop_event is None:
                LOGGER.warning(f"No active jobs found for {job_id}. It might have already terminated.")
                return
            LOGGER.info(f"Terminating job: {job_id}")
            stop_event.set()
            LOGGER.info(f"Job {job_id} terminated.")
        except Exception:
            LOGGER.exception("Error terminating job: {:}".format(job_id))

    def get_endpoints_from_kpi_id(self, kpi_id: str) -> list:
        """
        Method to get endpoints based on kpi_id.

        Returns a one-element list with the endpoint description from the hard-coded
        table below, or an empty list when ``kpi_id`` is not in the table.
        """
        kpi_endpoints = {
            '6e22f180-ba28-4641-b190-2287bf448888': {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0",   "type": "ethernet", "sample_types": [101, 102]},
            '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1",   "type": "ethernet", "sample_types": []},
            '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper",   "sample_types": [101, 102, 201, 202]},
        }
        endpoint = kpi_endpoints.get(kpi_id)
        return [endpoint] if endpoint is not None else []

    def get_device_type_from_kpi_id(self, kpi_id: str) -> str:
        """
        Method to get device type based on kpi_id.

        Returns the device type from the hard-coded table below, or "Unknown" when
        ``kpi_id`` is not in the table.
        """
        kpi_device_types = {
            "123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"},
            "123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"},
            "6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"},
        }
        return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown")

    def delivery_callback(self, err, msg):
        """Kafka delivery callback: log the error when a message could not be delivered."""
        if err:
            LOGGER.error('Message delivery failed: {:s}'.format(str(err)))

    # --- Disabled code, kept for reference (termination confirmation to the frontend) ---
    # def TerminateCollectorBackend(self, collector_id):
    #     LOGGER.debug("Terminating collector backend...")
    #     if collector_id in self.running_threads:
    #         thread = self.running_threads[collector_id]
    #         thread.stop()
    #         del self.running_threads[collector_id]
    #         LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
    #         self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)          # Termination confirmation to frontend.
    #     else:
    #         LOGGER.warning('Backend collector {:} not found'.format(collector_id))

    # def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
    #     """
    #     Method to write kpi Termination signal on TELEMETRY_RESPONSE Kafka topic
    #     """
    #     producer = self.kafka_producer
    #     kpi_value : Dict = {
    #         "kpi_id"    : kpi_id,
    #         "kpi_value" : measured_kpi_value,
    #     }
    #     producer.produce(
    #         KafkaTopic.TELEMETRY_RESPONSE.value,
    #         key      = collector_id,
    #         value    = json.dumps(kpi_value),
    #         callback = self.delivery_callback
    #     )
    #     producer.flush()