# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import queue
import json
import time
import logging
import threading
from typing import Any, Dict
from datetime import datetime, timezone
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.method_wrappers.Decorator import MetricsPool
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from telemetry.backend.service.EmulatedCollector import NetworkMetricsEmulator

LOGGER       = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')

class TelemetryBackendService(GenericGrpcService):
    """
    Listens for collector requests on the TELEMETRY_REQUEST Kafka topic, fetches the
    requested metrics from the (emulated) device, and produces them on both the
    TELEMETRY_RESPONSE and VALUE Kafka topics.
    """
    def __init__(self, cls_name : str = __name__) -> None:
        LOGGER.info('Init TelemetryBackendService')
        port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
        super().__init__(port, cls_name=cls_name)
        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                             'group.id'          : 'backend',
                                             'auto.offset.reset' : 'latest'})
        self.running_threads   = {}
        self.emulatorCollector = None
        self.metric_queue      = queue.Queue()

    def install_servicers(self):
        # Start the Kafka request listener in a background thread when the service is installed.
        threading.Thread(target=self.RequestListener).start()

    def RequestListener(self):
        """
        Listens for collector requests on the TELEMETRY_REQUEST Kafka topic.
        """
        LOGGER.info('Telemetry backend request listener is running ...')
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    LOGGER.error("Consumer error: {}".format(receive_msg.error()))
                    break
            try:
                collector    = json.loads(receive_msg.value().decode('utf-8'))
                collector_id = receive_msg.key().decode('utf-8')
                LOGGER.debug('Received Collector: {:} - {:}'.format(collector_id, collector))

                # A request with duration == -1 and interval == -1 is a termination request;
                # anything else starts a new collector backend.
                if collector['duration'] == -1 and collector['interval'] == -1:
                    self.TerminateCollectorBackend(collector_id)
                else:
                    threading.Thread(target=self.InitiateCollectorBackend,
                                     args=(collector_id, collector)).start()
            except Exception as e:
                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(
                    KafkaTopic.TELEMETRY_REQUEST.value, e))

    def InitiateCollectorBackend(self, collector_id, collector):
        """
        Receives a collector request and initiates the collector backend.
        """
        LOGGER.info("Initiating backend for collector: {:s}".format(str(collector_id)))
        self.emulatorCollector = NetworkMetricsEmulator(
            duration     = collector['duration'],
            interval     = collector['interval'],
            metric_queue = self.metric_queue
        )
        self.emulatorCollector.start()
        self.running_threads[collector_id] = self.emulatorCollector

        # Forward every metric produced by the emulator until it finishes, then terminate.
        while self.emulatorCollector.is_alive():
            if not self.metric_queue.empty():
                metric_value = self.metric_queue.get()
                LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value))
                self.GenerateCollectorResponse(collector_id, collector['kpi_id'], metric_value)
            time.sleep(1)
        self.TerminateCollectorBackend(collector_id)

    def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
        Writes the measured KPI value to the VALUE Kafka topic.
        """
        producer = self.kafka_producer
        kpi_value : Dict = {
            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "kpi_id"    : kpi_id,
            "kpi_value" : measured_kpi_value
        }
        producer.produce(
            KafkaTopic.VALUE.value, # TODO: to the topic ...
            key      = collector_id,
            value    = json.dumps(kpi_value),
            callback = self.delivery_callback
        )
        producer.flush()

    def TerminateCollectorBackend(self, collector_id):
        """
        Stops the collector thread associated with collector_id and confirms the
        termination to the frontend.
        """
        LOGGER.debug("Terminating collector backend...")
        if collector_id in self.running_threads:
            thread = self.running_threads[collector_id]
            thread.stop()
            del self.running_threads[collector_id]
            LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
            self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)  # Termination confirmation to frontend.
        else:
            LOGGER.warning('Backend collector {:} not found'.format(collector_id))

    def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
        Writes a KPI termination signal to the TELEMETRY_RESPONSE Kafka topic.
        """
        producer = self.kafka_producer
        kpi_value : Dict = {
            "kpi_id"    : kpi_id,
            "kpi_value" : measured_kpi_value,
        }
        producer.produce(
            KafkaTopic.TELEMETRY_RESPONSE.value, # TODO: to the topic ...
            key      = collector_id,
            value    = json.dumps(kpi_value),
            callback = self.delivery_callback
        )
        producer.flush()

    def delivery_callback(self, err, msg):
        # Kafka delivery report callback: log failed deliveries; successes are ignored.
        if err:
            LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
        # else:
        #     LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))