From 8c1d7155d62ae36c2297c46ed1a3862e1028521c Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 13 Jan 2025 13:04:35 +0000 Subject: [PATCH 1/7] Changes in Telemetry backend Analytics - Added AnalyzerHelper class - Added AnalyzerHandler class - Added updated streamer class - Update test scripts for analytics backend - Updated run script format --- .../run_tests_locally-analytics-backend.sh | 3 + .../backend/service/AnalyzerHandlers.py | 127 ++++++++ .../backend/service/AnalyzerHelper.py | 60 ++++ .../backend/service/DaskStreaming.py | 268 --------------- src/analytics/backend/service/Streamer.py | 159 +++++++++ src/analytics/backend/tests/messages.py | 5 +- .../backend/tests/messages_analyzer.py | 64 ++++ src/analytics/backend/tests/test_backend.py | 306 ++++++++++-------- 8 files changed, 589 insertions(+), 403 deletions(-) create mode 100644 src/analytics/backend/service/AnalyzerHandlers.py create mode 100644 src/analytics/backend/service/AnalyzerHelper.py delete mode 100644 src/analytics/backend/service/DaskStreaming.py create mode 100644 src/analytics/backend/service/Streamer.py create mode 100644 src/analytics/backend/tests/messages_analyzer.py diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh index 1c3386c62..78fab0f76 100755 --- a/scripts/run_tests_locally-analytics-backend.sh +++ b/scripts/run_tests_locally-analytics-backend.sh @@ -18,8 +18,11 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc + export KFK_SERVER_ADDRESS='127.0.0.1:9092' + CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require" + python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ analytics/backend/tests/test_backend.py diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py new file mode 100644 index 000000000..34ada434b --- /dev/null +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -0,0 +1,127 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from enum import Enum +import pandas as pd + + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') + +class AnalyzerHandlers(Enum): + AGGREGATION_HANDLER = "AggregationHandler" + UNSUPPORTED_HANDLER = "UnsupportedHandler" + + @classmethod + def is_valid_handler(cls, handler_name): + return handler_name in cls._value2member_map_ + +# This method is top-level and should not be part of the class due to serialization issues. +def threshold_handler(key, aggregated_df, thresholds): + """ + Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary + on the aggregated DataFrame. + + Args: + key (str): Key for the aggregated DataFrame. 
+ aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. + thresholds (dict): Thresholds dictionary with keys in the format '' and values as (fail_th, raise_th). + + Returns: + pd.DataFrame: DataFrame with additional threshold columns. + """ + for metric_name, threshold_values in thresholds.items(): + # Ensure the metric column exists in the DataFrame + if metric_name not in aggregated_df.columns: + logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.") + continue + + # Ensure the threshold values are valid (check for tuple specifically) + if isinstance(threshold_values, tuple) and len(threshold_values) == 2: + fail_th, raise_th = threshold_values + + # Add threshold columns with updated naming + aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th + aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th + else: + logger.warning(f"Threshold values for '{metric_name}' are not a tuple of length 2. Skipping threshold application.") + return aggregated_df + +def aggregation_handler( + batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds + ): + """ + Process a batch of data and calculate aggregated values for each input KPI + and maps them to the output KPIs. """ + + logger.info(f"({batch_type_name}) Processing batch for key: {key}") + if not batch: + logger.info("Empty batch received. Skipping processing.") + return [] + else: + logger.info(f"Processing {len(batch)} records for key: {key}") + + # Convert data into a DataFrame + df = pd.DataFrame(batch) + + # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only) + df = df[df['kpi_id'].isin(input_kpi_list)].copy() + + # Define all possible aggregation methods + aggregation_methods = { + "min" : ('kpi_value', 'min'), + "max" : ('kpi_value', 'max'), + "avg" : ('kpi_value', 'mean'), + "first" : ('kpi_value', lambda x: x.iloc[0]), + "last" : ('kpi_value', lambda x: x.iloc[-1]), + "variance": ('kpi_value', 'var'), + "count" : ('kpi_value', 'count'), + "range" : ('kpi_value', lambda x: x.max() - x.min()), + "sum" : ('kpi_value', 'sum'), + } + + # Process each KPI-specific task parameter + for kpi_index, kpi_id in enumerate(input_kpi_list): + + # logger.info(f"1.Processing KPI: {kpi_id}") + kpi_task_parameters = thresholds["task_parameter"][kpi_index] + + # Get valid task parameters for this KPI + valid_task_parameters = [ + method for method in kpi_task_parameters.keys() + if method in aggregation_methods + ] + + # Select the aggregation methods based on valid task parameters + selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters} + + # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}") + + kpi_df = df[df['kpi_id'] == kpi_id] + + # Check if kpi_df is not empty before applying the aggregation methods + if not kpi_df.empty: + agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index() + + # logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}") + + agg_df['kpi_id'] = output_kpi_list[kpi_index] + + result = threshold_handler(key, agg_df, kpi_task_parameters) + + return result.to_dict(orient='records') + else: + logger.debug(f"No data available for KPI: {kpi_id}. 
Skipping aggregation.") + continue diff --git a/src/analytics/backend/service/AnalyzerHelper.py b/src/analytics/backend/service/AnalyzerHelper.py new file mode 100644 index 000000000..26d6e5fb9 --- /dev/null +++ b/src/analytics/backend/service/AnalyzerHelper.py @@ -0,0 +1,60 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from dask.distributed import Client, LocalCluster +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from confluent_kafka import Consumer, Producer, KafkaException, KafkaError + +import logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') + + +class AnalyzerHelper: + def __init__(self): + pass + + @staticmethod + def initialize_dask_client(n_workers=1, threads_per_worker=1): + """Initialize a local Dask cluster and client.""" + cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) + client = Client(cluster) + logger.info(f"Dask Client Initialized: {client}") + return client, cluster + + @staticmethod + def initialize_kafka_consumer(): + """Initialize the Kafka consumer.""" + consumer_conf = { + 'bootstrap.servers': KafkaConfig.get_kafka_address(), + 'group.id': 'analytics-backend', + 'auto.offset.reset': 'latest' + } + consumer = Consumer(consumer_conf) + consumer.subscribe([KafkaTopic.VALUE.value]) + return consumer + + @staticmethod + def initialize_kafka_producer(): + """Initialize the Kafka producer.""" + return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + + @staticmethod + def delivery_report(err, msg): + if err is not None: + logger.error(f"Message delivery failed: {err}") + else: + logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py deleted file mode 100644 index 79dee7ef9..000000000 --- a/src/analytics/backend/service/DaskStreaming.py +++ /dev/null @@ -1,268 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
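The new AnalyzerHelper above centralizes Dask and Kafka setup so the streamer only calls static factories. As a rough illustration of how those factories are meant to be combined, here is a minimal, hypothetical sketch; it assumes only the dask.distributed and confluent_kafka APIs already imported above, and the broker address and topic name are placeholders:

from dask.distributed import Client, LocalCluster
from confluent_kafka import Consumer

def square(x):
    return x * x

# Dask side: same pattern as AnalyzerHelper.initialize_dask_client()
cluster = LocalCluster(n_workers=1, threads_per_worker=1)
client = Client(cluster)
future = client.submit(square, 7)        # work is shipped to the local cluster
assert future.result() == 49

# Kafka side: same pattern as AnalyzerHelper.initialize_kafka_consumer(),
# with hard-coded values instead of KafkaConfig/KafkaTopic
consumer = Consumer({
    'bootstrap.servers': '127.0.0.1:9092',   # placeholder broker
    'group.id'         : 'analytics-backend',
    'auto.offset.reset': 'latest',
})
consumer.subscribe(['value'])                # placeholder topic name
msg = consumer.poll(timeout=2.0)             # None when nothing arrived in time

consumer.close()
client.close()
cluster.close()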
- -import logging -import time -import json -from confluent_kafka import Consumer, Producer, KafkaException, KafkaError -import pandas as pd -from dask.distributed import Client, LocalCluster -from common.tools.kafka.Variables import KafkaConfig, KafkaTopic - -logging.basicConfig(level=logging.INFO) -LOGGER = logging.getLogger(__name__) - -def SettingKafkaConsumerParams(): - return {'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-backend', - 'auto.offset.reset' : 'latest'} - -def GetAggregationMappings(thresholds): - agg_dict = {} - for threshold_key in thresholds.keys(): - parts = threshold_key.split('_', 1) - if len(parts) != 2: - LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '_' format. Skipping.") - continue - aggregation, metric_name = parts - # Ensure that the aggregation function is valid in pandas - if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']: - LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.") - continue - agg_dict[threshold_key] = ('kpi_value', aggregation) - return agg_dict - - -def ApplyThresholds(aggregated_df, thresholds): - """ - Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary - on the aggregated DataFrame. - Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. - thresholds (dict): Thresholds dictionary with keys in the format '_'. - Returns: pd.DataFrame: DataFrame with additional threshold columns. - """ - for threshold_key, threshold_values in thresholds.items(): - if threshold_key not in aggregated_df.columns: - - LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.") - continue - if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2: - fail_th, raise_th = threshold_values - aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th - aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th - aggregated_df["value"] = aggregated_df[threshold_key] - else: - LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.") - return aggregated_df - -def initialize_dask_client(): - """ - Initialize a local Dask cluster and client. - """ - cluster = LocalCluster(n_workers=2, threads_per_worker=2) - client = Client(cluster) - LOGGER.info(f"Dask Client Initialized: {client}") - return client, cluster - -def initialize_kafka_producer(): - return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) - -def delivery_report(err, msg): - if err is not None: - LOGGER.error(f"Message delivery failed: {err}") - else: - LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") - -def process_batch(batch, agg_mappings, thresholds, key): - """ - Process a batch of data and apply thresholds. - Args: batch (list of dict): List of messages from Kafka. - agg_mappings (dict): Mapping from threshold key to aggregation function. - thresholds (dict): Thresholds dictionary. - Returns: list of dict: Processed records ready to be sent to Kafka. - """ - if not batch: - LOGGER.info("Empty batch received. 
Skipping processing.") - return [] - - - df = pd.DataFrame(batch) - LOGGER.info(f"df {df} ") - df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') - df.dropna(subset=['time_stamp'], inplace=True) - LOGGER.info(f"df {df} ") - required_columns = {'time_stamp', 'kpi_id', 'kpi_value'} - if not required_columns.issubset(df.columns): - LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.") - return [] - if df.empty: - LOGGER.info("No data after filtering by KPI IDs. Skipping processing.") - return [] - - # Perform aggregations using named aggregation - try: - agg_dict = {key: value for key, value in agg_mappings.items()} - - df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index() - - #example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min') - - #given that threshold has 1 value - second_value_tuple = next(iter(agg_dict.values()))[1] - #in case we have multiple thresholds! - #second_values_tuples = [value[1] for value in agg_dict.values()] - if second_value_tuple=="min": - df_agg = df_agg_.min(numeric_only=True).to_frame().T - elif second_value_tuple == "max": - df_agg = df_agg_.max(numeric_only=True).to_frame().T - elif second_value_tuple == "std": - df_agg = df_agg_.sted(numeric_only=True).to_frame().T - else: - df_agg = df_agg_.mean(numeric_only=True).to_frame().T - - # Assign the first value of window_start from the original aggregated data - df_agg['window_start'] = df_agg_['window_start'].iloc[0] - - # Reorder columns to place 'window_start' first if needed - cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start'] - df_agg = df_agg[cols] - - except Exception as e: - LOGGER.error(f"Aggregation error: {e}") - return [] - - # Apply thresholds - - - df_thresholded = ApplyThresholds(df_agg, thresholds) - df_thresholded['kpi_id'] = key - df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ') - # Convert aggregated DataFrame to list of dicts - result = df_thresholded.to_dict(orient='records') - LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.") - return result - -def produce_result(result, producer, destination_topic): - for record in result: - try: - producer.produce( - destination_topic, - key=str(record.get('kpi_id', '')), - value=json.dumps(record), - callback=delivery_report - ) - except KafkaException as e: - LOGGER.error(f"Failed to produce message: {e}") - producer.flush() - LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") - -def DaskStreamer(key, kpi_list, thresholds, stop_event, - window_size="30s", time_stamp_col="time_stamp"): - client, cluster = initialize_dask_client() - consumer_conf = SettingKafkaConsumerParams() - consumer = Consumer(consumer_conf) - consumer.subscribe([KafkaTopic.VALUE.value]) - producer = initialize_kafka_producer() - - # Parse window_size to seconds - try: - window_size_td = pd.to_timedelta(window_size) - window_size_seconds = window_size_td.total_seconds() - except Exception as e: - LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}") - window_size_seconds = 30 - LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.") - - # Extract aggregation mappings from thresholds - agg_mappings = GetAggregationMappings(thresholds) - if not agg_mappings: - LOGGER.error("No valid aggregation mappings extracted from thresholds. 
Exiting streamer.") - consumer.close() - producer.flush() - client.close() - cluster.close() - return - try: - batch = [] - last_batch_time = time.time() - LOGGER.info("Starting to consume messages...") - - while not stop_event.is_set(): - msg = consumer.poll(1.0) - - if msg is None: - current_time = time.time() - if (current_time - last_batch_time) >= window_size_seconds and batch: - LOGGER.info("Time-based batch threshold reached. Processing batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - batch = [] - last_batch_time = current_time - continue - - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: - LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") - else: - LOGGER.error(f"Kafka error: {msg.error()}") - continue - - try: - message_value = json.loads(msg.value().decode('utf-8')) - except json.JSONDecodeError as e: - LOGGER.error(f"JSON decode error: {e}") - continue - - try: - message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce') - LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.") - - if pd.isna(message_timestamp): - LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.") - continue - window_start = message_timestamp.floor(window_size) - LOGGER.warning(f"window_start: {window_start}. Skipping message.") - message_value['window_start'] = window_start - except Exception as e: - LOGGER.error(f"Error processing timestamp: {e}. Skipping message.") - continue - - if message_value['kpi_id'] not in kpi_list: - LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.") - continue - - batch.append(message_value) - - current_time = time.time() - if (current_time - last_batch_time) >= window_size_seconds and batch: - LOGGER.info("Time-based batch threshold reached. Processing batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds, key) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - batch = [] - last_batch_time = current_time - - except Exception as e: - LOGGER.exception(f"Error in Dask streaming process: {e}") - finally: - # Process any remaining messages in the batch - if batch: - LOGGER.info("Processing remaining messages in the batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - consumer.close() - producer.flush() - LOGGER.info("Kafka consumer and producer closed.") - client.close() - cluster.close() - LOGGER.info("Dask client and cluster closed.") diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py new file mode 100644 index 000000000..124ef5651 --- /dev/null +++ b/src/analytics/backend/service/Streamer.py @@ -0,0 +1,159 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import json +from confluent_kafka import KafkaException, KafkaError +# import pandas as pd +from common.tools.kafka.Variables import KafkaTopic +from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler +from .AnalyzerHelper import AnalyzerHelper + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') + + +class DaskStreamer: + def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5, + window_size=None, n_workers=5, threads_per_worker=2): + self.key = key + self.input_kpis = input_kpis + self.output_kpis = output_kpis + self.thresholds = thresholds + self.window_size = window_size + self.batch_size = batch_size + self.n_workers = n_workers + self.threads_per_worker = threads_per_worker + self.running = True + self.batch = [] + + # Initialize Kafka and Dask components + self.client, self.cluster = AnalyzerHelper.initialize_dask_client(n_workers, threads_per_worker) + self.consumer = AnalyzerHelper.initialize_kafka_consumer() + self.producer = AnalyzerHelper.initialize_kafka_producer() + logger.info("Dask Streamer initialized.") + + def run(self): + """Main method to start the DaskStreamer.""" + try: + logger.info("Starting Dask Streamer") + last_batch_time = time.time() + while True: + if not self.consumer: + logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.") + break + if not self.running: + logger.warning("Dask Streamer is not running. 
Exiting loop.") + break + message = self.consumer.poll(timeout=2.0) + if message is None: + # logger.info("No new messages received.") + continue + if message.error(): + if message.error().code() == KafkaError._PARTITION_EOF: + logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}") + elif message.error(): + raise KafkaException(message.error()) + else: + try: + value = json.loads(message.value()) + except json.JSONDecodeError: + logger.error(f"Failed to decode message: {message.value()}") + continue + self.batch.append(value) + # logger.info(f"Received message: {value}") + + # Window size has a priority over batch size + if self.window_size is None: + if len(self.batch) >= self.batch_size: # If batch size is not provided, process continue with default batch size + logger.info(f"Processing based on batch size {self.batch_size}.") + self.task_handler_selector() + self.batch = [] + else: + # Process based on window size + current_time = time.time() + if (current_time - last_batch_time) >= self.window_size and self.batch: + logger.info(f"Processing based on window size {self.window_size}.") + self.task_handler_selector() + self.batch = [] + last_batch_time = current_time + + except Exception as e: + logger.exception(f"Error in Dask streaming process: {e}") + finally: + logger.info(">>> Exiting Dask Streamer...") + self.cleanup() + logger.info(">>> Dask Streamer Cleanup Completed.") + + def task_handler_selector(self): + """Select the task handler based on the task type.""" + if AnalyzerHandlers.is_valid_handler(self.thresholds["task_type"]): + if self.client.status == 'running': + future = self.client.submit(aggregation_handler, "batch size", self.key, + self.batch, self.input_kpis, self.output_kpis, self.thresholds) + future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + else: + logger.warning("Dask client is not running. Skipping processing.") + else: + logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.") + + def produce_result(self, result, destination_topic): + """Produce results to the Kafka topic.""" + for record in result: + try: + self.producer.produce( + destination_topic, + key=str(record.get('kpi_id', '')), + value=json.dumps(record), + callback=AnalyzerHelper.delivery_report + ) + except KafkaException as e: + logger.error(f"Failed to produce message: {e}") + self.producer.flush() + logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") + + def cleanup(self): + """Clean up Kafka and Dask resources.""" + logger.info("Shutting down resources...") + self.running = False + if self.consumer: + try: + self.consumer.close() + logger.info("Kafka consumer closed.") + except Exception as e: + logger.error(f"Error closing Kafka consumer: {e}") + + if self.producer: + try: + self.producer.flush() + logger.info("Kafka producer flushed and closed.") + except Exception as e: + logger.error(f"Error closing Kafka producer: {e}") + + if self.client and hasattr(self.client, 'status') and self.client.status == 'running': + try: + self.client.close() + logger.info("Dask client closed.") + except Exception as e: + logger.error(f"Error closing Dask client: {e}") + + if self.cluster and hasattr(self.cluster, 'close'): + try: + self.cluster.close(timeout=5) + logger.info("Dask cluster closed.") + except Exception as e: + logger.error(f"May be timeout. 
Error closing Dask cluster: {e}") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index 55d966dfb..2ff1e9353 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -53,8 +53,8 @@ def create_analyzer(): _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING - _kpi_id = KpiId() # input IDs to analyze + _kpi_id = KpiId() _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7" _create_analyzer.input_kpi_ids.append(_kpi_id) @@ -63,11 +63,14 @@ def create_analyzer(): _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id = KpiId() _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter _threshold_dict = { # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py new file mode 100644 index 000000000..040fbb468 --- /dev/null +++ b/src/analytics/backend/tests/messages_analyzer.py @@ -0,0 +1,64 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
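The fixtures that follow feed the handlers defined earlier. To make the shape of the task_parameter thresholds concrete, here is a small sketch of what threshold_handler does with the first entry, {"last": (40, 80), "variance": (300, 500)}; the input row mirrors get_agg_df() below, and the real logic lives in AnalyzerHandlers.threshold_handler above:

import pandas as pd

# one aggregated row, as produced by the groupby/agg step of aggregation_handler
agg_df = pd.DataFrame([{"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616",
                        "last": 47.76, "variance": 970.41}])

thresholds = {"last": (40, 80), "variance": (300, 500)}   # (fail_th, raise_th) per metric

for metric, (fail_th, raise_th) in thresholds.items():
    agg_df[f"{metric}_TH_RAISE"] = agg_df[metric] > raise_th
    agg_df[f"{metric}_TH_FALL"]  = agg_df[metric] < fail_th

# last=47.76      -> last_TH_RAISE=False,    last_TH_FALL=False
# variance=970.41 -> variance_TH_RAISE=True, variance_TH_FALL=False
# result: 1 row x 7 columns, which is what test_threshold_handler asserts below
print(agg_df)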
+ +import pandas as pd +from analytics.backend.service.AnalyzerHandlers import AnalyzerHandlers + +def get_input_kpi_list(): + return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] + +def get_output_kpi_list(): + return ["1e22f180-ba28-4641-b190-2287bf441616", "6e22f180-ba28-4641-b190-2287bf181818", 'kpi_4'] + +def get_thresholds(): + return { + "task_type": AnalyzerHandlers.AGGREGATION_HANDLER.value, + "task_parameter": [ + {"last": (40, 80), "variance": (300, 500)}, + {"count": (2, 4), "max": (70, 100)}, + {"min": (10, 20), "avg": (50, 70)}, + ], + } + +def get_duration(): + return 30 + +def get_windows_size(): + return None + +def get_batch_size(): + return 10 + +def get_interval(): + return 5 + +def get_batch(): + return [ + {"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 46.72}, + {"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 65.22}, + {"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 54.24}, + {"time_stamp": "2025-01-13T08:44:16Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 57.67}, + {"time_stamp": "2025-01-13T08:44:18Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 38.6}, + {"time_stamp": "2025-01-13T08:44:20Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 38.9}, + {"time_stamp": "2025-01-13T08:44:22Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 52.44}, + {"time_stamp": "2025-01-13T08:44:24Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 47.76}, + {"time_stamp": "2025-01-13T08:44:26Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 33.71}, + {"time_stamp": "2025-01-13T08:44:28Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 64.44}, + ] + +def get_agg_df(): + data = [ + {"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616", "last": 47.76, "variance": 970.41}, + ] + return pd.DataFrame(data) diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 4aa9df5fa..cc0167399 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -12,148 +12,186 @@ # See the License for the specific language governing permissions and # limitations under the License. 
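The rewritten tests below drive aggregation_handler with get_batch()-style records. At its core that handler is a per-KPI pandas named aggregation; here is a minimal sketch of that step, using three of the fixture records above and the "last"/"variance" methods configured for the first input KPI (the output-KPI rewrite mirrors the handler; the numbers are only illustrative inputs):

import pandas as pd

batch = [
    {"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 54.24},
    {"time_stamp": "2025-01-13T08:44:16Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 57.67},
    {"time_stamp": "2025-01-13T08:44:18Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 38.6},
]
df = pd.DataFrame(batch)

# keep only the subscribed input KPI, as aggregation_handler does before grouping
kpi_df = df[df["kpi_id"] == "1e22f180-ba28-4641-b190-2287bf446666"]

# named aggregation, mirroring the "last" and "variance" entries of aggregation_methods
agg_df = kpi_df.groupby("kpi_id").agg(
    last=("kpi_value", lambda x: x.iloc[-1]),
    variance=("kpi_value", "var"),
).reset_index()

# the handler then maps the input KPI to its output KPI before applying thresholds
agg_df["kpi_id"] = "1e22f180-ba28-4641-b190-2287bf441616"
print(agg_df)   # one row with columns: kpi_id, last, variance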
-import time, json -from typing import Dict +import pytest import logging -from threading import Event, Thread +import pandas as pd +from unittest.mock import MagicMock, patch from common.tools.kafka.Variables import KafkaTopic -from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService -from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict -from .messages import create_analyzer, create_analyzer_dask -from threading import Thread, Event -from ..service.DaskStreaming import DaskStreamer +from analytics.backend.service.Streamer import DaskStreamer +from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \ + get_windows_size, get_batch_size, get_agg_df +from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler -LOGGER = logging.getLogger(__name__) +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') +# --- "test_validate_kafka_topics" should be run before the functionality tests --- +def test_validate_kafka_topics(): + logger.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) ########################### # Tests Implementation of Telemetry Backend ########################### -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. 
+ ''' + logger.info(f" >>> Starting test: {request.node.name} >>> ") + yield + logger.info(f" <<< Finished test: {request.node.name} <<< ") + +@pytest.fixture +def dask_streamer(): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client') as mock_dask_client, \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer') as mock_kafka_consumer, \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer') as mock_kafka_producer: + + mock_dask_client.return_value = (MagicMock(), MagicMock()) + mock_kafka_consumer.return_value = MagicMock() + mock_kafka_producer.return_value = MagicMock() + + return DaskStreamer( + key="test_key", + input_kpis=get_input_kpi_list(), + output_kpis=get_output_kpi_list(), + thresholds=get_thresholds(), + batch_size=get_batch_size(), + window_size=get_windows_size(), + n_workers=3, + threads_per_worker=1 + ) + +def test_initialization(dask_streamer): + """Test if the DaskStreamer initializes correctly.""" + assert dask_streamer.key == "test_key" + assert dask_streamer.batch_size == get_batch_size() + assert dask_streamer.window_size is None + assert dask_streamer.n_workers == 3 + assert dask_streamer.consumer is not None + assert dask_streamer.producer is not None + assert dask_streamer.client is not None + assert dask_streamer.cluster is not None + + +def test_run_stops_on_no_consumer(dask_streamer): + """Test if the run method exits when the consumer is not initialized.""" + dask_streamer.consumer = None + with patch('time.sleep', return_value=None): + dask_streamer.run() + assert not dask_streamer.running + +def test_task_handler_selector_valid_handler(dask_streamer): + """Test task handler selection with a valid handler.""" + with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=True), \ + patch.object(dask_streamer.client, 'submit', return_value=MagicMock()) as mock_submit, \ + patch.object(dask_streamer.client, 'status', 'running'): + + dask_streamer.task_handler_selector() + mock_submit.assert_called_once() + +def test_task_handler_selector_invalid_handler(dask_streamer): + """Test task handler selection with an invalid handler.""" + with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=False): + dask_streamer.task_handler_selector() + assert dask_streamer.batch == [] + +def test_produce_result(dask_streamer): + """Test if produce_result sends records to Kafka.""" + result = [{"kpi_id": "kpi1", "value": 100}] + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \ + patch.object(dask_streamer.producer, 'produce') as mock_produce: + dask_streamer.produce_result(result, "test_topic") + mock_produce.assert_called_once_with( + "test_topic", + key="kpi1", + value='{"kpi_id": "kpi1", "value": 100}', + callback=mock_delivery_report + ) + +def test_cleanup(dask_streamer): + """Test the cleanup method.""" + with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \ + patch.object(dask_streamer.producer, 'flush') as mock_producer_flush, \ + patch.object(dask_streamer.client, 'close') as mock_client_close, \ + patch.object(dask_streamer.cluster, 'close', MagicMock()) as mock_cluster_close: + + # Mock the conditions required for the close calls + dask_streamer.client.status = 'running' + dask_streamer.cluster.close = MagicMock() + + dask_streamer.cleanup() + + 
mock_consumer_close.assert_called_once() + mock_producer_flush.assert_called_once() + mock_client_close.assert_called_once() + dask_streamer.cluster.close.assert_called_once() + +def test_run_with_valid_consumer(dask_streamer): + """Test the run method with a valid Kafka consumer.""" + with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ + patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector: + + # Simulate valid messages without errors + mock_message_1 = MagicMock() + mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}' + mock_message_1.error.return_value = None # No error + + mock_message_2 = MagicMock() + mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}' + mock_message_2.error.return_value = None # No error + + # Mock `poll` to return valid messages + mock_poll.side_effect = [mock_message_1, mock_message_2] + + # Run the `run` method in a limited loop + with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays + dask_streamer.running = True # Ensure the streamer runs + dask_streamer.batch_size = 2 # Set a small batch size for the test + + # Limit the loop by breaking it after one full processing cycle + def stop_running_after_task_handler(*args, **kwargs): + logger.info("Stopping the streamer after processing the first batch.") + dask_streamer.running = False + + mock_task_handler_selector.side_effect = stop_running_after_task_handler + + # Execute the method + dask_streamer.run() + + # Assertions + assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing + mock_task_handler_selector.assert_called_once() # Task handler should be called once + mock_poll.assert_any_call(timeout=2.0) # Poll should have been called + + +# add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py +def test_aggregation_handler(): + + # Create a sample batch + batch = get_batch() + input_kpi_list = get_input_kpi_list() + output_kpi_list = get_output_kpi_list() + thresholds = get_thresholds() + + # Test aggregation_handler + aggregated_df = aggregation_handler( + "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds + ) + assert isinstance(aggregated_df, list) + assert all(isinstance(item, dict) for item in aggregated_df) +# Test threshold_handler +def test_threshold_handler(): + # Create a sample aggregated DataFrame + agg_df = get_agg_df() + thresholds = get_thresholds() -# --- To test Dask Streamer functionality --- -# def test_StartDaskStreamer(): # Directly from the Streamer class -# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") -# stop_event = Event() -# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] -# oper_list = ['avg', 'min', 'max',] -# thresholds = { -# 'avg_value': (10.0, 90.0), -# 'min_value': (5.0, 95.0), -# 'max_value': (15.0, 85.0), -# 'latency' : (2.0, 10.0) -# } - -# # Start the DaskStreamer in a separate thread -# streamer_thread = Thread( -# target=DaskStreamer, -# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event), -# kwargs={ -# "window_size": "60s", -# "win_slide_duration": "30s", -# "time_stamp_col": "time_stamp" -# } -# ) -# streamer_thread.start() -# try: -# while True: -# time.sleep(10) -# except KeyboardInterrupt: -# LOGGER.info("KeyboardInterrupt received. 
Stopping streamer...") -# stop_event.set() -# streamer_thread.join() -# LOGGER.info("Streamer stopped gracefully.") - -# --- To test Start Streamer functionality --- -# def test_StartDaskStreamer(): -# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ") -# analyzer_obj = create_analyzer_dask() -# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj)) -# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid -# analyzer_to_generate : Dict = { -# "algo_name" : analyzer_obj.algorithm_name, -# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], -# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], -# "oper_mode" : analyzer_obj.operation_mode, -# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), -# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]), -# # "oper_list" : analyzer_obj.parameters["oper_list"], -# "window_size" : analyzer_obj.parameters["window_size"], -# "window_slider" : analyzer_obj.parameters["window_slider"], -# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] -# } -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate))) -# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate) -# assert isinstance(response, bool) -# time.sleep(100) -# LOGGER.info('Initiating StopRequestListener...') -# # AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) - -# --- To test Start Streamer functionality --- -# def test_StartSparkStreamer(): -# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") -# analyzer_obj = create_analyzer() -# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid -# analyzer_to_generate : Dict = { -# "algo_name" : analyzer_obj.algorithm_name, -# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], -# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], -# "oper_mode" : analyzer_obj.operation_mode, -# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), -# "window_size" : analyzer_obj.parameters["window_size"], -# "window_slider" : analyzer_obj.parameters["window_slider"], -# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] -# } -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) -# assert isinstance(response, bool) - -# --- To TEST StartRequestListenerFunctionality -# def test_StartRequestListener(): -# LOGGER.info('test_RunRequestListener') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# AnalyticsBackendServiceObj.stop_event = Event() -# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) -# listener_thread.start() - -# time.sleep(100) - - # AnalyticsBackendServiceObj.stop_event.set() - # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') - # listener_thread.join(timeout=10) - # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." 
- # LOGGER.info('Completed test_RunRequestListener') - -# To test START and STOP communication together -# def test_StopRequestListener(): -# LOGGER.info('test_RunRequestListener') -# LOGGER.info('Initiating StartRequestListener...') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) -# # LOGGER.debug(str(response_thread)) -# time.sleep(10) -# LOGGER.info('Initiating StopRequestListener...') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) - -# To independently tests the SparkListener functionality -# def test_SparkListener(): -# LOGGER.info('test_RunRequestListener') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.RunSparkStreamer( -# get_kpi_id_list(), get_operation_list(), get_threshold_dict() -# ) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) + # Test threshold_handler + result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0]) + assert isinstance(result, pd.DataFrame) + assert result.shape == (1, 7) -- GitLab From 26865f9320e8e0935c56fd7e964595e289ca1719 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 13 Jan 2025 13:40:41 +0000 Subject: [PATCH 2/7] Commented previous DaskStreamer import. - To be updated later. --- src/analytics/backend/service/AnalyticsBackendService.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index f3a58feaa..38a305aec 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -23,7 +23,7 @@ from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from threading import Thread, Event -from .DaskStreaming import DaskStreamer +# from .DaskStreaming import DaskStreamer LOGGER = logging.getLogger(__name__) @@ -89,7 +89,7 @@ class AnalyticsBackendService(GenericGrpcService): try: stop_event = Event() thread = Thread( - target=DaskStreamer, + target=None, # DaskStreamer, # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event), args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event), kwargs={ -- GitLab From 1e90b149dbbc5e2bbb044e35954870c77925132c Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 13 Jan 2025 17:27:16 +0000 Subject: [PATCH 3/7] Initial Telemetry backend and new analytics integration test. 
--- .../run_tests_locally-analytics-backend.sh | 2 +- .../run_tests_locally-analytics-frontend.sh | 2 +- .../service/AnalyticsBackendService.py | 95 ++++++++++--------- .../backend/service/AnalyzerHandlers.py | 8 +- src/analytics/backend/service/Streamer.py | 4 +- src/analytics/backend/tests/test_backend.py | 35 +++++-- 6 files changed, 85 insertions(+), 61 deletions(-) diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh index 78fab0f76..700155a42 100755 --- a/scripts/run_tests_locally-analytics-backend.sh +++ b/scripts/run_tests_locally-analytics-backend.sh @@ -24,5 +24,5 @@ export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require" -python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ +python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \ analytics/backend/tests/test_backend.py diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh index 6e945406f..0cb4dc98d 100755 --- a/scripts/run_tests_locally-analytics-frontend.sh +++ b/scripts/run_tests_locally-analytics-frontend.sh @@ -21,5 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require" -python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ +python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \ analytics/frontend/tests/test_frontend.py diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 38a305aec..508feecea 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
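The AnalyticsBackendService changes below swap hand-managed threads for APScheduler: StartStreamer registers each DaskStreamer.run as a one-shot 'date' job keyed by the analyzer UUID, and StopStreamer removes it by id. A standalone sketch of that scheduling pattern, assuming only the BackgroundScheduler API used in the diff (the job body is a stand-in for the real streamer, and the job id is a placeholder):

import time
from datetime import datetime
import pytz
from apscheduler.schedulers.background import BackgroundScheduler

def run_streamer():
    print("the real job would poll Kafka and submit Dask tasks here")

scheduler = BackgroundScheduler(daemon=True)
scheduler.start()

# fire once, immediately; the job id doubles as the analyzer UUID in the service
scheduler.add_job(run_streamer, 'date',
                  run_date=datetime.now(pytz.utc),
                  id='analyzer-1234', replace_existing=True)

time.sleep(1)                                    # let the one-shot job fire
print([job.id for job in scheduler.get_jobs()])  # finished 'date' jobs disappear

# StopStreamer's pattern: remove by id only if it is still scheduled
if 'analyzer-1234' in [job.id for job in scheduler.get_jobs()]:
    scheduler.remove_job('analyzer-1234')

scheduler.shutdown()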
-import time import json import logging import threading + +import pytz from common.tools.service.GenericGrpcService import GenericGrpcService from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from confluent_kafka import Consumer as KafkaConsumer @@ -23,7 +24,11 @@ from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from threading import Thread, Event -# from .DaskStreaming import DaskStreamer +from analytics.backend.service.Streamer import DaskStreamer +from common.proto.analytics_frontend_pb2 import Analyzer +from apscheduler.schedulers.background import BackgroundScheduler +from datetime import datetime, timedelta + LOGGER = logging.getLogger(__name__) @@ -35,13 +40,18 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.info('Init AnalyticsBackendService') port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND) super().__init__(port, cls_name=cls_name) + self.schedular = BackgroundScheduler(daemon=True) + self.schedular.start() self.running_threads = {} # To keep track of all running analyzers self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'analytics-frontend', 'auto.offset.reset' : 'latest'}) def install_servicers(self): - threading.Thread(target=self.RequestListener, args=()).start() + threading.Thread( + target=self.RequestListener, + args=() + ).start() def RequestListener(self): """ @@ -69,56 +79,53 @@ class AnalyticsBackendService(GenericGrpcService): # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: - self.StopDaskListener(analyzer_uuid) + self.StopStreamer(analyzer_uuid) else: - self.StartDaskListener(analyzer_uuid, analyzer) + self.StartStreamer(analyzer_uuid, analyzer) except Exception as e: LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - # print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - def StartDaskListener(self, analyzer_uuid, analyzer): - kpi_list = analyzer[ 'input_kpis' ] - thresholds = analyzer[ 'thresholds' ] - window_size = analyzer[ 'window_size' ] - window_slider = analyzer[ 'window_slider'] - LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format( - kpi_list, thresholds, window_size, window_slider)) - # print ("Received parameters: {:} - {:} - {:} - {:}".format( - # kpi_list, thresholds, window_size, window_slider)) + def StartStreamer(self, analyzer_uuid : str, analyzer : json): + """ + Start the DaskStreamer with the given parameters. 
+ """ try: - stop_event = Event() - thread = Thread( - target=None, # DaskStreamer, - # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event), - args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event), - kwargs={ - "window_size" : window_size, - } + streamer = DaskStreamer( + analyzer_uuid, + analyzer['input_kpis' ], + analyzer['output_kpis'], + analyzer['thresholds' ], + analyzer['batch_size' ], + analyzer['window_size'], ) - thread.start() - self.running_threads[analyzer_uuid] = (thread, stop_event) - # print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) - LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + self.schedular.add_job( + streamer.run, + 'date', + run_date=datetime.now(pytz.utc), + id=analyzer_uuid, + replace_existing=True + ) + LOGGER.info("Dask Streamer started.") return True except Exception as e: - # print ("Failed to initiate Analyzer backend: {:}".format(e)) - LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) + LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e)) return False - def StopDaskListener(self, analyzer_uuid): - if analyzer_uuid in self.running_threads: - try: - thread, stop_event = self.running_threads[analyzer_uuid] - stop_event.set() - thread.join() - del self.running_threads[analyzer_uuid] - # print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) - LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) - return True - except Exception as e: - LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e)) + def StopStreamer(self, analyzer_uuid : str): + """ + Stop the DaskStreamer with the given analyzer_uuid. + """ + try: + active_jobs = self.schedular.get_jobs() + logger.debug("Active Jobs: {:}".format(active_jobs)) + if analyzer_uuid not in [job.id for job in active_jobs]: + LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid)) return False - else: - # print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) - LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) + self.schedular.remove_job(analyzer_uuid) + LOGGER.info("Dask Streamer stopped.") + return True + except Exception as e: + LOGGER.error("Failed to stop Dask Streamer. 
ERROR: {:}".format(e)) + return False + diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py index 34ada434b..f407de2a0 100644 --- a/src/analytics/backend/service/AnalyzerHandlers.py +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -18,7 +18,7 @@ import pandas as pd logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') class AnalyzerHandlers(Enum): AGGREGATION_HANDLER = "AggregationHandler" @@ -49,14 +49,14 @@ def threshold_handler(key, aggregated_df, thresholds): continue # Ensure the threshold values are valid (check for tuple specifically) - if isinstance(threshold_values, tuple) and len(threshold_values) == 2: + if isinstance(threshold_values, list) and len(threshold_values) == 2: fail_th, raise_th = threshold_values # Add threshold columns with updated naming aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th else: - logger.warning(f"Threshold values for '{metric_name}' are not a tuple of length 2. Skipping threshold application.") + logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a tuple of length 2. Skipping threshold application.") return aggregated_df def aggregation_handler( @@ -71,7 +71,7 @@ def aggregation_handler( logger.info("Empty batch received. Skipping processing.") return [] else: - logger.info(f"Processing {len(batch)} records for key: {key}") + logger.info(f" >>>>> Processing {len(batch)} records for key: {key}") # Convert data into a DataFrame df = pd.DataFrame(batch) diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index 124ef5651..cabed8588 100644 --- a/src/analytics/backend/service/Streamer.py +++ b/src/analytics/backend/service/Streamer.py @@ -24,7 +24,7 @@ from .AnalyzerHelper import AnalyzerHelper logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class DaskStreamer: @@ -59,7 +59,7 @@ class DaskStreamer: if not self.running: logger.warning("Dask Streamer is not running. Exiting loop.") break - message = self.consumer.poll(timeout=2.0) + message = self.consumer.poll(timeout=2.0) # Poll for new messages after 2 sceonds if message is None: # logger.info("No new messages received.") continue diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index cc0167399..09be90e4c 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
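The integration test added below boots the whole backend service, whose RequestListener consumes analyzer descriptors from the analytics-request topic and hands them to StartStreamer. For orientation, a hypothetical producer-side sketch of such a request; the field names come from RequestListener/StartStreamer, while the broker address, topic string, UUID, KPI ids and parameter values are placeholders (the service itself resolves the topic via KafkaTopic.ANALYTICS_REQUEST):

import json
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})   # placeholder broker

analyzer_uuid = "efef4d95-1cf1-43c4-9742-95c283ddd666"         # placeholder analyzer id
analyzer = {
    "algo_name"  : "AggregationHandler",   # algo_name=None together with oper_mode=None means "stop"
    "oper_mode"  : 1,
    "input_kpis" : ["1e22f180-ba28-4641-b190-2287bf446666"],
    "output_kpis": ["1e22f180-ba28-4641-b190-2287bf441616"],
    "thresholds" : {
        "task_type"     : "AggregationHandler",
        "task_parameter": [{"last": [40, 80], "variance": [300, 500]}],
    },
    "batch_size" : 10,
    "window_size": None,
    "duration"   : 30,
}

producer.produce('analytics_request',      # placeholder topic name
                 key=analyzer_uuid,
                 value=json.dumps(analyzer))
producer.flush()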
+import time import pytest import logging import pandas as pd @@ -21,19 +22,11 @@ from analytics.backend.service.Streamer import DaskStreamer from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \ get_windows_size, get_batch_size, get_agg_df from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler +from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - logger.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - -########################### -# Tests Implementation of Telemetry Backend -########################### @pytest.fixture(autouse=True) def log_all_methods(request): @@ -45,6 +38,30 @@ def log_all_methods(request): yield logger.info(f" <<< Finished test: {request.node.name} <<< ") + +# --- "test_validate_kafka_topics" should be run before the functionality tests --- +def test_validate_kafka_topics(): + logger.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) + +########################### +# integration test of Streamer with backend service +########################### + +def test_backend_integration_with_analyzer(): + backendServiceObject = AnalyticsBackendService() + backendServiceObject.install_servicers() + logger.info(" waiting for 2 minutes for the backend service before termination ... ") + time.sleep(120) + backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666") + logger.info(" Backend service terminated successfully ... ") + + +########################### +# funtionality pytest for analyzer sub methods +########################### + @pytest.fixture def dask_streamer(): with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client') as mock_dask_client, \ -- GitLab From cf640e651d7a775222dc9c2367d8c262c9f59d1a Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 14 Jan 2025 16:53:05 +0000 Subject: [PATCH 4/7] Updated Analaytics backend streamer class. - Enhance AnalyticsBackendService - DaskStreamer with improved logging and threading management - Update duration handling in tests. --- .../service/AnalyticsBackendService.py | 45 ++++++++++++------- .../backend/service/AnalyzerHandlers.py | 6 ++- src/analytics/backend/service/Streamer.py | 21 +++++---- .../backend/tests/messages_analyzer.py | 2 +- 4 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 508feecea..eab275324 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
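The final revision below drops the scheduler in favour of plain threads: DaskStreamer becomes a threading.Thread whose stop() clears a running flag, the service keeps an active_streamers map, and an optional daemon watchdog stops a streamer after the analyzer's duration. A minimal, generic sketch of that lifecycle, with a dummy worker standing in for the real streamer:

import threading
import time

class DummyStreamer(threading.Thread):
    """Stand-in for DaskStreamer: loops until stop() clears the flag."""
    def __init__(self, key):
        super().__init__()
        self.key = key
        self.running = True

    def run(self):
        while self.running:
            time.sleep(0.5)            # the real run() polls Kafka here
        print(f"{self.key}: loop exited")

    def stop(self):
        self.running = False           # the real stop() also closes Kafka/Dask resources

streamer = DummyStreamer("analyzer-1234")
streamer.start()

# watchdog pattern used by StartStreamer when analyzer['duration'] is set
def stop_after_duration(duration, s):
    time.sleep(duration)
    s.stop()

threading.Thread(target=stop_after_duration, args=(2, streamer), daemon=True).start()

streamer.join()                        # StopStreamer does stop() followed by join()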
+import time import json import logging import threading @@ -26,11 +27,11 @@ from common.Settings import get_service_port_grpc from threading import Thread, Event from analytics.backend.service.Streamer import DaskStreamer from common.proto.analytics_frontend_pb2 import Analyzer -from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime, timedelta LOGGER = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class AnalyticsBackendService(GenericGrpcService): """ @@ -40,9 +41,7 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.info('Init AnalyticsBackendService') port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND) super().__init__(port, cls_name=cls_name) - self.schedular = BackgroundScheduler(daemon=True) - self.schedular.start() - self.running_threads = {} # To keep track of all running analyzers + self.active_streamers = {} self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'analytics-frontend', 'auto.offset.reset' : 'latest'}) @@ -75,7 +74,7 @@ class AnalyticsBackendService(GenericGrpcService): try: analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: @@ -90,6 +89,9 @@ class AnalyticsBackendService(GenericGrpcService): """ Start the DaskStreamer with the given parameters. """ + if analyzer_uuid in self.active_streamers: + LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid)) + return False try: streamer = DaskStreamer( analyzer_uuid, @@ -99,13 +101,20 @@ class AnalyticsBackendService(GenericGrpcService): analyzer['batch_size' ], analyzer['window_size'], ) - self.schedular.add_job( - streamer.run, - 'date', - run_date=datetime.now(pytz.utc), - id=analyzer_uuid, - replace_existing=True - ) + streamer.start() + logging.info(f"Streamer started with analyzer Id: {analyzer_uuid}") + + # Stop the streamer after the given duration + if analyzer['duration'] is not None: + def stop_after_duration(): + time.sleep(analyzer['duration']) + logging.info(f"Stopping streamer with analyzer: {analyzer_uuid}") + streamer.stop() + + duration_thread = threading.Thread(target=stop_after_duration, daemon=True) + duration_thread.start() + + self.active_streamers[analyzer_uuid] = streamer LOGGER.info("Dask Streamer started.") return True except Exception as e: @@ -117,13 +126,15 @@ class AnalyticsBackendService(GenericGrpcService): Stop the DaskStreamer with the given analyzer_uuid. 
""" try: - active_jobs = self.schedular.get_jobs() - logger.debug("Active Jobs: {:}".format(active_jobs)) - if analyzer_uuid not in [job.id for job in active_jobs]: + if analyzer_uuid not in self.active_streamers: LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid)) return False - self.schedular.remove_job(analyzer_uuid) - LOGGER.info("Dask Streamer stopped.") + LOGGER.info(f"Stopping streamer with key: {analyzer_uuid}") + streamer = self.active_streamers[analyzer_uuid] + streamer.stop() + streamer.join() + del self.active_streamers[analyzer_uuid] + LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been stopped.") return True except Exception as e: LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e)) diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py index f407de2a0..0e23bab69 100644 --- a/src/analytics/backend/service/AnalyzerHandlers.py +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -119,9 +119,13 @@ def aggregation_handler( agg_df['kpi_id'] = output_kpi_list[kpi_index] + # if agg_df.empty: + # logger.warning(f"No data available for KPI: {kpi_id}. Skipping threshold application.") + # continue + # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}") result = threshold_handler(key, agg_df, kpi_task_parameters) return result.to_dict(orient='records') else: - logger.debug(f"No data available for KPI: {kpi_id}. Skipping aggregation.") + logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.") continue diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index cabed8588..35d35b36e 100644 --- a/src/analytics/backend/service/Streamer.py +++ b/src/analytics/backend/service/Streamer.py @@ -16,20 +16,19 @@ import logging import time import json from confluent_kafka import KafkaException, KafkaError -# import pandas as pd from common.tools.kafka.Variables import KafkaTopic from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler from .AnalyzerHelper import AnalyzerHelper +import threading - -logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') -class DaskStreamer: +class DaskStreamer(threading.Thread): def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5, - window_size=None, n_workers=5, threads_per_worker=2): + window_size=None, n_workers=1, threads_per_worker=1): + super().__init__() self.key = key self.input_kpis = input_kpis self.output_kpis = output_kpis @@ -96,8 +95,6 @@ class DaskStreamer: logger.exception(f"Error in Dask streaming process: {e}") finally: logger.info(">>> Exiting Dask Streamer...") - self.cleanup() - logger.info(">>> Dask Streamer Cleanup Completed.") def task_handler_selector(self): """Select the task handler based on the task type.""" @@ -126,7 +123,7 @@ class DaskStreamer: self.producer.flush() logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") - def cleanup(self): + def stop(self): """Clean up Kafka and Dask resources.""" logger.info("Shutting down resources...") self.running = False @@ -144,16 +141,18 @@ class DaskStreamer: except Exception as e: logger.error(f"Error closing Kafka producer: {e}") - if self.client and hasattr(self.client, 'status') and self.client.status == 'running': + if self.client is not None and hasattr(self.client, 'status') and self.client.status == 
'running': try: self.client.close() logger.info("Dask client closed.") except Exception as e: logger.error(f"Error closing Dask client: {e}") - if self.cluster and hasattr(self.cluster, 'close'): + if self.cluster is not None and hasattr(self.cluster, 'close'): try: self.cluster.close(timeout=5) logger.info("Dask cluster closed.") except Exception as e: - logger.error(f"May be timeout. Error closing Dask cluster: {e}") + logger.error(f"Timeout error while closing Dask cluster: {e}") + + diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py index 040fbb468..6a303d474 100644 --- a/src/analytics/backend/tests/messages_analyzer.py +++ b/src/analytics/backend/tests/messages_analyzer.py @@ -32,7 +32,7 @@ def get_thresholds(): } def get_duration(): - return 30 + return 60 def get_windows_size(): return None -- GitLab From 04e1cf6ccc5d4edefac84c7cd05895a2d3246fe8 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 17 Jan 2025 14:55:59 +0000 Subject: [PATCH 5/7] Updated Analytics backend Service and streamer class - Add new output KPI types - refactor AnalyzerHandlers - Updated streamer interaction - added more test cases. --- proto/kpi_sample_types.proto | 10 + .../service/AnalyticsBackendService.py | 86 ++++--- .../backend/service/AnalyzerHandlers.py | 15 +- .../backend/service/AnalyzerHelper.py | 26 +- src/analytics/backend/service/Streamer.py | 83 +++--- .../backend/tests/messages_analyzer.py | 12 +- src/analytics/backend/tests/test_backend.py | 241 ++++++++++++------ 7 files changed, 306 insertions(+), 167 deletions(-) diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 0a9800d9e..d4efc084e 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -39,4 +39,14 @@ enum KpiSampleType { KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605; KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; + +// output KPIs + KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101; + KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102; + KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103; + KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201; + KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202; + KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203; + + KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701; } diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index eab275324..11ce1b377 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -17,17 +17,14 @@ import json import logging import threading -import pytz from common.tools.service.GenericGrpcService import GenericGrpcService from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import Consumer from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from threading import Thread, Event from analytics.backend.service.Streamer import DaskStreamer -from common.proto.analytics_frontend_pb2 import Analyzer -from datetime import datetime, timedelta +from analytics.backend.service.AnalyzerHelper import AnalyzerHelper LOGGER = logging.getLogger(__name__) @@ -35,16 +32,25 @@ logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class AnalyticsBackendService(GenericGrpcService): """ - Class listens for ... 
+ AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService. + It listens to the Kafka topic for the requests and starts/stops the DaskStreamer accordingly. + It also initializes the Kafka producer and Dask cluster for the streamer. """ - def __init__(self, cls_name : str = __name__) -> None: + def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1 + ) -> None: LOGGER.info('Init AnalyticsBackendService') port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND) super().__init__(port, cls_name=cls_name) self.active_streamers = {} - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', - 'auto.offset.reset' : 'latest'}) + self.central_producer = AnalyzerHelper.initialize_kafka_producer() # Multi-threaded producer + self.cluster = AnalyzerHelper.initialize_dask_cluster( + n_workers, threads_per_worker) # Local cluster + self.request_consumer = Consumer({ + 'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-backend', + 'auto.offset.reset' : 'latest', + }) + def install_servicers(self): threading.Thread( @@ -58,7 +64,7 @@ class AnalyticsBackendService(GenericGrpcService): """ LOGGER.info("Request Listener is initiated ...") # print ("Request Listener is initiated ...") - consumer = self.kafka_consumer + consumer = self.request_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: receive_msg = consumer.poll(2.0) @@ -69,23 +75,27 @@ class AnalyticsBackendService(GenericGrpcService): continue else: LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - # print ("Consumer error: {:}".format(receive_msg.error())) break try: analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: - self.StopStreamer(analyzer_uuid) + if self.StopStreamer(analyzer_uuid): + LOGGER.info("Dask Streamer stopped.") + else: + LOGGER.error("Failed to stop Dask Streamer.") else: - self.StartStreamer(analyzer_uuid, analyzer) + if self.StartStreamer(analyzer_uuid, analyzer): + LOGGER.info("Dask Streamer started.") + else: + LOGGER.error("Failed to start Dask Streamer.") except Exception as e: LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - def StartStreamer(self, analyzer_uuid : str, analyzer : json): + def StartStreamer(self, analyzer_uuid : str, analyzer : dict): """ Start the DaskStreamer with the given parameters. 
""" @@ -94,28 +104,30 @@ class AnalyticsBackendService(GenericGrpcService): return False try: streamer = DaskStreamer( - analyzer_uuid, - analyzer['input_kpis' ], - analyzer['output_kpis'], - analyzer['thresholds' ], - analyzer['batch_size' ], - analyzer['window_size'], + key = analyzer_uuid, + input_kpis = analyzer['input_kpis' ], + output_kpis = analyzer['output_kpis'], + thresholds = analyzer['thresholds' ], + batch_size = analyzer['batch_size' ], + window_size = analyzer['window_size'], + cluster_instance = self.cluster, + producer_instance = self.central_producer, ) streamer.start() - logging.info(f"Streamer started with analyzer Id: {analyzer_uuid}") + LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}") # Stop the streamer after the given duration - if analyzer['duration'] is not None: + if analyzer['duration'] > 0: def stop_after_duration(): time.sleep(analyzer['duration']) - logging.info(f"Stopping streamer with analyzer: {analyzer_uuid}") - streamer.stop() + LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}") + if not self.StopStreamer(analyzer_uuid): + LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.") duration_thread = threading.Thread(target=stop_after_duration, daemon=True) duration_thread.start() self.active_streamers[analyzer_uuid] = streamer - LOGGER.info("Dask Streamer started.") return True except Exception as e: LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e)) @@ -129,14 +141,30 @@ class AnalyticsBackendService(GenericGrpcService): if analyzer_uuid not in self.active_streamers: LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid)) return False - LOGGER.info(f"Stopping streamer with key: {analyzer_uuid}") + LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}") streamer = self.active_streamers[analyzer_uuid] streamer.stop() streamer.join() del self.active_streamers[analyzer_uuid] - LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been stopped.") + LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.") return True except Exception as e: LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e)) return False + def close(self): # TODO: Is this function needed? + """ + Close the producer and cluster cleanly. 
+ """ + if self.central_producer: + try: + self.central_producer.flush() + LOGGER.info("Kafka producer flushed and closed.") + except Exception as e: + LOGGER.error(f"Error closing Kafka producer: {e}") + if self.cluster: + try: + self.cluster.close() + LOGGER.info("Dask cluster closed.") + except Exception as e: + LOGGER.error(f"Error closing Dask cluster: {e}") diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py index 0e23bab69..c8a9c838a 100644 --- a/src/analytics/backend/service/AnalyzerHandlers.py +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -16,11 +16,11 @@ import logging from enum import Enum import pandas as pd - logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') -class AnalyzerHandlers(Enum): + +class Handlers(Enum): AGGREGATION_HANDLER = "AggregationHandler" UNSUPPORTED_HANDLER = "UnsupportedHandler" @@ -56,7 +56,7 @@ def threshold_handler(key, aggregated_df, thresholds): aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th else: - logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a tuple of length 2. Skipping threshold application.") + logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.") return aggregated_df def aggregation_handler( @@ -79,6 +79,10 @@ def aggregation_handler( # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only) df = df[df['kpi_id'].isin(input_kpi_list)].copy() + if df.empty: + logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.") + return [] + # Define all possible aggregation methods aggregation_methods = { "min" : ('kpi_value', 'min'), @@ -108,7 +112,6 @@ def aggregation_handler( selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters} # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}") - kpi_df = df[df['kpi_id'] == kpi_id] # Check if kpi_df is not empty before applying the aggregation methods @@ -119,9 +122,6 @@ def aggregation_handler( agg_df['kpi_id'] = output_kpi_list[kpi_index] - # if agg_df.empty: - # logger.warning(f"No data available for KPI: {kpi_id}. Skipping threshold application.") - # continue # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}") result = threshold_handler(key, agg_df, kpi_task_parameters) @@ -129,3 +129,4 @@ def aggregation_handler( else: logger.warning(f"No data available for KPIs: {kpi_id}. 
Skipping aggregation.") continue + return [] diff --git a/src/analytics/backend/service/AnalyzerHelper.py b/src/analytics/backend/service/AnalyzerHelper.py index 26d6e5fb9..15dde6e62 100644 --- a/src/analytics/backend/service/AnalyzerHelper.py +++ b/src/analytics/backend/service/AnalyzerHelper.py @@ -15,12 +15,11 @@ from dask.distributed import Client, LocalCluster from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from confluent_kafka import Consumer, Producer, KafkaException, KafkaError +from confluent_kafka import Consumer, Producer import logging -logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class AnalyzerHelper: @@ -28,15 +27,24 @@ class AnalyzerHelper: pass @staticmethod - def initialize_dask_client(n_workers=1, threads_per_worker=1): - """Initialize a local Dask cluster and client.""" - cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) - client = Client(cluster) + def initialize_dask_client(cluster_instance): + """Initialize a local Dask client.""" + if cluster_instance is None: + logger.error("Dask Cluster is not initialized. Exiting.") + return None + client = Client(cluster_instance) logger.info(f"Dask Client Initialized: {client}") - return client, cluster + return client + + @staticmethod + def initialize_dask_cluster(n_workers=1, threads_per_worker=2): + """Initialize a local Dask cluster""" + cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) + logger.info(f"Dask Cluster Initialized: {cluster}") + return cluster @staticmethod - def initialize_kafka_consumer(): + def initialize_kafka_consumer(): # TODO: update to receive topic and group_id as parameters """Initialize the Kafka consumer.""" consumer_conf = { 'bootstrap.servers': KafkaConfig.get_kafka_address(), diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index 35d35b36e..54ca70f5f 100644 --- a/src/analytics/backend/service/Streamer.py +++ b/src/analytics/backend/service/Streamer.py @@ -12,22 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging import time import json -from confluent_kafka import KafkaException, KafkaError -from common.tools.kafka.Variables import KafkaTopic -from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler -from .AnalyzerHelper import AnalyzerHelper import threading +import logging + +from confluent_kafka import KafkaException, KafkaError +from common.tools.kafka.Variables import KafkaTopic +from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler +from analytics.backend.service.AnalyzerHelper import AnalyzerHelper + logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class DaskStreamer(threading.Thread): - def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5, - window_size=None, n_workers=1, threads_per_worker=1): + def __init__(self, key, input_kpis, output_kpis, thresholds, + batch_size = 5, + window_size = None, + cluster_instance = None, + producer_instance = AnalyzerHelper.initialize_kafka_producer() + ): super().__init__() self.key = key self.input_kpis = input_kpis @@ -35,15 +41,14 @@ class DaskStreamer(threading.Thread): self.thresholds = thresholds self.window_size = window_size self.batch_size = batch_size - self.n_workers = n_workers - self.threads_per_worker = threads_per_worker self.running = True self.batch = [] # Initialize Kafka and Dask components - self.client, self.cluster = AnalyzerHelper.initialize_dask_client(n_workers, threads_per_worker) - self.consumer = AnalyzerHelper.initialize_kafka_consumer() - self.producer = AnalyzerHelper.initialize_kafka_producer() + self.client = AnalyzerHelper.initialize_dask_client(cluster_instance) + self.consumer = AnalyzerHelper.initialize_kafka_consumer() # Single-threaded consumer + self.producer = producer_instance + logger.info("Dask Streamer initialized.") def run(self): @@ -56,9 +61,12 @@ class DaskStreamer(threading.Thread): logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.") break if not self.running: - logger.warning("Dask Streamer is not running. Exiting loop.") + logger.warning("Dask Streamer instance has been terminated. Exiting loop.") + break + if not self.client: + logger.warning("Dask client is not running. 
Exiting loop.") break - message = self.consumer.poll(timeout=2.0) # Poll for new messages after 2 sceonds + message = self.consumer.poll(timeout=2.0) if message is None: # logger.info("No new messages received.") continue @@ -74,11 +82,10 @@ class DaskStreamer(threading.Thread): logger.error(f"Failed to decode message: {message.value()}") continue self.batch.append(value) - # logger.info(f"Received message: {value}") - # Window size has a priority over batch size + # Window size has a precedence over batch size if self.window_size is None: - if len(self.batch) >= self.batch_size: # If batch size is not provided, process continue with default batch size + if len(self.batch) >= self.batch_size: # If batch size is not provided, process continue with the default batch size logger.info(f"Processing based on batch size {self.batch_size}.") self.task_handler_selector() self.batch = [] @@ -94,15 +101,20 @@ class DaskStreamer(threading.Thread): except Exception as e: logger.exception(f"Error in Dask streaming process: {e}") finally: + self.stop() logger.info(">>> Exiting Dask Streamer...") def task_handler_selector(self): """Select the task handler based on the task type.""" - if AnalyzerHandlers.is_valid_handler(self.thresholds["task_type"]): - if self.client.status == 'running': - future = self.client.submit(aggregation_handler, "batch size", self.key, - self.batch, self.input_kpis, self.output_kpis, self.thresholds) - future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + logger.info(f"Batch to be processed: {self.batch}") + if Handlers.is_valid_handler(self.thresholds["task_type"]): + if self.client is not None and self.client.status == 'running': + try: + future = self.client.submit(aggregation_handler, "batch size", self.key, + self.batch, self.input_kpis, self.output_kpis, self.thresholds) + future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + except Exception as e: + logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") else: logger.warning("Dask client is not running. Skipping processing.") else: @@ -110,6 +122,9 @@ class DaskStreamer(threading.Thread): def produce_result(self, result, destination_topic): """Produce results to the Kafka topic.""" + if not result: + logger.warning("Nothing to produce. Skipping.") + return for record in result: try: self.producer.produce( @@ -124,9 +139,13 @@ class DaskStreamer(threading.Thread): logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") def stop(self): - """Clean up Kafka and Dask resources.""" - logger.info("Shutting down resources...") + """Clean up Kafka and Dask thread resources.""" + if not self.running: + logger.info("Dask Streamer is already stopped.") + return self.running = False + logger.info("Streamer running status is set to False. 
Waiting 5 seconds before stopping...") + time.sleep(5) # Waiting time for running tasks to complete if self.consumer: try: self.consumer.close() @@ -134,13 +153,6 @@ class DaskStreamer(threading.Thread): except Exception as e: logger.error(f"Error closing Kafka consumer: {e}") - if self.producer: - try: - self.producer.flush() - logger.info("Kafka producer flushed and closed.") - except Exception as e: - logger.error(f"Error closing Kafka producer: {e}") - if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running': try: self.client.close() @@ -148,11 +160,4 @@ class DaskStreamer(threading.Thread): except Exception as e: logger.error(f"Error closing Dask client: {e}") - if self.cluster is not None and hasattr(self.cluster, 'close'): - try: - self.cluster.close(timeout=5) - logger.info("Dask cluster closed.") - except Exception as e: - logger.error(f"Timeout error while closing Dask cluster: {e}") - - +# TODO: May be Single streamer for all analyzers ... ? diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py index 6a303d474..4a119d948 100644 --- a/src/analytics/backend/tests/messages_analyzer.py +++ b/src/analytics/backend/tests/messages_analyzer.py @@ -13,7 +13,7 @@ # limitations under the License. import pandas as pd -from analytics.backend.service.AnalyzerHandlers import AnalyzerHandlers +from analytics.backend.service.AnalyzerHandlers import Handlers def get_input_kpi_list(): return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] @@ -23,16 +23,16 @@ def get_output_kpi_list(): def get_thresholds(): return { - "task_type": AnalyzerHandlers.AGGREGATION_HANDLER.value, + "task_type": Handlers.AGGREGATION_HANDLER.value, "task_parameter": [ - {"last": (40, 80), "variance": (300, 500)}, - {"count": (2, 4), "max": (70, 100)}, - {"min": (10, 20), "avg": (50, 70)}, + {"last": [40, 80], "variance": [300, 500]}, + {"count": [2, 4], "max": [70, 100]}, + {"min": [10, 20], "avg": [50, 70]}, ], } def get_duration(): - return 60 + return 40 def get_windows_size(): return None diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 09be90e4c..3be34ee9f 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -13,107 +13,177 @@ # limitations under the License. 
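# For context on what test_aggregation_handler exercises further below: aggregation_handler
# computes its per-KPI statistics with pandas "named aggregation", mapping each result
# column name to a (source column, function) pair. A self-contained sketch with made-up
# values:

import pandas as pd

df = pd.DataFrame({"kpi_id": ["k1", "k1", "k1"], "kpi_value": [10, 20, 30]})
methods = {"avg": ("kpi_value", "mean"), "max": ("kpi_value", "max")}
agg = df.groupby("kpi_id").agg(**methods).reset_index()
# agg now holds one row per kpi_id with columns kpi_id, avg (20.0) and max (30)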
import time +import json import pytest import logging import pandas as pd -from unittest.mock import MagicMock, patch -from common.tools.kafka.Variables import KafkaTopic -from analytics.backend.service.Streamer import DaskStreamer + +from unittest.mock import MagicMock, patch from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \ - get_windows_size, get_batch_size, get_agg_df -from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler + get_windows_size, get_batch_size, get_agg_df, get_duration + +from common.tools.kafka.Variables import KafkaTopic +from analytics.backend.service.Streamer import DaskStreamer +from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService + logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') +# ---- +# Test fixtures and helper functions +# ---- + @pytest.fixture(autouse=True) def log_all_methods(request): ''' This fixture logs messages before and after each test function runs, indicating the start and end of the test. The autouse=True parameter ensures that this logging happens automatically for all tests in the module. ''' - logger.info(f" >>> Starting test: {request.node.name} >>> ") + logger.info(f" >>>>> Starting test: {request.node.name} ") yield - logger.info(f" <<< Finished test: {request.node.name} <<< ") + logger.info(f" <<<<< Finished test: {request.node.name} ") +@pytest.fixture +def mock_kafka_producer(): + mock_producer = MagicMock() + mock_producer.produce = MagicMock() + mock_producer.flush = MagicMock() + return mock_producer -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - logger.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +@pytest.fixture +def mock_dask_cluster(): + mock_cluster = MagicMock() + mock_cluster.close = MagicMock() + return mock_cluster + +@pytest.fixture +def mock_dask_client(): + mock_client = MagicMock() + mock_client.status = 'running' + mock_client.submit = MagicMock() + return mock_client + +@pytest.fixture() +def mock_kafka_consumer(): + mock_consumer = MagicMock() + mock_consumer.subscribe = MagicMock() + return mock_consumer + +@pytest.fixture() +def mock_streamer_start(): + mock_streamer = MagicMock() + mock_streamer.start = MagicMock() + return mock_streamer ########################### -# integration test of Streamer with backend service +# funtionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods ########################### -def test_backend_integration_with_analyzer(): - backendServiceObject = AnalyticsBackendService() - backendServiceObject.install_servicers() - logger.info(" waiting for 2 minutes for the backend service before termination ... ") - time.sleep(120) - backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666") - logger.info(" Backend service terminated successfully ... 
") +@pytest.fixture +def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer), \ + patch('analytics.backend.service.Streamer.DaskStreamer.run', return_value = mock_streamer_start): + + service = AnalyticsBackendService() + yield service + service.close() + +@pytest.fixture +def analyzer_data(): + return { + 'algo_name' : 'test_algorithm', + 'oper_mode' : 'test_mode', + 'input_kpis' : ['kpi1', 'kpi2'], + 'output_kpis': ['kpi3'], + 'thresholds' : {'kpi1': 0.5}, + 'batch_size' : 10, + 'window_size': 5, + 'duration' : 20, + } + +def test_start_streamer(analytics_service, analyzer_data): + analyzer_uuid = "test-analyzer-uuid" + # Start streamer + result = analytics_service.StartStreamer(analyzer_uuid, analyzer_data) + assert result is True + assert analyzer_uuid in analytics_service.active_streamers + +def test_stop_streamer(analytics_service, analyzer_data): + analyzer_uuid = "test-analyzer-uuid" + + # Start streamer for stopping it later + analytics_service.StartStreamer(analyzer_uuid, analyzer_data) + assert analyzer_uuid in analytics_service.active_streamers + # Stop streamer + result = analytics_service.StopStreamer(analyzer_uuid) + assert result is True + assert analyzer_uuid not in analytics_service.active_streamers + + # Verify that the streamer was stopped + assert analyzer_uuid not in analytics_service.active_streamers + +def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster): + analytics_service.close() + + mock_kafka_producer.flush.assert_called_once() + mock_dask_cluster.close.assert_called_once() ########################### -# funtionality pytest for analyzer sub methods +# funtionality pytest with specific fixtures for streamer class sub methods ########################### @pytest.fixture -def dask_streamer(): - with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client') as mock_dask_client, \ - patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer') as mock_kafka_consumer, \ - patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer') as mock_kafka_producer: - - mock_dask_client.return_value = (MagicMock(), MagicMock()) - mock_kafka_consumer.return_value = MagicMock() - mock_kafka_producer.return_value = MagicMock() +def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer): return DaskStreamer( - key="test_key", - 
input_kpis=get_input_kpi_list(), - output_kpis=get_output_kpi_list(), - thresholds=get_thresholds(), - batch_size=get_batch_size(), - window_size=get_windows_size(), - n_workers=3, - threads_per_worker=1 + key = "test_key", + input_kpis = get_input_kpi_list(), + output_kpis = get_output_kpi_list(), + thresholds = get_thresholds(), + batch_size = get_batch_size(), + window_size = get_windows_size(), + cluster_instance = mock_dask_cluster(), + producer_instance = mock_kafka_producer(), ) -def test_initialization(dask_streamer): +def test_dask_streamer_initialization(dask_streamer): """Test if the DaskStreamer initializes correctly.""" - assert dask_streamer.key == "test_key" - assert dask_streamer.batch_size == get_batch_size() + assert dask_streamer.key == "test_key" + assert dask_streamer.batch_size == get_batch_size() assert dask_streamer.window_size is None - assert dask_streamer.n_workers == 3 - assert dask_streamer.consumer is not None - assert dask_streamer.producer is not None - assert dask_streamer.client is not None - assert dask_streamer.cluster is not None - + assert dask_streamer.consumer is not None + assert dask_streamer.producer is not None + assert dask_streamer.client is not None def test_run_stops_on_no_consumer(dask_streamer): """Test if the run method exits when the consumer is not initialized.""" dask_streamer.consumer = None with patch('time.sleep', return_value=None): dask_streamer.run() + assert not dask_streamer.running -def test_task_handler_selector_valid_handler(dask_streamer): +def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client): """Test task handler selection with a valid handler.""" - with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=True), \ - patch.object(dask_streamer.client, 'submit', return_value=MagicMock()) as mock_submit, \ - patch.object(dask_streamer.client, 'status', 'running'): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client): dask_streamer.task_handler_selector() - mock_submit.assert_called_once() + assert dask_streamer.client.status == 'running' def test_task_handler_selector_invalid_handler(dask_streamer): """Test task handler selection with an invalid handler.""" - with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=False): + with patch('analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler', return_value=False): dask_streamer.task_handler_selector() assert dask_streamer.batch == [] @@ -126,31 +196,27 @@ def test_produce_result(dask_streamer): mock_produce.assert_called_once_with( "test_topic", key="kpi1", - value='{"kpi_id": "kpi1", "value": 100}', + value=json.dumps({"kpi_id": "kpi1", "value": 100}), callback=mock_delivery_report ) -def test_cleanup(dask_streamer): +def test_stop(dask_streamer): """Test the cleanup method.""" - with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \ - patch.object(dask_streamer.producer, 'flush') as mock_producer_flush, \ - patch.object(dask_streamer.client, 'close') as mock_client_close, \ - patch.object(dask_streamer.cluster, 'close', MagicMock()) as mock_cluster_close: + with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \ + patch.object(dask_streamer.client, 'close') as mock_client_close, \ + patch('time.sleep', return_value=0): # Mock the conditions required for the close calls dask_streamer.client.status = 'running' - 
dask_streamer.cluster.close = MagicMock() - dask_streamer.cleanup() + dask_streamer.stop() mock_consumer_close.assert_called_once() - mock_producer_flush.assert_called_once() mock_client_close.assert_called_once() - dask_streamer.cluster.close.assert_called_once() def test_run_with_valid_consumer(dask_streamer): """Test the run method with a valid Kafka consumer.""" - with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ + with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector: # Simulate valid messages without errors @@ -167,33 +233,29 @@ def test_run_with_valid_consumer(dask_streamer): # Run the `run` method in a limited loop with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays - dask_streamer.running = True # Ensure the streamer runs - dask_streamer.batch_size = 2 # Set a small batch size for the test + dask_streamer.running = True + dask_streamer.batch_size = 2 # Limit the loop by breaking it after one full processing cycle - def stop_running_after_task_handler(*args, **kwargs): + def stop_running_after_task_handler(): logger.info("Stopping the streamer after processing the first batch.") dask_streamer.running = False mock_task_handler_selector.side_effect = stop_running_after_task_handler - - # Execute the method dask_streamer.run() - # Assertions assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing mock_task_handler_selector.assert_called_once() # Task handler should be called once - mock_poll.assert_any_call(timeout=2.0) # Poll should have been called - + mock_poll.assert_any_call(timeout=2.0) # Poll should have been called at least once -# add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py +# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py def test_aggregation_handler(): # Create a sample batch - batch = get_batch() - input_kpi_list = get_input_kpi_list() + batch = get_batch() + input_kpi_list = get_input_kpi_list() output_kpi_list = get_output_kpi_list() - thresholds = get_thresholds() + thresholds = get_thresholds() # Test aggregation_handler aggregated_df = aggregation_handler( @@ -202,13 +264,38 @@ def test_aggregation_handler(): assert isinstance(aggregated_df, list) assert all(isinstance(item, dict) for item in aggregated_df) -# Test threshold_handler +# # Test threshold_handler def test_threshold_handler(): # Create a sample aggregated DataFrame - agg_df = get_agg_df() + agg_df = get_agg_df() thresholds = get_thresholds() # Test threshold_handler result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0]) + assert isinstance(result, pd.DataFrame) assert result.shape == (1, 7) + + +########################### +# integration test of Streamer with backend service (Shouldn't be run in the CI/CD pipeline) +########################### +# This is a local machine test to check the integration of the backend service with the Streamer + +# --- "test_validate_kafka_topics" should be run before the functionality tests --- +# def test_validate_kafka_topics(): +# logger.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) + +# def test_backend_integration_with_analyzer(): +# backendServiceObject = AnalyticsBackendService() +# backendServiceObject.install_servicers() +# logger.info(" waiting for 2 minutes for the 
backend service before termination ... ") +# time.sleep(150) +# logger.info(" Initiating stop collector ... ") +# status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666") +# backendServiceObject.close() +# assert isinstance(status, bool) +# assert status == True +# logger.info(" Backend service terminated successfully ... ") -- GitLab From cfa1829bbc7a6a70675896af567145c50f2f1817 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 17 Jan 2025 15:01:09 +0000 Subject: [PATCH 6/7] Updated Analyatics Backend tests --- src/analytics/backend/tests/test_backend.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 3be34ee9f..91d53000d 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -99,12 +99,12 @@ def analyzer_data(): return { 'algo_name' : 'test_algorithm', 'oper_mode' : 'test_mode', - 'input_kpis' : ['kpi1', 'kpi2'], - 'output_kpis': ['kpi3'], - 'thresholds' : {'kpi1': 0.5}, - 'batch_size' : 10, - 'window_size': 5, - 'duration' : 20, + 'input_kpis' : get_input_kpi_list(), + 'output_kpis': get_output_kpi_list(), + 'thresholds' : get_thresholds(), + 'batch_size' : get_batch_size(), + 'window_size': get_windows_size(), + 'duration' : get_duration(), } def test_start_streamer(analytics_service, analyzer_data): -- GitLab From 5d18e0086f7d260613ea9c03fc17c5f17bbf6fc5 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Fri, 17 Jan 2025 15:38:01 +0000 Subject: [PATCH 7/7] Pre-merge code cleanup --- src/analytics/backend/service/AnalyticsBackendService.py | 1 - src/analytics/backend/service/AnalyzerHandlers.py | 1 - src/analytics/backend/service/AnalyzerHelper.py | 1 - src/analytics/backend/service/Streamer.py | 1 - 4 files changed, 4 deletions(-) diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 11ce1b377..92332df6f 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -28,7 +28,6 @@ from analytics.backend.service.AnalyzerHelper import AnalyzerHelper LOGGER = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class AnalyticsBackendService(GenericGrpcService): """ diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py index c8a9c838a..a05b1a0b7 100644 --- a/src/analytics/backend/service/AnalyzerHandlers.py +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -17,7 +17,6 @@ from enum import Enum import pandas as pd logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') class Handlers(Enum): diff --git a/src/analytics/backend/service/AnalyzerHelper.py b/src/analytics/backend/service/AnalyzerHelper.py index 15dde6e62..15a45aee6 100644 --- a/src/analytics/backend/service/AnalyzerHelper.py +++ b/src/analytics/backend/service/AnalyzerHelper.py @@ -19,7 +19,6 @@ from confluent_kafka import Consumer, Producer import logging logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class AnalyzerHelper: diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index 54ca70f5f..c72359db2 100644 --- a/src/analytics/backend/service/Streamer.py +++ 
b/src/analytics/backend/service/Streamer.py @@ -24,7 +24,6 @@ from analytics.backend.service.AnalyzerHelper import AnalyzerHelper logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class DaskStreamer(threading.Thread): -- GitLab
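A note on the pre-merge cleanup in patch 7/7: dropping the per-module logging.basicConfig()
calls follows the usual convention that only the application entry point configures logging;
basicConfig() is a no-op once the root logger already has handlers, so the scattered calls
mostly competed over whichever format string got installed first. A minimal sketch of the
intended split (the entry-point location is an assumption, not part of this patch series):

    # entry point (e.g. the service launcher)
    import logging
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')

    # library modules (Streamer.py, AnalyzerHandlers.py, AnalyzerHelper.py, ...)
    import logging
    logger = logging.getLogger(__name__)   # no basicConfig() here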