diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh index 1c3386c62084bb42d6ffa2e1349b6f4286820a52..78fab0f7609d048e3a7660de00e100a86007e90c 100755 --- a/scripts/run_tests_locally-analytics-backend.sh +++ b/scripts/run_tests_locally-analytics-backend.sh @@ -18,8 +18,11 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc + export KFK_SERVER_ADDRESS='127.0.0.1:9092' + CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require" + python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ analytics/backend/tests/test_backend.py diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py new file mode 100644 index 0000000000000000000000000000000000000000..34ada434b98d7b7521607be23a373d21985a562b --- /dev/null +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -0,0 +1,127 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from enum import Enum +import pandas as pd + + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') + +class AnalyzerHandlers(Enum): + AGGREGATION_HANDLER = "AggregationHandler" + UNSUPPORTED_HANDLER = "UnsupportedHandler" + + @classmethod + def is_valid_handler(cls, handler_name): + return handler_name in cls._value2member_map_ + +# This method is top-level and should not be part of the class due to serialization issues. +def threshold_handler(key, aggregated_df, thresholds): + """ + Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary + on the aggregated DataFrame. + + Args: + key (str): Key for the aggregated DataFrame. + aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. + thresholds (dict): Thresholds dictionary with keys in the format '<metricName>' and values as (fail_th, raise_th). + + Returns: + pd.DataFrame: DataFrame with additional threshold columns. + """ + for metric_name, threshold_values in thresholds.items(): + # Ensure the metric column exists in the DataFrame + if metric_name not in aggregated_df.columns: + logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.") + continue + + # Ensure the threshold values are valid (check for tuple specifically) + if isinstance(threshold_values, tuple) and len(threshold_values) == 2: + fail_th, raise_th = threshold_values + + # Add threshold columns with updated naming + aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th + aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th + else: + logger.warning(f"Threshold values for '{metric_name}' are not a tuple of length 2. 
Skipping threshold application.") + return aggregated_df + +def aggregation_handler( + batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds + ): + """ + Process a batch of data and calculate aggregated values for each input KPI + and maps them to the output KPIs. """ + + logger.info(f"({batch_type_name}) Processing batch for key: {key}") + if not batch: + logger.info("Empty batch received. Skipping processing.") + return [] + else: + logger.info(f"Processing {len(batch)} records for key: {key}") + + # Convert data into a DataFrame + df = pd.DataFrame(batch) + + # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only) + df = df[df['kpi_id'].isin(input_kpi_list)].copy() + + # Define all possible aggregation methods + aggregation_methods = { + "min" : ('kpi_value', 'min'), + "max" : ('kpi_value', 'max'), + "avg" : ('kpi_value', 'mean'), + "first" : ('kpi_value', lambda x: x.iloc[0]), + "last" : ('kpi_value', lambda x: x.iloc[-1]), + "variance": ('kpi_value', 'var'), + "count" : ('kpi_value', 'count'), + "range" : ('kpi_value', lambda x: x.max() - x.min()), + "sum" : ('kpi_value', 'sum'), + } + + # Process each KPI-specific task parameter + for kpi_index, kpi_id in enumerate(input_kpi_list): + + # logger.info(f"1.Processing KPI: {kpi_id}") + kpi_task_parameters = thresholds["task_parameter"][kpi_index] + + # Get valid task parameters for this KPI + valid_task_parameters = [ + method for method in kpi_task_parameters.keys() + if method in aggregation_methods + ] + + # Select the aggregation methods based on valid task parameters + selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters} + + # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}") + + kpi_df = df[df['kpi_id'] == kpi_id] + + # Check if kpi_df is not empty before applying the aggregation methods + if not kpi_df.empty: + agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index() + + # logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}") + + agg_df['kpi_id'] = output_kpi_list[kpi_index] + + result = threshold_handler(key, agg_df, kpi_task_parameters) + + return result.to_dict(orient='records') + else: + logger.debug(f"No data available for KPI: {kpi_id}. Skipping aggregation.") + continue diff --git a/src/analytics/backend/service/AnalyzerHelper.py b/src/analytics/backend/service/AnalyzerHelper.py new file mode 100644 index 0000000000000000000000000000000000000000..26d6e5fb9cca45b93a33038e8658cab51bab8ad5 --- /dev/null +++ b/src/analytics/backend/service/AnalyzerHelper.py @@ -0,0 +1,60 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from dask.distributed import Client, LocalCluster +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from confluent_kafka import Consumer, Producer, KafkaException, KafkaError + +import logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') + + +class AnalyzerHelper: + def __init__(self): + pass + + @staticmethod + def initialize_dask_client(n_workers=1, threads_per_worker=1): + """Initialize a local Dask cluster and client.""" + cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) + client = Client(cluster) + logger.info(f"Dask Client Initialized: {client}") + return client, cluster + + @staticmethod + def initialize_kafka_consumer(): + """Initialize the Kafka consumer.""" + consumer_conf = { + 'bootstrap.servers': KafkaConfig.get_kafka_address(), + 'group.id': 'analytics-backend', + 'auto.offset.reset': 'latest' + } + consumer = Consumer(consumer_conf) + consumer.subscribe([KafkaTopic.VALUE.value]) + return consumer + + @staticmethod + def initialize_kafka_producer(): + """Initialize the Kafka producer.""" + return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + + @staticmethod + def delivery_report(err, msg): + if err is not None: + logger.error(f"Message delivery failed: {err}") + else: + logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py deleted file mode 100644 index 79dee7ef972a8b5546e3b38289c4fdb5b4bcc0d1..0000000000000000000000000000000000000000 --- a/src/analytics/backend/service/DaskStreaming.py +++ /dev/null @@ -1,268 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import time -import json -from confluent_kafka import Consumer, Producer, KafkaException, KafkaError -import pandas as pd -from dask.distributed import Client, LocalCluster -from common.tools.kafka.Variables import KafkaConfig, KafkaTopic - -logging.basicConfig(level=logging.INFO) -LOGGER = logging.getLogger(__name__) - -def SettingKafkaConsumerParams(): - return {'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-backend', - 'auto.offset.reset' : 'latest'} - -def GetAggregationMappings(thresholds): - agg_dict = {} - for threshold_key in thresholds.keys(): - parts = threshold_key.split('_', 1) - if len(parts) != 2: - LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.") - continue - aggregation, metric_name = parts - # Ensure that the aggregation function is valid in pandas - if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']: - LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. 
Skipping.") - continue - agg_dict[threshold_key] = ('kpi_value', aggregation) - return agg_dict - - -def ApplyThresholds(aggregated_df, thresholds): - """ - Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary - on the aggregated DataFrame. - Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. - thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'. - Returns: pd.DataFrame: DataFrame with additional threshold columns. - """ - for threshold_key, threshold_values in thresholds.items(): - if threshold_key not in aggregated_df.columns: - - LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.") - continue - if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2: - fail_th, raise_th = threshold_values - aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th - aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th - aggregated_df["value"] = aggregated_df[threshold_key] - else: - LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.") - return aggregated_df - -def initialize_dask_client(): - """ - Initialize a local Dask cluster and client. - """ - cluster = LocalCluster(n_workers=2, threads_per_worker=2) - client = Client(cluster) - LOGGER.info(f"Dask Client Initialized: {client}") - return client, cluster - -def initialize_kafka_producer(): - return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) - -def delivery_report(err, msg): - if err is not None: - LOGGER.error(f"Message delivery failed: {err}") - else: - LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") - -def process_batch(batch, agg_mappings, thresholds, key): - """ - Process a batch of data and apply thresholds. - Args: batch (list of dict): List of messages from Kafka. - agg_mappings (dict): Mapping from threshold key to aggregation function. - thresholds (dict): Thresholds dictionary. - Returns: list of dict: Processed records ready to be sent to Kafka. - """ - if not batch: - LOGGER.info("Empty batch received. Skipping processing.") - return [] - - - df = pd.DataFrame(batch) - LOGGER.info(f"df {df} ") - df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') - df.dropna(subset=['time_stamp'], inplace=True) - LOGGER.info(f"df {df} ") - required_columns = {'time_stamp', 'kpi_id', 'kpi_value'} - if not required_columns.issubset(df.columns): - LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.") - return [] - if df.empty: - LOGGER.info("No data after filtering by KPI IDs. Skipping processing.") - return [] - - # Perform aggregations using named aggregation - try: - agg_dict = {key: value for key, value in agg_mappings.items()} - - df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index() - - #example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min') - - #given that threshold has 1 value - second_value_tuple = next(iter(agg_dict.values()))[1] - #in case we have multiple thresholds! 
- #second_values_tuples = [value[1] for value in agg_dict.values()] - if second_value_tuple=="min": - df_agg = df_agg_.min(numeric_only=True).to_frame().T - elif second_value_tuple == "max": - df_agg = df_agg_.max(numeric_only=True).to_frame().T - elif second_value_tuple == "std": - df_agg = df_agg_.sted(numeric_only=True).to_frame().T - else: - df_agg = df_agg_.mean(numeric_only=True).to_frame().T - - # Assign the first value of window_start from the original aggregated data - df_agg['window_start'] = df_agg_['window_start'].iloc[0] - - # Reorder columns to place 'window_start' first if needed - cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start'] - df_agg = df_agg[cols] - - except Exception as e: - LOGGER.error(f"Aggregation error: {e}") - return [] - - # Apply thresholds - - - df_thresholded = ApplyThresholds(df_agg, thresholds) - df_thresholded['kpi_id'] = key - df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ') - # Convert aggregated DataFrame to list of dicts - result = df_thresholded.to_dict(orient='records') - LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.") - return result - -def produce_result(result, producer, destination_topic): - for record in result: - try: - producer.produce( - destination_topic, - key=str(record.get('kpi_id', '')), - value=json.dumps(record), - callback=delivery_report - ) - except KafkaException as e: - LOGGER.error(f"Failed to produce message: {e}") - producer.flush() - LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") - -def DaskStreamer(key, kpi_list, thresholds, stop_event, - window_size="30s", time_stamp_col="time_stamp"): - client, cluster = initialize_dask_client() - consumer_conf = SettingKafkaConsumerParams() - consumer = Consumer(consumer_conf) - consumer.subscribe([KafkaTopic.VALUE.value]) - producer = initialize_kafka_producer() - - # Parse window_size to seconds - try: - window_size_td = pd.to_timedelta(window_size) - window_size_seconds = window_size_td.total_seconds() - except Exception as e: - LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}") - window_size_seconds = 30 - LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.") - - # Extract aggregation mappings from thresholds - agg_mappings = GetAggregationMappings(thresholds) - if not agg_mappings: - LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.") - consumer.close() - producer.flush() - client.close() - cluster.close() - return - try: - batch = [] - last_batch_time = time.time() - LOGGER.info("Starting to consume messages...") - - while not stop_event.is_set(): - msg = consumer.poll(1.0) - - if msg is None: - current_time = time.time() - if (current_time - last_batch_time) >= window_size_seconds and batch: - LOGGER.info("Time-based batch threshold reached. 
Processing batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - batch = [] - last_batch_time = current_time - continue - - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: - LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") - else: - LOGGER.error(f"Kafka error: {msg.error()}") - continue - - try: - message_value = json.loads(msg.value().decode('utf-8')) - except json.JSONDecodeError as e: - LOGGER.error(f"JSON decode error: {e}") - continue - - try: - message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce') - LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.") - - if pd.isna(message_timestamp): - LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.") - continue - window_start = message_timestamp.floor(window_size) - LOGGER.warning(f"window_start: {window_start}. Skipping message.") - message_value['window_start'] = window_start - except Exception as e: - LOGGER.error(f"Error processing timestamp: {e}. Skipping message.") - continue - - if message_value['kpi_id'] not in kpi_list: - LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.") - continue - - batch.append(message_value) - - current_time = time.time() - if (current_time - last_batch_time) >= window_size_seconds and batch: - LOGGER.info("Time-based batch threshold reached. Processing batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds, key) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - batch = [] - last_batch_time = current_time - - except Exception as e: - LOGGER.exception(f"Error in Dask streaming process: {e}") - finally: - # Process any remaining messages in the batch - if batch: - LOGGER.info("Processing remaining messages in the batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - consumer.close() - producer.flush() - LOGGER.info("Kafka consumer and producer closed.") - client.close() - cluster.close() - LOGGER.info("Dask client and cluster closed.") diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py new file mode 100644 index 0000000000000000000000000000000000000000..124ef5651ebb44957acc1480535293c2df35da23 --- /dev/null +++ b/src/analytics/backend/service/Streamer.py @@ -0,0 +1,159 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import time +import json +from confluent_kafka import KafkaException, KafkaError +# import pandas as pd +from common.tools.kafka.Variables import KafkaTopic +from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler +from .AnalyzerHelper import AnalyzerHelper + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') + + +class DaskStreamer: + def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5, + window_size=None, n_workers=5, threads_per_worker=2): + self.key = key + self.input_kpis = input_kpis + self.output_kpis = output_kpis + self.thresholds = thresholds + self.window_size = window_size + self.batch_size = batch_size + self.n_workers = n_workers + self.threads_per_worker = threads_per_worker + self.running = True + self.batch = [] + + # Initialize Kafka and Dask components + self.client, self.cluster = AnalyzerHelper.initialize_dask_client(n_workers, threads_per_worker) + self.consumer = AnalyzerHelper.initialize_kafka_consumer() + self.producer = AnalyzerHelper.initialize_kafka_producer() + logger.info("Dask Streamer initialized.") + + def run(self): + """Main method to start the DaskStreamer.""" + try: + logger.info("Starting Dask Streamer") + last_batch_time = time.time() + while True: + if not self.consumer: + logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.") + break + if not self.running: + logger.warning("Dask Streamer is not running. Exiting loop.") + break + message = self.consumer.poll(timeout=2.0) + if message is None: + # logger.info("No new messages received.") + continue + if message.error(): + if message.error().code() == KafkaError._PARTITION_EOF: + logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}") + elif message.error(): + raise KafkaException(message.error()) + else: + try: + value = json.loads(message.value()) + except json.JSONDecodeError: + logger.error(f"Failed to decode message: {message.value()}") + continue + self.batch.append(value) + # logger.info(f"Received message: {value}") + + # Window size has a priority over batch size + if self.window_size is None: + if len(self.batch) >= self.batch_size: # If batch size is not provided, process continue with default batch size + logger.info(f"Processing based on batch size {self.batch_size}.") + self.task_handler_selector() + self.batch = [] + else: + # Process based on window size + current_time = time.time() + if (current_time - last_batch_time) >= self.window_size and self.batch: + logger.info(f"Processing based on window size {self.window_size}.") + self.task_handler_selector() + self.batch = [] + last_batch_time = current_time + + except Exception as e: + logger.exception(f"Error in Dask streaming process: {e}") + finally: + logger.info(">>> Exiting Dask Streamer...") + self.cleanup() + logger.info(">>> Dask Streamer Cleanup Completed.") + + def task_handler_selector(self): + """Select the task handler based on the task type.""" + if AnalyzerHandlers.is_valid_handler(self.thresholds["task_type"]): + if self.client.status == 'running': + future = self.client.submit(aggregation_handler, "batch size", self.key, + self.batch, self.input_kpis, self.output_kpis, self.thresholds) + future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + else: + logger.warning("Dask client is not running. 
Skipping processing.") + else: + logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.") + + def produce_result(self, result, destination_topic): + """Produce results to the Kafka topic.""" + for record in result: + try: + self.producer.produce( + destination_topic, + key=str(record.get('kpi_id', '')), + value=json.dumps(record), + callback=AnalyzerHelper.delivery_report + ) + except KafkaException as e: + logger.error(f"Failed to produce message: {e}") + self.producer.flush() + logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") + + def cleanup(self): + """Clean up Kafka and Dask resources.""" + logger.info("Shutting down resources...") + self.running = False + if self.consumer: + try: + self.consumer.close() + logger.info("Kafka consumer closed.") + except Exception as e: + logger.error(f"Error closing Kafka consumer: {e}") + + if self.producer: + try: + self.producer.flush() + logger.info("Kafka producer flushed and closed.") + except Exception as e: + logger.error(f"Error closing Kafka producer: {e}") + + if self.client and hasattr(self.client, 'status') and self.client.status == 'running': + try: + self.client.close() + logger.info("Dask client closed.") + except Exception as e: + logger.error(f"Error closing Dask client: {e}") + + if self.cluster and hasattr(self.cluster, 'close'): + try: + self.cluster.close(timeout=5) + logger.info("Dask cluster closed.") + except Exception as e: + logger.error(f"May be timeout. Error closing Dask cluster: {e}") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index 55d966dfbbae2318a5f774049b02f5b340070113..2ff1e93536e94863ba9eaaf76d35f7453ba95727 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -53,8 +53,8 @@ def create_analyzer(): _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING - _kpi_id = KpiId() # input IDs to analyze + _kpi_id = KpiId() _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7" _create_analyzer.input_kpi_ids.append(_kpi_id) @@ -63,11 +63,14 @@ def create_analyzer(): _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id = KpiId() _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter _threshold_dict = { # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py new file mode 100644 index 0000000000000000000000000000000000000000..040fbb4686aa9e1fd36de5947653c51c3373bb17 --- /dev/null +++ b/src/analytics/backend/tests/messages_analyzer.py @@ -0,0 +1,64 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pandas as pd +from analytics.backend.service.AnalyzerHandlers import AnalyzerHandlers + +def get_input_kpi_list(): + return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] + +def get_output_kpi_list(): + return ["1e22f180-ba28-4641-b190-2287bf441616", "6e22f180-ba28-4641-b190-2287bf181818", 'kpi_4'] + +def get_thresholds(): + return { + "task_type": AnalyzerHandlers.AGGREGATION_HANDLER.value, + "task_parameter": [ + {"last": (40, 80), "variance": (300, 500)}, + {"count": (2, 4), "max": (70, 100)}, + {"min": (10, 20), "avg": (50, 70)}, + ], + } + +def get_duration(): + return 30 + +def get_windows_size(): + return None + +def get_batch_size(): + return 10 + +def get_interval(): + return 5 + +def get_batch(): + return [ + {"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 46.72}, + {"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 65.22}, + {"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 54.24}, + {"time_stamp": "2025-01-13T08:44:16Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 57.67}, + {"time_stamp": "2025-01-13T08:44:18Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 38.6}, + {"time_stamp": "2025-01-13T08:44:20Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 38.9}, + {"time_stamp": "2025-01-13T08:44:22Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 52.44}, + {"time_stamp": "2025-01-13T08:44:24Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 47.76}, + {"time_stamp": "2025-01-13T08:44:26Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 33.71}, + {"time_stamp": "2025-01-13T08:44:28Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 64.44}, + ] + +def get_agg_df(): + data = [ + {"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616", "last": 47.76, "variance": 970.41}, + ] + return pd.DataFrame(data) diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 4aa9df5fae9849ee429361603a35b2fb8eaa4d23..cc0167399f3367144a1332656e9ad563f0258148 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -12,148 +12,186 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import time, json -from typing import Dict +import pytest import logging -from threading import Event, Thread +import pandas as pd +from unittest.mock import MagicMock, patch from common.tools.kafka.Variables import KafkaTopic -from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService -from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict -from .messages import create_analyzer, create_analyzer_dask -from threading import Thread, Event -from ..service.DaskStreaming import DaskStreamer +from analytics.backend.service.Streamer import DaskStreamer +from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \ + get_windows_size, get_batch_size, get_agg_df +from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler -LOGGER = logging.getLogger(__name__) +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') +# --- "test_validate_kafka_topics" should be run before the functionality tests --- +def test_validate_kafka_topics(): + logger.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) ########################### # Tests Implementation of Telemetry Backend ########################### -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. 
+ ''' + logger.info(f" >>> Starting test: {request.node.name} >>> ") + yield + logger.info(f" <<< Finished test: {request.node.name} <<< ") + +@pytest.fixture +def dask_streamer(): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client') as mock_dask_client, \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer') as mock_kafka_consumer, \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer') as mock_kafka_producer: + + mock_dask_client.return_value = (MagicMock(), MagicMock()) + mock_kafka_consumer.return_value = MagicMock() + mock_kafka_producer.return_value = MagicMock() + + return DaskStreamer( + key="test_key", + input_kpis=get_input_kpi_list(), + output_kpis=get_output_kpi_list(), + thresholds=get_thresholds(), + batch_size=get_batch_size(), + window_size=get_windows_size(), + n_workers=3, + threads_per_worker=1 + ) + +def test_initialization(dask_streamer): + """Test if the DaskStreamer initializes correctly.""" + assert dask_streamer.key == "test_key" + assert dask_streamer.batch_size == get_batch_size() + assert dask_streamer.window_size is None + assert dask_streamer.n_workers == 3 + assert dask_streamer.consumer is not None + assert dask_streamer.producer is not None + assert dask_streamer.client is not None + assert dask_streamer.cluster is not None + + +def test_run_stops_on_no_consumer(dask_streamer): + """Test if the run method exits when the consumer is not initialized.""" + dask_streamer.consumer = None + with patch('time.sleep', return_value=None): + dask_streamer.run() + assert not dask_streamer.running + +def test_task_handler_selector_valid_handler(dask_streamer): + """Test task handler selection with a valid handler.""" + with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=True), \ + patch.object(dask_streamer.client, 'submit', return_value=MagicMock()) as mock_submit, \ + patch.object(dask_streamer.client, 'status', 'running'): + + dask_streamer.task_handler_selector() + mock_submit.assert_called_once() + +def test_task_handler_selector_invalid_handler(dask_streamer): + """Test task handler selection with an invalid handler.""" + with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=False): + dask_streamer.task_handler_selector() + assert dask_streamer.batch == [] + +def test_produce_result(dask_streamer): + """Test if produce_result sends records to Kafka.""" + result = [{"kpi_id": "kpi1", "value": 100}] + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \ + patch.object(dask_streamer.producer, 'produce') as mock_produce: + dask_streamer.produce_result(result, "test_topic") + mock_produce.assert_called_once_with( + "test_topic", + key="kpi1", + value='{"kpi_id": "kpi1", "value": 100}', + callback=mock_delivery_report + ) + +def test_cleanup(dask_streamer): + """Test the cleanup method.""" + with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \ + patch.object(dask_streamer.producer, 'flush') as mock_producer_flush, \ + patch.object(dask_streamer.client, 'close') as mock_client_close, \ + patch.object(dask_streamer.cluster, 'close', MagicMock()) as mock_cluster_close: + + # Mock the conditions required for the close calls + dask_streamer.client.status = 'running' + dask_streamer.cluster.close = MagicMock() + + dask_streamer.cleanup() + + 
mock_consumer_close.assert_called_once() + mock_producer_flush.assert_called_once() + mock_client_close.assert_called_once() + dask_streamer.cluster.close.assert_called_once() + +def test_run_with_valid_consumer(dask_streamer): + """Test the run method with a valid Kafka consumer.""" + with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ + patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector: + + # Simulate valid messages without errors + mock_message_1 = MagicMock() + mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}' + mock_message_1.error.return_value = None # No error + + mock_message_2 = MagicMock() + mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}' + mock_message_2.error.return_value = None # No error + + # Mock `poll` to return valid messages + mock_poll.side_effect = [mock_message_1, mock_message_2] + + # Run the `run` method in a limited loop + with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays + dask_streamer.running = True # Ensure the streamer runs + dask_streamer.batch_size = 2 # Set a small batch size for the test + + # Limit the loop by breaking it after one full processing cycle + def stop_running_after_task_handler(*args, **kwargs): + logger.info("Stopping the streamer after processing the first batch.") + dask_streamer.running = False + + mock_task_handler_selector.side_effect = stop_running_after_task_handler + + # Execute the method + dask_streamer.run() + + # Assertions + assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing + mock_task_handler_selector.assert_called_once() # Task handler should be called once + mock_poll.assert_any_call(timeout=2.0) # Poll should have been called + + +# add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py +def test_aggregation_handler(): + + # Create a sample batch + batch = get_batch() + input_kpi_list = get_input_kpi_list() + output_kpi_list = get_output_kpi_list() + thresholds = get_thresholds() + + # Test aggregation_handler + aggregated_df = aggregation_handler( + "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds + ) + assert isinstance(aggregated_df, list) + assert all(isinstance(item, dict) for item in aggregated_df) +# Test threshold_handler +def test_threshold_handler(): + # Create a sample aggregated DataFrame + agg_df = get_agg_df() + thresholds = get_thresholds() -# --- To test Dask Streamer functionality --- -# def test_StartDaskStreamer(): # Directly from the Streamer class -# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") -# stop_event = Event() -# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] -# oper_list = ['avg', 'min', 'max',] -# thresholds = { -# 'avg_value': (10.0, 90.0), -# 'min_value': (5.0, 95.0), -# 'max_value': (15.0, 85.0), -# 'latency' : (2.0, 10.0) -# } - -# # Start the DaskStreamer in a separate thread -# streamer_thread = Thread( -# target=DaskStreamer, -# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event), -# kwargs={ -# "window_size": "60s", -# "win_slide_duration": "30s", -# "time_stamp_col": "time_stamp" -# } -# ) -# streamer_thread.start() -# try: -# while True: -# time.sleep(10) -# except KeyboardInterrupt: -# LOGGER.info("KeyboardInterrupt received. 
Stopping streamer...") -# stop_event.set() -# streamer_thread.join() -# LOGGER.info("Streamer stopped gracefully.") - -# --- To test Start Streamer functionality --- -# def test_StartDaskStreamer(): -# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ") -# analyzer_obj = create_analyzer_dask() -# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj)) -# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid -# analyzer_to_generate : Dict = { -# "algo_name" : analyzer_obj.algorithm_name, -# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], -# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], -# "oper_mode" : analyzer_obj.operation_mode, -# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), -# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]), -# # "oper_list" : analyzer_obj.parameters["oper_list"], -# "window_size" : analyzer_obj.parameters["window_size"], -# "window_slider" : analyzer_obj.parameters["window_slider"], -# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] -# } -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate))) -# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate) -# assert isinstance(response, bool) -# time.sleep(100) -# LOGGER.info('Initiating StopRequestListener...') -# # AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) - -# --- To test Start Streamer functionality --- -# def test_StartSparkStreamer(): -# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") -# analyzer_obj = create_analyzer() -# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid -# analyzer_to_generate : Dict = { -# "algo_name" : analyzer_obj.algorithm_name, -# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], -# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], -# "oper_mode" : analyzer_obj.operation_mode, -# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), -# "window_size" : analyzer_obj.parameters["window_size"], -# "window_slider" : analyzer_obj.parameters["window_slider"], -# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] -# } -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) -# assert isinstance(response, bool) - -# --- To TEST StartRequestListenerFunctionality -# def test_StartRequestListener(): -# LOGGER.info('test_RunRequestListener') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# AnalyticsBackendServiceObj.stop_event = Event() -# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) -# listener_thread.start() - -# time.sleep(100) - - # AnalyticsBackendServiceObj.stop_event.set() - # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') - # listener_thread.join(timeout=10) - # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." 
- # LOGGER.info('Completed test_RunRequestListener') - -# To test START and STOP communication together -# def test_StopRequestListener(): -# LOGGER.info('test_RunRequestListener') -# LOGGER.info('Initiating StartRequestListener...') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) -# # LOGGER.debug(str(response_thread)) -# time.sleep(10) -# LOGGER.info('Initiating StopRequestListener...') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) - -# To independently tests the SparkListener functionality -# def test_SparkListener(): -# LOGGER.info('test_RunRequestListener') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.RunSparkStreamer( -# get_kpi_id_list(), get_operation_list(), get_threshold_dict() -# ) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) + # Test threshold_handler + result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0]) + assert isinstance(result, pd.DataFrame) + assert result.shape == (1, 7)
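
Usage note (illustrative sketch, not part of the patch): threshold_handler expects per-metric (fail_th, raise_th) tuples and appends boolean <metric>_TH_RAISE / <metric>_TH_FALL columns to the aggregated DataFrame. The snippet below mirrors the get_agg_df() and get_thresholds() fixtures added in messages_analyzer.py; the "analyzer-1" key is an arbitrary example value used only for logging.

import pandas as pd
from analytics.backend.service.AnalyzerHandlers import threshold_handler

# One aggregated row, as produced for the first input KPI by aggregation_handler
agg_df = pd.DataFrame([
    {"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616", "last": 47.76, "variance": 970.41},
])
# (fail_th, raise_th) per metric, matching the first entry of "task_parameter"
params = {"last": (40, 80), "variance": (300, 500)}

result = threshold_handler("analyzer-1", agg_df, params)
# Adds last_TH_RAISE/last_TH_FALL and variance_TH_RAISE/variance_TH_FALL:
#   last     = 47.76  -> RAISE (> 80):  False, FALL (< 40):  False
#   variance = 970.41 -> RAISE (> 500): True,  FALL (< 300): False
print(result)   # 1 row x 7 columns, the shape asserted in test_threshold_handler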
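
Usage note (illustrative sketch, not part of the patch): aggregation_handler filters the batch down to the subscribed input KPIs, applies only the aggregation methods named in thresholds["task_parameter"] (matched to input KPIs by position), rewrites kpi_id to the corresponding output KPI, and then runs threshold_handler on the aggregate. As written it returns as soon as one input KPI has data in the batch, so this sketch uses a batch that only carries samples for the second input KPI; the KPI ids come from the messages_analyzer.py fixtures and the key is an arbitrary example.

from analytics.backend.service.AnalyzerHandlers import AnalyzerHandlers, aggregation_handler

input_kpis  = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888"]
output_kpis = ["1e22f180-ba28-4641-b190-2287bf441616", "6e22f180-ba28-4641-b190-2287bf181818"]
thresholds = {
    "task_type": AnalyzerHandlers.AGGREGATION_HANDLER.value,
    "task_parameter": [                      # one dict per input KPI, by position
        {"last": (40, 80), "variance": (300, 500)},
        {"count": (2, 4), "max": (70, 100)},
    ],
}
batch = [   # only the second input KPI has samples in this batch
    {"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": input_kpis[1], "kpi_value": 46.72},
    {"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": input_kpis[1], "kpi_value": 65.22},
]

# First argument is only a label used in log messages (the streamer passes "batch size")
records = aggregation_handler("batch size", "analyzer-1", batch,
                              input_kpis, output_kpis, thresholds)
# -> [{'kpi_id': '6e22f180-ba28-4641-b190-2287bf181818', 'count': 2, 'max': 65.22,
#      'count_TH_RAISE': False, 'count_TH_FALL': False,
#      'max_TH_RAISE': False, 'max_TH_FALL': True}]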
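
Usage note (illustrative sketch, not part of the patch): the class-based DaskStreamer replaces the function-based DaskStreaming module and owns its own Dask client plus Kafka consumer/producer. The wiring below assumes a reachable Kafka broker (KFK_SERVER_ADDRESS), the TFS topics already created, and the src/ tree on PYTHONPATH; in the service the streamer is driven by the analytics backend, so this is only for local experiments.

import time
from threading import Thread

from analytics.backend.service.Streamer import DaskStreamer
from analytics.backend.tests.messages_analyzer import (
    get_input_kpi_list, get_output_kpi_list, get_thresholds,
)

streamer = DaskStreamer(
    key                = "analyzer-uuid",       # placeholder identifier, used in logs
    input_kpis         = get_input_kpi_list(),
    output_kpis        = get_output_kpi_list(),
    thresholds         = get_thresholds(),      # must carry "task_type" and "task_parameter"
    batch_size         = 5,                     # flush every 5 messages ...
    window_size        = None,                  # ... unless a time window is set
    n_workers          = 2,
    threads_per_worker = 1,
)

worker = Thread(target=streamer.run, daemon=True)   # run() blocks while polling Kafka
worker.start()
time.sleep(30)                                      # let it consume for a while
streamer.running = False                            # run() exits its loop and calls cleanup()
worker.join(timeout=10)

When window_size is set it takes precedence over batch_size and batches are flushed on elapsed time (interpreted as seconds); with window_size=None the streamer flushes every batch_size messages, which is the mode exercised by the new unit tests.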