# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import time import logging import threading from typing import Any, Dict from datetime import datetime, timezone from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from common.method_wrappers.Decorator import MetricsPool from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') class TelemetryBackendService(GenericGrpcService): """ Class listens for request on Kafka topic, fetches requested metrics from device. Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics. """ def __init__(self, cls_name : str = __name__) -> None: LOGGER.info('Init TelemetryBackendService') port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND) super().__init__(port, cls_name=cls_name) self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) self.collector = EmulatedCollector(address="127.0.0.1", port=8000) self.active_jobs = {} def install_servicers(self): threading.Thread(target=self.RequestListener).start() def RequestListener(self): """ listener for requests on Kafka topic. """ LOGGER.info('Telemetry backend request listener is running ...') # print ('Telemetry backend request listener is running ...') consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value]) while True: receive_msg = consumer.poll(1.0) if receive_msg is None: continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") continue else: LOGGER.error("Consumer error: {}".format(receive_msg.error())) break try: collector = json.loads( receive_msg.value().decode('utf-8') ) collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) duration = collector.get('duration', 0) if duration == -1 and collector['interval'] == -1: self.TerminateCollector(collector_id) else: LOGGER.info("Received Collector ID: {:} - Scheduling...".format(collector_id)) if collector_id not in self.active_jobs: stop_event = threading.Event() self.active_jobs[collector_id] = stop_event threading.Thread(target = self.CollectorHandler, args=( collector_id, collector['kpi_id'], duration, collector['interval'], stop_event )).start() # Stop the Collector after the given duration if duration > 0: def stop_after_duration(completion_time, stop_event): time.sleep(completion_time) if not stop_event.is_set(): LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}") self.TerminateCollector(collector_id) duration_thread = threading.Thread( target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}", args=(duration, stop_event) ) duration_thread.start() else: LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id)) except Exception as e: LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event): """ Method to handle collector request. """ end_points : dict = self.get_endpoints_from_kpi_id(kpi_id) if not end_points: LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) device_type : str = self.get_device_type_from_kpi_id(kpi_id) if device_type == "Unknown": LOGGER.warning("KPI ID: {:} - Device Type not found. Skipping...".format(kpi_id)) if device_type == "EMU-Device": LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) subscription = [collector_id, end_points, duration, interval] self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event) else: LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type)) def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event): # EmulatedCollector self.collector.Connect() if not self.collector.SubscribeState(subscription): LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id)) else: while not stop_event.is_set(): samples = list(self.collector.GetState(duration=duration, blocking=True)) LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples)) self.GenerateKpiValue(collector_id, kpi_id, samples) time.sleep(1) self.collector.Disconnect() # self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration. def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ Method to write kpi value on VALUE Kafka topic """ producer = self.kafka_producer kpi_value : Dict = { "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "kpi_id" : kpi_id, "kpi_value" : measured_kpi_value } producer.produce( KafkaTopic.VALUE.value, key = collector_id, value = json.dumps(kpi_value), callback = self.delivery_callback ) producer.flush() def TerminateCollector(self, job_id): LOGGER.debug("Terminating collector backend...") try: if job_id not in self.active_jobs: # not job_ids: # self.logger.warning(f"Active jobs: {self.active_jobs}") self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.") else: LOGGER.info(f"Terminating job: {job_id}") stop_event = self.active_jobs.pop(job_id, None) if stop_event: stop_event.set() LOGGER.info(f"Job {job_id} terminated.") if self.collector.UnsubscribeState(job_id): LOGGER.info(f"Unsubscribed from collector: {job_id}") else: LOGGER.warning(f"Failed to unsubscribe from collector: {job_id}") else: LOGGER.warning(f"Job {job_id} not found in active jobs.") except: LOGGER.exception("Error terminating job: {:}".format(job_id)) # --- Mock Methods --- def get_endpoints_from_kpi_id(self, kpi_id: str) -> dict: """ Method to get endpoints based on kpi_id. """ kpi_endpoints = { '6e22f180-ba28-4641-b190-2287bf448888': {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0", "type": "ethernet", "sample_types": [101, 102]}, '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []}, '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}, } return kpi_endpoints.get(kpi_id, {}) if kpi_id in kpi_endpoints else {} def get_device_type_from_kpi_id(self, kpi_id: str) -> str: """ Method to get device type based on kpi_id. """ kpi_device_types = { "123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"}, "123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"}, "6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"}, } return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown") def delivery_callback(self, err, msg): if err: LOGGER.error('Message delivery failed: {:s}'.format(str(err)))