diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index f676b2c5085ff3713c637efffa8d0dcfe7a37f39..10f1f75ea6e571548492272862890962ce5be9cd 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -104,11 +104,12 @@ class AnalyticsBackendService(GenericGrpcService):
         try:
             streamer = DaskStreamer(
                 key               = analyzer_uuid,
-                input_kpis        = analyzer['input_kpis' ],
-                output_kpis       = analyzer['output_kpis'],
-                thresholds        = analyzer['thresholds' ],
-                batch_size        = analyzer['batch_size' ],
-                window_size       = analyzer['window_size'],
+                input_kpis        = analyzer['input_kpis'        ],
+                output_kpis       = analyzer['output_kpis'       ],
+                thresholds        = analyzer['thresholds'        ],
+                batch_size        = analyzer['batch_size_min'    ],
+                batch_duration    = analyzer['batch_duration_min'],
+                window_size       = analyzer['window_size'       ],
                 cluster_instance  = self.cluster,
                 producer_instance = self.central_producer,
             )
@@ -119,7 +120,7 @@ class AnalyticsBackendService(GenericGrpcService):
            if analyzer['duration'] > 0:
                def stop_after_duration():
                    time.sleep(analyzer['duration'])
-                   LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}")
+                   LOGGER.warning(f"Execution duration ({analyzer['duration']}) completed of Analyzer: {analyzer_uuid}")
                    if not self.StopStreamer(analyzer_uuid):
                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")
diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py
index a05b1a0b772d1d6910e7487c8260fba894aba7e0..256530ba78329f09327b551f0238f4dd8a1258b5 100644
--- a/src/analytics/backend/service/AnalyzerHandlers.py
+++ b/src/analytics/backend/service/AnalyzerHandlers.py
@@ -95,6 +95,8 @@ def aggregation_handler(
        "sum"  : ('kpi_value', 'sum'),
     }
 
+    results = []
+
     # Process each KPI-specific task parameter
     for kpi_index, kpi_id in enumerate(input_kpi_list):
@@ -122,10 +124,13 @@ def aggregation_handler(
            agg_df['kpi_id'] = output_kpi_list[kpi_index]
 
            # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
-           result = threshold_handler(key, agg_df, kpi_task_parameters)
+           record = threshold_handler(key, agg_df, kpi_task_parameters)
 
-           return result.to_dict(orient='records')
+           results.extend(record.to_dict(orient='records'))
        else:
            logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
            continue
-   return []
+   if results:
+       return results
+   else:
+       return []
diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py
index c72359db2e5de7f9165442deb9d52bfde415b289..e1eaffc49b43c7849f4a1d399d22246dc05609fd 100644
--- a/src/analytics/backend/service/Streamer.py
+++ b/src/analytics/backend/service/Streamer.py
@@ -29,19 +29,21 @@ logger = logging.getLogger(__name__)
 
 class DaskStreamer(threading.Thread):
     def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size = 5,
+                 batch_duration    = None,
                  window_size       = None,
                  cluster_instance  = None,
                  producer_instance = AnalyzerHelper.initialize_kafka_producer()
                  ):
         super().__init__()
-        self.key         = key
-        self.input_kpis  = input_kpis
-        self.output_kpis = output_kpis
-        self.thresholds  = thresholds
-        self.window_size = window_size
-        self.batch_size  = batch_size
-        self.running     = True
-        self.batch       = []
+        self.key            = key
+        self.input_kpis     = input_kpis
+        self.output_kpis    = output_kpis
+        self.thresholds     = thresholds
+        self.window_size    = window_size       # TODO: Not implemented
+        self.batch_size     = batch_size
+        self.batch_duration = batch_duration
+        self.running        = True
+        self.batch          = []
 
         # Initialize Kafka and Dask components
         self.client = AnalyzerHelper.initialize_dask_client(cluster_instance)
@@ -65,7 +67,7 @@ class DaskStreamer(threading.Thread):
                if not self.client:
                    logger.warning("Dask client is not running. Exiting loop.")
                    break
-               message = self.consumer.poll(timeout=2.0)
+               message = self.consumer.poll(timeout=1.0)
                if message is None:
                    # logger.info("No new messages received.")
                    continue
@@ -83,7 +85,7 @@ class DaskStreamer(threading.Thread):
                self.batch.append(value)
 
                # Window size has a precedence over batch size
-               if self.window_size is None:
+               if self.batch_duration is None:
                    if len(self.batch) >= self.batch_size:  # If batch size is not provided, process continue with the default batch size
                        logger.info(f"Processing based on batch size {self.batch_size}.")
                        self.task_handler_selector()
@@ -91,8 +93,8 @@ class DaskStreamer(threading.Thread):
                else:
                    # Process based on window size
                    current_time = time.time()
-                   if (current_time - last_batch_time) >= self.window_size and self.batch:
-                       logger.info(f"Processing based on window size {self.window_size}.")
+                   if (current_time - last_batch_time) >= self.batch_duration and self.batch:
+                       logger.info(f"Processing based on window size {self.batch_duration}.")
                        self.task_handler_selector()
                        self.batch = []
                        last_batch_time = current_time
diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py
index 4a119d948864d74fc650983e96406fedd3b73e6a..11210ded12cc993041c8f872b147213f70eb67a0 100644
--- a/src/analytics/backend/tests/messages_analyzer.py
+++ b/src/analytics/backend/tests/messages_analyzer.py
@@ -32,13 +32,13 @@ def get_thresholds():
     }
 
 def get_duration():
-    return 40
+    return 90
 
 def get_windows_size():
     return None
 
 def get_batch_size():
-    return 10
+    return 5
 
 def get_interval():
     return 5
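
Note for reviewers: the patch changes the flush policy in DaskStreamer.run() so that batch_duration, when set, takes precedence over batch_size. The standalone Python sketch below summarises that policy under stated assumptions; the names batching_loop, poll, flush_batch, and should_run are illustrative stand-ins for the Kafka consumer, the task handler, and the streamer's running flag, and are not part of the patched code.

# Minimal sketch of the flush policy introduced in this patch (assumed helper names).
import time

def batching_loop(poll, flush_batch, should_run, batch_size=5, batch_duration=None):
    batch = []
    last_batch_time = time.time()
    while should_run():
        value = poll(timeout=1.0)          # mirrors consumer.poll(timeout=1.0)
        if value is None:
            continue
        batch.append(value)
        if batch_duration is None:
            # No duration configured: flush once batch_size messages are collected.
            if len(batch) >= batch_size:
                flush_batch(batch)
                batch = []
        else:
            # Duration configured: it takes precedence; flush every batch_duration seconds.
            now = time.time()
            if (now - last_batch_time) >= batch_duration and batch:
                flush_batch(batch)
                batch = []
                last_batch_time = now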