diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 508feecea1a9004268553654d26fcd38cd883b23..eab275324d9f2cf87b9cf839eeebf446faba1506 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
 import json
 import logging
 import threading
@@ -26,11 +27,11 @@ from common.Settings import get_service_port_grpc
 from threading import Thread, Event
 from analytics.backend.service.Streamer import DaskStreamer
 from common.proto.analytics_frontend_pb2 import Analyzer
-from apscheduler.schedulers.background import BackgroundScheduler
 from datetime import datetime, timedelta
 
 LOGGER = logging.getLogger(__name__)
+logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s')
 
 
 class AnalyticsBackendService(GenericGrpcService):
     """
@@ -40,9 +41,7 @@ class AnalyticsBackendService(GenericGrpcService):
         LOGGER.info('Init AnalyticsBackendService')
         port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
         super().__init__(port, cls_name=cls_name)
-        self.schedular = BackgroundScheduler(daemon=True)
-        self.schedular.start()
-        self.running_threads = {}   # To keep track of all running analyzers
+        self.active_streamers = {}
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'analytics-frontend',
                                              'auto.offset.reset' : 'latest'})
@@ -75,7 +74,7 @@ class AnalyticsBackendService(GenericGrpcService):
             try:
                 analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                 analyzer_uuid = receive_msg.key().decode('utf-8')
-                LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+                LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
                 # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
 
                 if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
@@ -90,6 +89,9 @@ class AnalyticsBackendService(GenericGrpcService):
         """
         Start the DaskStreamer with the given parameters.
         """
+        if analyzer_uuid in self.active_streamers:
+            LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid))
+            return False
         try:
             streamer = DaskStreamer(
                 analyzer_uuid,
@@ -99,13 +101,20 @@
                 analyzer['batch_size' ],
                 analyzer['window_size'],
             )
-            self.schedular.add_job(
-                streamer.run,
-                'date',
-                run_date=datetime.now(pytz.utc),
-                id=analyzer_uuid,
-                replace_existing=True
-            )
+            streamer.start()
+            LOGGER.info(f"Streamer started with analyzer_uuid: {analyzer_uuid}")
+
+            # Stop the streamer after the given duration
+            if analyzer['duration'] is not None:
+                def stop_after_duration():
+                    time.sleep(analyzer['duration'])
+                    LOGGER.info(f"Stopping streamer with analyzer_uuid: {analyzer_uuid}")
+                    streamer.stop()
+
+                duration_thread = threading.Thread(target=stop_after_duration, daemon=True)
+                duration_thread.start()
+
+            self.active_streamers[analyzer_uuid] = streamer
             LOGGER.info("Dask Streamer started.")
             return True
         except Exception as e:
@@ -117,13 +126,15 @@
         Stop the DaskStreamer with the given analyzer_uuid.
         """
         try:
-            active_jobs = self.schedular.get_jobs()
-            logger.debug("Active Jobs: {:}".format(active_jobs))
-            if analyzer_uuid not in [job.id for job in active_jobs]:
+            if analyzer_uuid not in self.active_streamers:
                 LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
                 return False
-            self.schedular.remove_job(analyzer_uuid)
-            LOGGER.info("Dask Streamer stopped.")
+            LOGGER.info(f"Stopping streamer with key: {analyzer_uuid}")
+            streamer = self.active_streamers[analyzer_uuid]
+            streamer.stop()
+            streamer.join()
+            del self.active_streamers[analyzer_uuid]
+            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been stopped.")
             return True
         except Exception as e:
             LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
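Note: the hunks above replace the APScheduler job registry with a plain dict of running streamer threads and enforce the analyzer duration with a detached sleeper thread. As written, a streamer that stops because its duration expired stays registered in active_streamers, so a later start request for the same analyzer_uuid is rejected as already running until an explicit stop arrives. Below is a minimal sketch of an auto-stop that also deregisters the entry, using threading.Timer; the helper name schedule_auto_stop and the registry argument are illustrative, not part of this patch:

    import threading

    def schedule_auto_stop(registry: dict, analyzer_uuid: str, duration: float) -> threading.Timer:
        """Stop and deregister the streamer stored under analyzer_uuid after `duration` seconds."""
        def _stop():
            streamer = registry.pop(analyzer_uuid, None)  # deregister first so the uuid can be reused
            if streamer is not None:
                streamer.stop()    # ask the run() loop to exit and release Kafka/Dask resources
                streamer.join()    # wait for the streamer thread to terminate
        timer = threading.Timer(duration, _stop)
        timer.daemon = True        # never block process shutdown on a pending timer
        timer.start()
        return timer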
""" try: - active_jobs = self.schedular.get_jobs() - logger.debug("Active Jobs: {:}".format(active_jobs)) - if analyzer_uuid not in [job.id for job in active_jobs]: + if analyzer_uuid not in self.active_streamers: LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid)) return False - self.schedular.remove_job(analyzer_uuid) - LOGGER.info("Dask Streamer stopped.") + LOGGER.info(f"Stopping streamer with key: {analyzer_uuid}") + streamer = self.active_streamers[analyzer_uuid] + streamer.stop() + streamer.join() + del self.active_streamers[analyzer_uuid] + LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been stopped.") return True except Exception as e: LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e)) diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py index f407de2a0deca61a76b459e4aa0a0a9eaab4f196..0e23bab69bd5e1136ca6c0ce9fee321632160bcd 100644 --- a/src/analytics/backend/service/AnalyzerHandlers.py +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -119,9 +119,13 @@ def aggregation_handler( agg_df['kpi_id'] = output_kpi_list[kpi_index] + # if agg_df.empty: + # logger.warning(f"No data available for KPI: {kpi_id}. Skipping threshold application.") + # continue + # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}") result = threshold_handler(key, agg_df, kpi_task_parameters) return result.to_dict(orient='records') else: - logger.debug(f"No data available for KPI: {kpi_id}. Skipping aggregation.") + logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.") continue diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index cabed85880254462d6b9760e6d17336963fe6909..35d35b36e622724344a233da33f86a588b04aad0 100644 --- a/src/analytics/backend/service/Streamer.py +++ b/src/analytics/backend/service/Streamer.py @@ -16,20 +16,19 @@ import logging import time import json from confluent_kafka import KafkaException, KafkaError -# import pandas as pd from common.tools.kafka.Variables import KafkaTopic from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler from .AnalyzerHelper import AnalyzerHelper +import threading - -logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') -class DaskStreamer: +class DaskStreamer(threading.Thread): def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5, - window_size=None, n_workers=5, threads_per_worker=2): + window_size=None, n_workers=1, threads_per_worker=1): + super().__init__() self.key = key self.input_kpis = input_kpis self.output_kpis = output_kpis @@ -96,8 +95,6 @@ class DaskStreamer: logger.exception(f"Error in Dask streaming process: {e}") finally: logger.info(">>> Exiting Dask Streamer...") - self.cleanup() - logger.info(">>> Dask Streamer Cleanup Completed.") def task_handler_selector(self): """Select the task handler based on the task type.""" @@ -126,7 +123,7 @@ class DaskStreamer: self.producer.flush() logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") - def cleanup(self): + def stop(self): """Clean up Kafka and Dask resources.""" logger.info("Shutting down resources...") self.running = False @@ -144,16 +141,18 @@ class DaskStreamer: except Exception as e: logger.error(f"Error closing Kafka producer: {e}") - if self.client and hasattr(self.client, 'status') and 
diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py
index 040fbb4686aa9e1fd36de5947653c51c3373bb17..6a303d474071928456b7f152f78ec20e25bd9ff3 100644
--- a/src/analytics/backend/tests/messages_analyzer.py
+++ b/src/analytics/backend/tests/messages_analyzer.py
@@ -32,7 +32,7 @@ def get_thresholds():
     }
 
 def get_duration():
-    return 30
+    return 60
 
 def get_windows_size():
     return None
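Note: the test message duration doubles from 30 to 60 seconds, giving the backend tests more headroom before the duration-based auto-stop fires. A pytest-style check of the combined behaviour, reusing the illustrative schedule_auto_stop and StreamerSkeleton sketches above (neither is repository code):

    def test_streamer_stops_after_duration():
        streamer = StreamerSkeleton()
        streamer.start()
        registry = {"analyzer-uuid-1": streamer}
        schedule_auto_stop(registry, "analyzer-uuid-1", duration=0.2)
        streamer.join(timeout=2)                  # the timer should end the thread well within 2s
        assert not streamer.is_alive()
        assert "analyzer-uuid-1" not in registry  # deregistered by the timer callback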