From dca3cbefb5cc7fa1ddea2c4ad9a87ec52295cfe9 Mon Sep 17 00:00:00 2001
From: "Georgios P. Katsikas"
Date: Tue, 24 Feb 2026 12:44:56 +0000
Subject: [PATCH 1/3] fix: analytics handlers invoked correctly

---
 proto/analytics_frontend.proto                |   2 +-
 .../service/AnalyticsBackendService.py        |  43 ++--
 .../backend/service/AnalyzerHandlers.py       | 207 ++++++++++--------
 src/analytics/backend/service/Streamer.py     |  55 ++++-
 src/common/tools/kafka/Variables.py           |   8 +-
 .../collectors/int_collector/INTCollector.py  |   4 +-
 6 files changed, 186 insertions(+), 133 deletions(-)

diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto
index 5a3980ff5..b1bdfac0a 100644
--- a/proto/analytics_frontend.proto
+++ b/proto/analytics_frontend.proto
@@ -39,7 +39,7 @@ enum AnalyzerOperationMode {
 message Analyzer {
   AnalyzerId analyzer_id = 1;
   string algorithm_name = 2; // The algorithm to be executed
-  float duration_s = 3; // Termiate the data analytics thread after duration (seconds); 0 = infinity time
+  float duration_s = 3; // Terminate the data analytics thread after duration (seconds); 0 = infinity time
   repeated kpi_manager.KpiId input_kpi_ids = 4; // The KPI Ids to be processed by the analyzer
   repeated kpi_manager.KpiId output_kpi_ids = 5; // The KPI Ids produced by the analyzer
   AnalyzerOperationMode operation_mode = 6; // Operation mode of the analyzer

diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index e0c0b0d2b..6d4f64bef 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -26,7 +26,6 @@ from common.Settings import get_service_port_grpc
 from analytics.backend.service.Streamer import DaskStreamer
 from analytics.backend.service.AnalyzerHelper import AnalyzerHelper
-
 LOGGER = logging.getLogger(__name__)
 
 class AnalyticsBackendService(GenericGrpcService):
@@ -69,29 +68,29 @@ class AnalyticsBackendService(GenericGrpcService):
                 if message is None:
                     continue
                 elif message.error():
-                        if message.error().code() == KafkaError._PARTITION_EOF:
-                            LOGGER.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
-                            break
-                        elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
-                            LOGGER.error(f"Subscribed topic {message.topic()} does not exist. May be topic does not have any messages.")
-                            continue
-                        elif message.error():
-                            raise KafkaException(message.error())
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        LOGGER.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
+                        break
+                    elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
+                        LOGGER.error(f"Subscribed topic {message.topic()} does not exist. The topic may not have any messages yet.")
+                        continue
+                    elif message.error():
+                        raise KafkaException(message.error())
                 try:
-                    analyzer = json.loads(message.value().decode('utf-8'))
+                    analyzer      = json.loads(message.value().decode('utf-8'))
                     analyzer_uuid = message.key().decode('utf-8')
-                    LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+                    LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
                     if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                         if self.StopStreamer(analyzer_uuid):
-                            LOGGER.info("Dask Streamer stopped.")
+                            LOGGER.info("Dask Streamer stopped")
                         else:
                             LOGGER.warning("Failed to stop Dask Streamer. May be already terminated...")
                     else:
                         if self.StartStreamer(analyzer_uuid, analyzer):
-                            LOGGER.info("Dask Streamer started.")
+                            LOGGER.info("Dask Streamer started")
                         else:
-                            LOGGER.warning("Failed to start Dask Streamer.")
+                            LOGGER.warning("Failed to start Dask Streamer")
             except Exception as e:
                 LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
@@ -103,6 +102,7 @@ class AnalyticsBackendService(GenericGrpcService):
         if analyzer_uuid in self.active_streamers:
             LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid))
             return False
+        LOGGER.info(f"Start Streamer for Analyzer:\n{analyzer}")
         try:
             streamer = DaskStreamer(
                 key = analyzer_uuid,
@@ -116,7 +116,7 @@ class AnalyticsBackendService(GenericGrpcService):
                 producer_instance = self.central_producer,
             )
             streamer.start()
-            LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
+            LOGGER.info(f"Streamer started with analyzer ID: {analyzer_uuid}")
 
             # Stop the streamer after the given duration
             duration = analyzer['duration']
@@ -125,11 +125,10 @@ class AnalyticsBackendService(GenericGrpcService):
                     time.sleep(duration)
                     LOGGER.warning(f"Execution duration ({duration}) completed of Analyzer: {analyzer_uuid}")
                     if not self.StopStreamer(analyzer_uuid):
-                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")
+                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may already be terminated")
 
                 duration_thread = threading.Thread(
-                    target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}"
-                )
+                    target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}")
                 duration_thread.start()
 
             self.active_streamers[analyzer_uuid] = streamer
@@ -151,10 +150,10 @@ class AnalyticsBackendService(GenericGrpcService):
             streamer.stop()
             streamer.join()
             del self.active_streamers[analyzer_uuid]
-            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.")
+            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been successfully terminated")
             return True
         except:
-            LOGGER.exception("Failed to stop Dask Streamer.")
+            LOGGER.exception("Failed to stop Dask Streamer")
             return False
 
     def close(self):
@@ -164,13 +163,13 @@ class AnalyticsBackendService(GenericGrpcService):
         if self.central_producer:
             try:
                 self.central_producer.flush()
-                LOGGER.info("Kafka producer flushed and closed.")
+                LOGGER.info("Kafka producer flushed and closed")
             except:
                 LOGGER.exception("Error closing Kafka producer")
         if self.cluster:
             try:
                 self.cluster.close()
-                LOGGER.info("Dask cluster closed.")
+                LOGGER.info("Dask cluster closed")
             except:
                 LOGGER.exception("Error closing Dask cluster")
 
diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py
index 894f37423..03e5260a0 100644
--- a/src/analytics/backend/service/AnalyzerHandlers.py
+++ b/src/analytics/backend/service/AnalyzerHandlers.py
@@ -13,16 +13,15 @@
 # limitations under the License.
 import logging
-from enum import Enum
 import pandas as pd
+from enum import Enum
 from collections import defaultdict
 
 logger = logging.getLogger(__name__)
-
 class Handlers(Enum):
     AGGREGATION_HANDLER = "AggregationHandler"
-    AGGREGATION_HANDLER_THREE_TO_ONE = "AggregationHandlerThreeToOne"
+    AGGREGATION_HANDLER_MANY_TO_ONE = "AggregationHandlerManyToOne"
     UNSUPPORTED_HANDLER = "UnsupportedHandler"
 
     @classmethod
@@ -30,23 +29,30 @@ class Handlers(Enum):
         return handler_name in cls._value2member_map_
 
 def select_handler(handler_name):
-    if handler_name == "AggregationHandler":
-        return aggregation_handler
-    elif handler_name == "AggregationHandlerThreeToOne":
-        return aggregation_handler_three_to_one
-    else:
-        return "UnsupportedHandler"
+    try:
+        logger.info(f"Aggregation handler: {handler_name}")
+        handler_enum = Handlers(handler_name) # auto-validates
+        return HANDLER_FUNCTIONS[handler_enum]
+    except (ValueError, KeyError):
+        logger.error("Unsupported handler")
+        raise ValueError(f"Unsupported handler: {handler_name}")
+
+def transform_data(record : pd.DataFrame, value_key : str) -> pd.DataFrame:
+    new_value_key = 'value'
+    return record.rename(columns={value_key: new_value_key})
+
+def find(data, type, value):
+    return next((item for item in data if item[type] == value), None)
 
-# This method is top-level and should not be part of the class due to serialization issues.
 def threshold_handler(key, aggregated_df, thresholds):
     """
-    Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
+    Apply thresholds (value_threshold_low and value_threshold_high) based on the thresholds dictionary
     on the aggregated DataFrame.
 
     Args:
         key (str): Key for the aggregated DataFrame.
         aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
-        thresholds (dict): Thresholds dictionary with keys in the format '' and values as (fail_th, raise_th).
+        thresholds (dict): Thresholds dictionary with keys in the format '' and values as (value_threshold_low, value_threshold_high).
 
     Returns:
         pd.DataFrame: DataFrame with additional threshold columns.
@@ -56,105 +62,123 @@ def threshold_handler(key, aggregated_df, thresholds):
         if metric_name not in aggregated_df.columns:
             logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.")
             continue
-
+
+        logger.info(f"[Threshold] Metric: {metric_name}")
+        logger.info(f"[Threshold] Value range: {threshold_values}")
+
         # Ensure the threshold values are valid (check for tuple specifically)
         if isinstance(threshold_values, list) and len(threshold_values) == 2:
-            fail_th, raise_th = threshold_values
+            fall_th, raise_th = threshold_values
 
             # Add threshold columns with updated naming
-            aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
-            aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th
+            aggregated_df["value_threshold_low"] = aggregated_df[metric_name] < fall_th
+            aggregated_df["value_threshold_high"] = aggregated_df[metric_name] > raise_th
         else:
             logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.")
 
+    logger.info(f"[AggregatedDF]: {aggregated_df}")
     return aggregated_df
 
-def aggregation_handler(
-        batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
-    ):
+def aggregation_handler(key, batch, input_kpi_list, output_kpi_list, thresholds):
     """
     Process a batch of data and calculate aggregated values for each input KPI
-    and maps them to the output KPIs. """
+    and map them to the output KPIs.
+    """
 
-    logger.info(f"({batch_type_name}) Processing batch for key: {key}")
+    logger.info("AggregationHandler starts")
     if not batch:
-        logger.info("Empty batch received. Skipping processing.")
+        logger.warning("Empty batch received. Skipping processing")
+        return []
+
+    logger.info(f"Processing {len(batch)} records for key: {key}")
+
+    # Convert data into a DataFrame
+    df = pd.DataFrame(batch)
+
+    # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only)
+    df = df[df['kpi_id'].isin(input_kpi_list)].copy()
+
+    if df.empty:
+        logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing")
        return []
-    else:
-        logger.info(f" >>>>> Processing {len(batch)} records for key: {key}")
-
-        # Convert data into a DataFrame
-        df = pd.DataFrame(batch)
-
-        # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only)
-        df = df[df['kpi_id'].isin(input_kpi_list)].copy()
-
-        if df.empty:
-            logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.")
-            return []
-
-        # Define all possible aggregation methods
-        aggregation_methods = {
-            "min"     : ('kpi_value', 'min'),
-            "max"     : ('kpi_value', 'max'),
-            "avg"     : ('kpi_value', 'mean'),
-            "first"   : ('kpi_value', lambda x: x.iloc[0]),
-            "last"    : ('kpi_value', lambda x: x.iloc[-1]),
-            "variance": ('kpi_value', 'var'),
-            "count"   : ('kpi_value', 'count'),
-            "range"   : ('kpi_value', lambda x: x.max() - x.min()),
-            "sum"     : ('kpi_value', 'sum'),
-        }
-
-        results = []
-
-        # Process each KPI-specific task parameter
-        for kpi_index, kpi_id in enumerate(input_kpi_list):
-
-            # logger.info(f"1.Processing KPI: {kpi_id}")
-            kpi_task_parameters = thresholds["task_parameter"][kpi_index]
-
-            # Get valid task parameters for this KPI
-            valid_task_parameters = [
-                method for method in kpi_task_parameters.keys()
-                if method in aggregation_methods
-            ]
-            # Select the aggregation methods based on valid task parameters
-            selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
+    # Define all possible aggregation methods
+    aggregation_methods = {
+        "min"     : ('kpi_value', 'min'),
+        "max"     : ('kpi_value', 'max'),
+        "avg"     : ('kpi_value', 'mean'),
+        "first"   : ('kpi_value', lambda x: x.iloc[0]),
+        "last"    : ('kpi_value', lambda x: x.iloc[-1]),
+        "variance": ('kpi_value', 'var'),
+        "count"   : ('kpi_value', 'count'),
+        "range"   : ('kpi_value', lambda x: x.max() - x.min()),
+        "sum"     : ('kpi_value', 'sum'),
+    }
+
+    results = []
+
+    # Process each KPI-specific task parameter
+    for kpi_index, kpi_id in enumerate(input_kpi_list):
+        logger.debug(f"Processing KPI: {kpi_id}")
+        kpi_task_parameters = thresholds["task_parameter"][kpi_index]
+        logger.debug(f"KPI task parameters: {kpi_task_parameters}")
 
-            # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}")
-            kpi_df = df[df['kpi_id'] == kpi_id]
+        # Get valid task parameters for this KPI
+        valid_task_parameters = [
+            method for method in kpi_task_parameters.keys()
+            if method in aggregation_methods
+        ]
 
-            # Check if kpi_df is not empty before applying the aggregation methods
-            if not kpi_df.empty:
-                agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index()
+        # Select the aggregation methods based on valid task parameters
+        selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
+        logger.debug(f"Processing methods: {selected_methods}")
 
-                # logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}")
+        kpi_df = df[df['kpi_id'] == kpi_id]
+        logger.debug(f"KPI data frame:\n{kpi_df}")
 
-                agg_df['kpi_id'] = output_kpi_list[kpi_index]
+        # Check if kpi_df is not empty before applying the aggregation methods
+        if not kpi_df.empty:
+            agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index()
+            logger.debug(f"Aggregated DataFrame for KPI {kpi_id}:\n{agg_df}")
 
-                # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
-                record = threshold_handler(key, agg_df, kpi_task_parameters)
+            agg_df['kpi_id'] = output_kpi_list[kpi_index]
+            record = threshold_handler(key, agg_df, kpi_task_parameters)
 
-                results.extend(record.to_dict(orient='records'))
-            else:
-                logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
-                continue
-        if results:
-            return results
+            # Make the data frame agnostic to the aggregation method
+            value_key = list(selected_methods.keys())[0]
+            upd_record = transform_data(record, value_key)
+
+            # Store the record
+            results.extend(upd_record.to_dict(orient='records'))
         else:
-            return []
+            logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation")
+            continue
 
-def find(data , type , value):
-    return next((item for item in data if item[type] == value), None)
+    if results:
+        logger.info(f"Aggregation result: {results}")
+        return results
+    else:
+        return []
+
+def aggregation_handler_many_to_one(key, batch, input_kpi_list, output_kpi_list, thresholds):
+    logger.info("AggregationHandlerManyToOne starts")
+    if not batch:
+        logger.warning("Empty batch received. Skipping processing.")
+        return []
 
-def aggregation_handler_three_to_one(
-    batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
-):
+    logger.info(f"Processing {len(batch)} records for key: {key}")
 
+    kpi_task_parameters = None
+    for kpi_index, kpi_id in enumerate(input_kpi_list):
+        logger.debug(f"Processing KPI: {kpi_id}")
+        kpi_task_parameters = thresholds["task_parameter"][kpi_index]
+        logger.debug(f"KPI task parameters: {kpi_task_parameters}")
+
+    threshold_high, threshold_low = None, None
+    for _, threshold_values in kpi_task_parameters.items():
+        if isinstance(threshold_values, list) and len(threshold_values) == 2:
+            threshold_low, threshold_high = threshold_values
 
     # Group and sum
-    # Track sum and count
     sum_dict = defaultdict(int)
     count_dict = defaultdict(int)
 
@@ -173,13 +197,18 @@
 
     result = {
         "kpi_id": output_kpi_list[0],
-        "avg": total_kpi_metric,
-        "THRESHOLD_RAISE": bool(total_kpi_metric > 2600),
-        "THRESHOLD_FALL": bool(total_kpi_metric < 699)
+        "value": total_kpi_metric,
+        "value_threshold_high": bool(total_kpi_metric > threshold_high),
+        "value_threshold_low": bool(total_kpi_metric < threshold_low)
     }
     results = []
     results.append(result)
 
-    logger.warning(f"result : {result}.")
+    logger.info(f"Aggregation result: {result}")
 
     return results
+
+HANDLER_FUNCTIONS = {
+    Handlers.AGGREGATION_HANDLER: aggregation_handler,
+    Handlers.AGGREGATION_HANDLER_MANY_TO_ONE: aggregation_handler_many_to_one
+}
diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py
index ff0b10ef5..1eba74976 100644
--- a/src/analytics/backend/service/Streamer.py
+++ b/src/analytics/backend/service/Streamer.py
@@ -19,13 +19,11 @@ import logging
 from confluent_kafka import KafkaException, KafkaError
 from common.tools.kafka.Variables import KafkaTopic
-from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one , select_handler
+from analytics.backend.service.AnalyzerHandlers import Handlers, select_handler, aggregation_handler
 from analytics.backend.service.AnalyzerHelper import AnalyzerHelper
-
 logger = logging.getLogger(__name__)
-
 class DaskStreamer(threading.Thread):
     def __init__(self, key, input_kpis, output_kpis, thresholds,
                  batch_size = 5,
@@ -45,6 +43,14 @@ class DaskStreamer(threading.Thread):
         self.running = True
         self.batch = []
 
+        logger.info(f"Dask Streamer key: {self.key}")
+        logger.info(f"Dask Streamer input KPIs: {self.input_kpis}")
+        logger.info(f"Dask Streamer output KPIs: {self.output_kpis}")
+        logger.info(f"Dask Streamer thresholds: {self.thresholds}")
+        logger.info(f"Dask Streamer window size: {self.window_size}")
+        logger.info(f"Dask Streamer batch size: {self.batch_size}")
+        logger.info(f"Dask Streamer batch duration: {self.batch_duration}")
+
         # Initialize Kafka and Dask components
         self.client = AnalyzerHelper.initialize_dask_client(cluster_instance)
         self.consumer = AnalyzerHelper.initialize_kafka_consumer() # Single-threaded consumer
@@ -55,7 +61,7 @@ class DaskStreamer(threading.Thread):
     def run(self):
         """Main method to start the DaskStreamer."""
         try:
-            logger.info("Starting Dask Streamer")
+            logger.info("Dask Streamer started")
             last_batch_time = time.time()
             while True:
                 if not self.consumer:
@@ -69,7 +75,7 @@ class DaskStreamer(threading.Thread):
                         break
                     message = self.consumer.poll(timeout=1.0)
                     if message is None:
-                        # logger.info("No new messages received.")
+                        logger.debug("No new messages received.")
                         continue
                     if message.error():
                         if message.error().code() == KafkaError._PARTITION_EOF:
@@ -85,7 +91,12 @@ class DaskStreamer(threading.Thread):
                     except json.JSONDecodeError:
                         logger.error(f"Failed to decode message: {message.value()}")
                         continue
-                    self.batch.append(value)
+                    # This streamer is only meant to serve a list of input KPIs
+                    if value["kpi_id"] in self.input_kpis:
+                        self.batch.append(value)
+                    # Ignore the rest.
+                    else:
+                        continue
 
                 # Window size has a precedence over batch size
                 if self.batch_duration is None:
@@ -106,18 +117,34 @@ class DaskStreamer(threading.Thread):
             logger.exception(f"Error in Dask streaming process: {e}")
         finally:
             self.stop()
-            logger.info(">>> Exiting Dask Streamer...")
+            logger.info("Exiting Dask Streamer...")
 
     def task_handler_selector(self):
         """Select the task handler based on the task type."""
         logger.info(f"Batch to be processed: {self.batch}")
-        if Handlers.is_valid_handler(self.thresholds["task_type"]):
+        handler_name = self.thresholds["task_type"]
+        if Handlers.is_valid_handler(handler_name):
             if self.client is not None and self.client.status == 'running':
+                logger.info(f"Selecting the handler for key {self.key}")
+                logger.info(f"|--> Input KPIs {self.input_kpis}")
+                logger.info(f"|--> Output KPIs {self.output_kpis}")
+                logger.info(f"|--> Thresholds {self.thresholds}")
                 try:
-                    future = self.client.submit(select_handler(self.thresholds["task_type"]), "batch size",
-                                                self.key,
-                                                self.batch, self.input_kpis, self.output_kpis, self.thresholds)
-                    future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
+                    handler_fn = select_handler(handler_name)
+                    future = self.client.submit(
+                        handler_fn,
+                        self.key,
+                        self.batch,
+                        self.input_kpis,
+                        self.output_kpis,
+                        self.thresholds
+                    )
+                    logger.info(f"|--> Handler result {future.result()}")
+                    future.add_done_callback(
+                        lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)
+                    )
+                    # result = aggregation_handler(self.key, self.batch, self.input_kpis, self.output_kpis, self.thresholds)
+                    # self.produce_result(result, KafkaTopic.ALARMS.value)
                 except Exception as e:
                     logger.error(
                         f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
@@ -132,6 +159,10 @@ class DaskStreamer(threading.Thread):
             logger.warning("Nothing to produce. Skipping.")
             return
         for record in result:
+            # Filter out records not related with the output KPI of interest
+            if record["kpi_id"] not in self.output_kpis:
+                continue
+            logger.info(f"Kafka Alarm - Record: {record}")
             try:
                 self.producer.produce(
                     destination_topic,
diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index 5de78ef23..5a8e68215 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -18,13 +18,11 @@ from kafka.admin import KafkaAdminClient, NewTopic
 from kafka.errors import TopicAlreadyExistsError
 from common.Settings import get_setting
-
 LOGGER = logging.getLogger(__name__)
 
-KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-public.{:s}.svc.cluster.local:{:s}'
+KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
 KAFKA_TOPIC_NUM_PARTITIONS = 1
 KAFKA_TOPIC_REPLICATION_FACTOR = 1
-#KAFKA_TOPIC_LIST_TIMEOUT = 5
 KAFKA_TOPIC_CREATE_REQUEST_TIMEOUT = 60_000 # ms
 KAFKA_TOPIC_CREATE_WAIT_ITERATIONS = 10
 KAFKA_TOPIC_CREATE_WAIT_TIME = 1
@@ -69,8 +67,6 @@ class KafkaTopic(Enum):
         LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address())))
         kafka_admin_client = KafkaConfig.get_admin_client()
-        #topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
-        #existing_topics = set(topic_metadata.topics.keys())
         existing_topics = set(kafka_admin_client.list_topics())
         LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))
@@ -122,8 +118,6 @@ class KafkaTopic(Enum):
         desired_topics = {topic.value for topic in KafkaTopic}
         missing_topics = set()
         for _ in range(KAFKA_TOPIC_CREATE_WAIT_ITERATIONS):
-            #topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
-            #existing_topics = set(topic_metadata.topics.keys())
             existing_topics = set(kafka_admin_client.list_topics())
             LOGGER.debug('existing_topics={:s}'.format(str(existing_topics)))
             missing_topics = desired_topics.difference(existing_topics)
diff --git a/src/telemetry/backend/service/collectors/int_collector/INTCollector.py b/src/telemetry/backend/service/collectors/int_collector/INTCollector.py
index 7f945efe9..0f35882f0 100644
--- a/src/telemetry/backend/service/collectors/int_collector/INTCollector.py
+++ b/src/telemetry/backend/service/collectors/int_collector/INTCollector.py
@@ -159,8 +159,8 @@ class INTCollector(_Collector):
         return False
 
     def on_idle_timeout(self):
-        LOGGER.info(f"=== INT Collector IDLE() - No INT packets arrived during the last {self.max_idle_time}")
-        LOGGER.debug(f"last_packet_time {self.last_packet_time} seconds.")
+        LOGGER.info(f"=== INT Collector IDLE() - No INT packets arrived during the last {self.max_idle_time} seconds")
+        LOGGER.debug(f"last_packet_time {self.last_packet_time} seconds")
 
         # Report a zero value for the P4 switch KPIs
         values = [0]
-- 
GitLab

From 36d0ebc8c380c14f2d098ff404c1940116c2605c Mon Sep 17 00:00:00 2001
From: "Georgios P. Katsikas"
Date: Tue, 24 Feb 2026 13:53:58 +0000
Subject: [PATCH 2/3] fix: analytics test

---
 src/analytics/backend/tests/test_backend.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index a44351c03..0ce18d8b4 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -22,7 +22,6 @@ from unittest.mock import MagicMock, patch
 
 from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
     get_windows_size, get_batch_size, get_agg_df, get_duration
-from common.tools.kafka.Variables import KafkaTopic
 from analytics.backend.service.Streamer import DaskStreamer
 from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
@@ -261,7 +260,7 @@ def test_aggregation_handler():
 
     # Test aggregation_handler
     aggregated_df = aggregation_handler(
-        "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
+        "test_key", batch, input_kpi_list, output_kpi_list, thresholds
     )
     assert isinstance(aggregated_df, list)
     assert all(isinstance(item, dict) for item in aggregated_df)
@@ -276,7 +275,7 @@ def test_threshold_handler():
     result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
 
     assert isinstance(result, pd.DataFrame)
-    assert result.shape == (1, 7)
+    assert result.shape == (1, 5)
 
 
 ###########################
-- 
GitLab

From d7608701d026da7477144246fb85f0d17da2af3e Mon Sep 17 00:00:00 2001
From: "Georgios P. Katsikas"
Date: Wed, 25 Feb 2026 07:02:19 +0000
Subject: [PATCH 3/3] fix: analytics tests

---
 src/analytics/backend/service/Streamer.py   | 2 --
 src/analytics/backend/tests/test_backend.py | 6 ++++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py
index 1eba74976..91d89504f 100644
--- a/src/analytics/backend/service/Streamer.py
+++ b/src/analytics/backend/service/Streamer.py
@@ -143,8 +143,6 @@ class DaskStreamer(threading.Thread):
                     future.add_done_callback(
                         lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)
                     )
-                    # result = aggregation_handler(self.key, self.batch, self.input_kpis, self.output_kpis, self.thresholds)
-                    # self.produce_result(result, KafkaTopic.ALARMS.value)
                 except Exception as e:
                     logger.error(
                         f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index 0ce18d8b4..9c3fd42cc 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import time import json import pytest import logging @@ -192,7 +191,8 @@ def test_produce_result(dask_streamer): """Test if produce_result sends records to Kafka.""" result = [{"kpi_id": "kpi1", "value": 100}] with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \ - patch.object(dask_streamer.producer, 'produce') as mock_produce: + patch.object(dask_streamer.producer, 'produce') as mock_produce: + dask_streamer.output_kpis = ['kpi1'] dask_streamer.produce_result(result, "test_topic") mock_produce.assert_called_once_with( "test_topic", @@ -220,6 +220,8 @@ def test_run_with_valid_consumer(dask_streamer): with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector: + dask_streamer.input_kpis = ['kpi1', 'kpi2'] + # Simulate valid messages without errors mock_message_1 = MagicMock() mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}' -- GitLab
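
Note for reviewers unfamiliar with the new dispatch flow: below is a minimal, self-contained Python sketch of the enum-keyed handler registry this series introduces in AnalyzerHandlers.py. The enum, registry, and select_handler names mirror the patch; the handler bodies, KPI ids, and sample batch are simplified, hypothetical stand-ins for the pandas-based implementations, not the actual code.

from enum import Enum

class Handlers(Enum):
    AGGREGATION_HANDLER = "AggregationHandler"
    AGGREGATION_HANDLER_MANY_TO_ONE = "AggregationHandlerManyToOne"

def aggregation_handler(key, batch, input_kpis, output_kpis, thresholds):
    # Hypothetical many-to-many body: average each input KPI into its
    # positionally matching output KPI (stand-in for the pandas logic).
    out = []
    for idx, kpi in enumerate(input_kpis):
        values = [r["kpi_value"] for r in batch if r["kpi_id"] == kpi]
        if values:
            out.append({"kpi_id": output_kpis[idx], "value": sum(values) / len(values)})
    return out

def aggregation_handler_many_to_one(key, batch, input_kpis, output_kpis, thresholds):
    # Hypothetical many-to-one body: collapse all subscribed KPIs into one value.
    values = [r["kpi_value"] for r in batch if r["kpi_id"] in input_kpis]
    return [{"kpi_id": output_kpis[0], "value": sum(values)}] if values else []

# Registry mapping validated enum members to top-level handler functions.
HANDLER_FUNCTIONS = {
    Handlers.AGGREGATION_HANDLER: aggregation_handler,
    Handlers.AGGREGATION_HANDLER_MANY_TO_ONE: aggregation_handler_many_to_one,
}

def select_handler(handler_name):
    try:
        return HANDLER_FUNCTIONS[Handlers(handler_name)]  # Handlers(...) validates the name
    except (ValueError, KeyError):
        raise ValueError(f"Unsupported handler: {handler_name}")

# Hypothetical usage, mirroring what DaskStreamer.task_handler_selector() submits to Dask:
handler_fn = select_handler("AggregationHandler")
batch = [{"kpi_id": "kpi-in-1", "kpi_value": 10}, {"kpi_id": "kpi-in-1", "kpi_value": 20}]
print(handler_fn("analyzer-1", batch, ["kpi-in-1"], ["kpi-out-1"], {}))
# -> [{'kpi_id': 'kpi-out-1', 'value': 15.0}]

Keeping the handlers as top-level module functions (rather than methods) follows the rationale of the comment the patch removes: top-level callables remain picklable, which is what lets client.submit() ship them to Dask workers.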