diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 4784ef051dfe8853d42792d4163f33af0ae6fd8c..f9fcf47ec6c704308d49ff03483a44e096c68320 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -29,13 +29,14 @@ class AnalyticsBackendService(GenericGrpcService): Class listens for ... """ def __init__(self, cls_name : str = __name__) -> None: + self.running_threads = {} # To keep track of all running analyzers self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'analytics-frontend', 'auto.offset.reset' : 'latest'}) - def RunSparkStreamer(self, analyzer): + def RunSparkStreamer(self, analyzer_id, analyzer): kpi_list = analyzer['input_kpis'] - oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] + oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line... thresholds = analyzer['thresholds'] window_size = analyzer['window_size'] window_slider = analyzer['window_slider'] @@ -43,10 +44,19 @@ class AnalyticsBackendService(GenericGrpcService): kpi_list, oper_list, thresholds, window_size, window_slider)) LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( kpi_list, oper_list, thresholds, window_size, window_slider)) - threading.Thread(target=SparkStreamer, - args=(kpi_list, oper_list, thresholds, window_size, window_slider, None) - ).start() - return True + try: + stop_event = threading.Event() + thread = threading.Thread(target=SparkStreamer, + args=(kpi_list, oper_list, thresholds, window_size, window_slider, None, + stop_event)) + self.running_threads[analyzer_id] = (thread, stop_event) + # thread.start() + LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_id)) + return True + except Exception as e: + print ("Failed to initiate Analyzer backend: {:}".format(e)) + LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) + return False def RunRequestListener(self)->bool: threading.Thread(target=self.RequestListener).start() @@ -69,8 +79,30 @@ class AnalyticsBackendService(GenericGrpcService): print("Consumer error: {}".format(receive_msg.error())) break analyzer = json.loads(receive_msg.value().decode('utf-8')) - analyzer_id = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_id, analyzer)) - print('Recevied Analyzer: {:} - {:}'.format(analyzer_id, analyzer)) - # TODO: Add active analyzer to list - self.RunSparkStreamer(analyzer) + analyzer_uuid = receive_msg.key().decode('utf-8') + LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + + if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: + self.TerminateAnalyzerBackend(analyzer_uuid) + else: + self.RunSparkStreamer(analyzer_uuid, analyzer) + + def TerminateAnalyzerBackend(self, analyzer_uuid): + if analyzer_uuid in self.running_threads: + try: + thread, stop_event = self.running_threads[analyzer_uuid] + stop_event.set() + thread.join() + del self.running_threads[analyzer_uuid] + print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) + LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) + return True + except Exception as e: + LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: ".format(analyzer_uuid, e)) + return False + else: + print ("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) + # LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) + # generate confirmation towards frontend + diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 2671bfb13532ac547d615755d2d8cce54e1859d1..9c438761c295405a4f4459700aa3056b2f0a1ecc 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -94,21 +94,21 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): self.db_obj.delete_db_row_by_id( AnalyzerModel, "analyzer_id", analyzer_id_to_delete ) + self.PublishStopRequestOnKafka(analyzer_id_to_delete) except Exception as e: - LOGGER.warning('Unable to delete analyzer. Error: {:}'.format(e)) - self.PublishStopRequestOnKafka(request) + LOGGER.error('Unable to delete analyzer. Error: {:}'.format(e)) return Empty() - def PublishStopRequestOnKafka(self, analyzer_id): + def PublishStopRequestOnKafka(self, analyzer_uuid): """ Method to generate stop analyzer request on Kafka. """ - analyzer_uuid = analyzer_id.analyzer_id.uuid + # analyzer_uuid = analyzer_id.analyzer_id.uuid analyzer_to_stop : Dict = { - "algo_name" : -1, + "algo_name" : None, "input_kpis" : [], "output_kpis" : [], - "oper_mode" : -1 + "oper_mode" : None } self.kafka_producer.produce( KafkaTopic.ANALYTICS_REQUEST.value,