# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import json
import logging
import threading

from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from analytics.backend.service.Streamer import DaskStreamer
from analytics.backend.service.AnalyzerHelper import AnalyzerHelper

LOGGER = logging.getLogger(__name__)


class AnalyticsBackendService(GenericGrpcService):
    """
    AnalyticsBackendService is responsible for handling requests from the
    AnalyticsFrontendService. It listens on the Kafka request topic and
    starts/stops the DaskStreamer accordingly. It also initializes the Kafka
    producer and the Dask cluster used by the streamers.
    """
    def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1) -> None:
        LOGGER.info('Init AnalyticsBackendService')
        port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
        super().__init__(port, cls_name=cls_name)
        self.active_streamers = {}
        self.central_producer = AnalyzerHelper.initialize_kafka_producer()    # Multi-threaded producer
        self.cluster          = AnalyzerHelper.initialize_dask_cluster(
                                    n_workers, threads_per_worker)            # Local cluster
        self.request_consumer = Consumer({
            'bootstrap.servers' : KafkaConfig.get_kafka_address(),
            'group.id'          : 'analytics-backend',
            'auto.offset.reset' : 'latest',
        })

    def install_servicers(self):
        threading.Thread(target=self.RequestListener, args=()).start()

    def RequestListener(self):
        """
        Listener for analyzer requests published on the Kafka request topic.
        """
        LOGGER.info("Request Listener is initiated ...")
        consumer = self.request_consumer
        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
                    break
            try:
                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                analyzer_uuid = receive_msg.key().decode('utf-8')
                LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))

                if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                    if self.StopStreamer(analyzer_uuid):
                        LOGGER.info("Dask Streamer stopped.")
                    else:
                        LOGGER.error("Failed to stop Dask Streamer.")
                else:
                    if self.StartStreamer(analyzer_uuid, analyzer):
                        LOGGER.info("Dask Streamer started.")
                    else:
                        LOGGER.error("Failed to start Dask Streamer.")
            except Exception as e:
                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(
                    KafkaTopic.ANALYTICS_REQUEST.value, e))
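
    # Illustrative request payload (a sketch, not taken from the frontend code):
    # RequestListener expects the Kafka message value to be a JSON object whose
    # keys match the lookups performed in StartStreamer below; the concrete
    # values shown here are assumptions for documentation purposes only.
    #
    #   {
    #       "algo_name"          : "Threshold_Alert",
    #       "oper_mode"          : 1,
    #       "input_kpis"         : ["kpi-uuid-1", "kpi-uuid-2"],
    #       "output_kpis"        : ["kpi-uuid-3"],
    #       "thresholds"         : {"avg_value": [20, 30]},
    #       "batch_size_min"     : 32,
    #       "batch_duration_min" : 10,
    #       "window_size"        : "60s",
    #       "duration"           : 120,
    #   }
    #
    # A message whose "algo_name" and "oper_mode" are both null (None) is
    # interpreted as a request to stop the streamer identified by the message key.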
""" if analyzer_uuid in self.active_streamers: LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid)) return False try: streamer = DaskStreamer( key = analyzer_uuid, input_kpis = analyzer['input_kpis' ], output_kpis = analyzer['output_kpis' ], thresholds = analyzer['thresholds' ], batch_size = analyzer['batch_size_min' ], batch_duration = analyzer['batch_duration_min'], window_size = analyzer['window_size' ], cluster_instance = self.cluster, producer_instance = self.central_producer, ) streamer.start() LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}") # Stop the streamer after the given duration if analyzer['duration'] > 0: def stop_after_duration(): time.sleep(analyzer['duration']) LOGGER.warning(f"Execution duration ({analyzer['duration']}) completed of Analyzer: {analyzer_uuid}") if not self.StopStreamer(analyzer_uuid): LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.") duration_thread = threading.Thread(target=stop_after_duration, daemon=True) duration_thread.start() self.active_streamers[analyzer_uuid] = streamer return True except Exception as e: LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e)) return False def StopStreamer(self, analyzer_uuid : str): """ Stop the DaskStreamer with the given analyzer_uuid. """ try: if analyzer_uuid not in self.active_streamers: LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid)) return False LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}") streamer = self.active_streamers[analyzer_uuid] streamer.stop() streamer.join() del self.active_streamers[analyzer_uuid] LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.") return True except Exception as e: LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e)) return False def close(self): """ Close the producer and cluster cleanly. """ if self.central_producer: try: self.central_producer.flush() LOGGER.info("Kafka producer flushed and closed.") except: LOGGER.exception("Error closing Kafka producer") if self.cluster: try: self.cluster.close() LOGGER.info("Dask cluster closed.") except: LOGGER.exception("Error closing Dask cluster") def stop(self): self.close() return super().stop()