From 88caa1cf6f141b8409c258fe685f192c646a9dea Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Sat, 7 Sep 2024 14:20:49 +0000
Subject: [PATCH] Changes in Analytics

**Backend:**
- Added a dictionary to keep track of running analyzer threads.
- Implemented logic to start and stop running analyzers.
- Added the `TerminateAnalyzerBackend` method to handle analyzer termination.

**Frontend:**
- Modified and invoked the `PublishStopRequestOnKafka` method.
---
 .../service/AnalyticsBackendService.py        | 54 +++++++++++++++----
 .../AnalyticsFrontendServiceServicerImpl.py   | 12 ++---
 2 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 4784ef051..f9fcf47ec 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -29,13 +29,14 @@ class AnalyticsBackendService(GenericGrpcService):
     Class listens for ...
     """
     def __init__(self, cls_name : str = __name__) -> None:
+        self.running_threads = {}   # To keep track of all running analyzers
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'analytics-frontend',
                                              'auto.offset.reset' : 'latest'})

-    def RunSparkStreamer(self, analyzer):
+    def RunSparkStreamer(self, analyzer_id, analyzer):
         kpi_list      = analyzer['input_kpis']
-        oper_list     = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]
+        oper_list     = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]  # TODO: update this line
         thresholds    = analyzer['thresholds']
         window_size   = analyzer['window_size']
         window_slider = analyzer['window_slider']
@@ -43,10 +44,19 @@ class AnalyticsBackendService(GenericGrpcService):
                      kpi_list, oper_list, thresholds, window_size, window_slider))
         LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
                      kpi_list, oper_list, thresholds, window_size, window_slider))
-        threading.Thread(target=SparkStreamer,
-                         args=(kpi_list, oper_list, thresholds, window_size, window_slider, None)
-                        ).start()
-        return True
+        try:
+            stop_event = threading.Event()
+            thread = threading.Thread(target=SparkStreamer,
+                                      args=(kpi_list, oper_list, thresholds, window_size, window_slider, None,
+                                            stop_event))
+            self.running_threads[analyzer_id] = (thread, stop_event)
+            thread.start()
+            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_id))
+            return True
+        except Exception as e:
+            print ("Failed to initiate Analyzer backend: {:}".format(e))
+            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
+            return False

     def RunRequestListener(self)->bool:
         threading.Thread(target=self.RequestListener).start()
@@ -69,8 +79,30 @@ class AnalyticsBackendService(GenericGrpcService):
                 print("Consumer error: {}".format(receive_msg.error()))
                 break
             analyzer      = json.loads(receive_msg.value().decode('utf-8'))
-            analyzer_id = receive_msg.key().decode('utf-8')
-            LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
-            print('Recevied Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
-            # TODO: Add active analyzer to list
-            self.RunSparkStreamer(analyzer)
+            analyzer_uuid = receive_msg.key().decode('utf-8')
+            LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+            print ('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+
+            if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
+                self.TerminateAnalyzerBackend(analyzer_uuid)
+            else:
+                self.RunSparkStreamer(analyzer_uuid, analyzer)
+
+    def TerminateAnalyzerBackend(self, analyzer_uuid):
+        if analyzer_uuid in self.running_threads:
+            try:
+                thread, stop_event = self.running_threads[analyzer_uuid]
+                stop_event.set()
+                thread.join()
+                del self.running_threads[analyzer_uuid]
+                print ("Terminated backend (by TerminateAnalyzerBackend): Analyzer Id: {:}".format(analyzer_uuid))
+                LOGGER.info("Terminated backend (by TerminateAnalyzerBackend): Analyzer Id: {:}".format(analyzer_uuid))
+                return True
+            except Exception as e:
+                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
+                return False
+        else:
+            print ("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+            # TODO: generate confirmation towards frontend
+
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index 2671bfb13..9c438761c 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -94,21 +94,21 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             self.db_obj.delete_db_row_by_id(
                 AnalyzerModel, "analyzer_id", analyzer_id_to_delete
             )
+            self.PublishStopRequestOnKafka(analyzer_id_to_delete)
         except Exception as e:
-            LOGGER.warning('Unable to delete analyzer. Error: {:}'.format(e))
-        self.PublishStopRequestOnKafka(request)
+            LOGGER.error('Unable to delete analyzer. Error: {:}'.format(e))
         return Empty()

-    def PublishStopRequestOnKafka(self, analyzer_id):
+    def PublishStopRequestOnKafka(self, analyzer_uuid):
         """
         Method to generate stop analyzer request on Kafka.
         """
-        analyzer_uuid = analyzer_id.analyzer_id.uuid
+        # analyzer_uuid = analyzer_id.analyzer_id.uuid
         analyzer_to_stop : Dict = {
-            "algo_name"   : -1,
+            "algo_name"   : None,
             "input_kpis"  : [],
             "output_kpis" : [],
-            "oper_mode"   : -1
+            "oper_mode"   : None
         }
         self.kafka_producer.produce(
             KafkaTopic.ANALYTICS_REQUEST.value,
-- 
GitLab
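
The shutdown path added here relies on `SparkStreamer` honoring the `stop_event` it now receives as an extra argument; that side of the contract is not shown in this patch. A minimal sketch of the expected cooperative-cancellation pattern, with `worker_loop` as a hypothetical stand-in for the streaming job:

```python
import threading
import time

def worker_loop(stop_event: threading.Event) -> None:
    # Hypothetical stand-in for SparkStreamer: check the event between
    # micro-batches and return promptly once it is set.
    while not stop_event.is_set():
        time.sleep(1)  # simulate processing one batch

stop_event = threading.Event()
thread = threading.Thread(target=worker_loop, args=(stop_event,))
thread.start()

stop_event.set()  # what TerminateAnalyzerBackend does
thread.join()     # returns once the loop observes the event
```

If the worker never polls the event, the `thread.join()` in `TerminateAnalyzerBackend` will never return.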
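
Because that `thread.join()` is called without a timeout, a worker that ignores the event would stall the `RequestListener` consumer loop. A defensive variant, under the assumption that a bounded grace period is acceptable; `terminate_with_grace` and the 10-second default are illustrative, not part of the patch:

```python
import logging
import threading

LOGGER = logging.getLogger(__name__)

def terminate_with_grace(thread: threading.Thread, stop_event: threading.Event,
                         analyzer_uuid: str, grace_s: float = 10.0) -> bool:
    # Signal the worker, then wait at most grace_s seconds instead of
    # blocking the caller indefinitely if the worker ignores the event.
    stop_event.set()
    thread.join(timeout=grace_s)
    if thread.is_alive():
        LOGGER.warning("Analyzer {:} did not stop within {:}s".format(analyzer_uuid, grace_s))
        return False
    return True
```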
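
On the frontend side, the stop request switches the sentinel values from `-1` to `None`, which JSON serializes as `null` and deserializes back to `None`, so the backend's `is None` checks on `algo_name` and `oper_mode` match exactly what the frontend produced. A quick round-trip check of that behavior:

```python
import json

# Same shape as the analyzer_to_stop dict built by PublishStopRequestOnKafka.
stop_msg = {"algo_name": None, "input_kpis": [], "output_kpis": [], "oper_mode": None}

payload = json.dumps(stop_msg).encode('utf-8')  # what the producer sends
decoded = json.loads(payload.decode('utf-8'))   # what RequestListener decodes

# The backend branch that calls TerminateAnalyzerBackend is taken:
assert decoded["algo_name"] is None and decoded["oper_mode"] is None
```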