diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 0a9800d9e5839205e1e45f84e4c8bdafbe93f32f..d4efc084e5f1ea2376e71ef6a15bc9b972c5ac1d 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -39,4 +39,14 @@ enum KpiSampleType { KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605; KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; + +// output KPIs + KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101; + KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102; + KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103; + KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201; + KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202; + KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203; + + KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701; } diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh index 1c3386c62084bb42d6ffa2e1349b6f4286820a52..700155a42714bd05069c7c62db9ada09b4125355 100755 --- a/scripts/run_tests_locally-analytics-backend.sh +++ b/scripts/run_tests_locally-analytics-backend.sh @@ -18,8 +18,11 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc + export KFK_SERVER_ADDRESS='127.0.0.1:9092' + CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require" -python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ + +python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \ analytics/backend/tests/test_backend.py diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh index 6e945406f0ff7b2670a35d5315d0ef428f701988..2c18296cf86ae4a00904fb684e2de5c56da9a2ea 100755 --- a/scripts/run_tests_locally-analytics-frontend.sh +++ b/scripts/run_tests_locally-analytics-frontend.sh @@ -18,8 +18,10 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc + export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require" -python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ + +python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \ analytics/frontend/tests/test_frontend.py diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 3ad4a2d0e05dbb11573eb146b4f0a4959894ace0..f648a62520f2f7b23f30edb19bf54735f5d13e12 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -18,15 +18,12 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src # RCFILE=$PROJECTDIR/coverage/.coveragerc -# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ -# kpi_manager/tests/test_unitary.py -# python3 kpi_manager/tests/test_unitary.py export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \ - telemetry/backend/tests/test_TelemetryBackend.py +python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \ + telemetry/backend/tests/test_backend.py diff --git 
a/scripts/run_tests_locally-telemetry-emulated.sh b/scripts/run_tests_locally-telemetry-emulated.sh new file mode 100755 index 0000000000000000000000000000000000000000..879b878c7281a17dcc89a36ff146939151540ec4 --- /dev/null +++ b/scripts/run_tests_locally-telemetry-emulated.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +# RCFILE=$PROJECTDIR/coverage/.coveragerc + +# export KFK_SERVER_ADDRESS='127.0.0.1:9092' +# CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') +# export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require" +RCFILE=$PROJECTDIR/coverage/.coveragerc + + +python3 -m pytest --log-level=debug --log-cli-level=info --verbose \ + telemetry/backend/tests/test_emulated.py diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index f3a58feaab8667b266052803dddd1641b8a690f3..1abdd62c0f7162fe7a50f1c08e698f71c5fc93ad 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -16,109 +16,164 @@ import time import json import logging import threading + from common.tools.service.GenericGrpcService import GenericGrpcService from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from confluent_kafka import Consumer as KafkaConsumer -from confluent_kafka import KafkaError +from confluent_kafka import Consumer +from confluent_kafka import KafkaError, KafkaException from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from threading import Thread, Event -from .DaskStreaming import DaskStreamer +from analytics.backend.service.Streamer import DaskStreamer +from analytics.backend.service.AnalyzerHelper import AnalyzerHelper + LOGGER = logging.getLogger(__name__) class AnalyticsBackendService(GenericGrpcService): """ - Class listens for ... + AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService. + It listens to the Kafka topic for the requests to start and stop the Streamer accordingly. + It also initializes the Kafka producer and Dask cluster for the streamer. 
""" - def __init__(self, cls_name : str = __name__) -> None: + def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1 + ) -> None: LOGGER.info('Init AnalyticsBackendService') port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND) super().__init__(port, cls_name=cls_name) - self.running_threads = {} # To keep track of all running analyzers - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', - 'auto.offset.reset' : 'latest'}) + self.active_streamers = {} + self.central_producer = AnalyzerHelper.initialize_kafka_producer() # Multi-threaded producer + self.cluster = AnalyzerHelper.initialize_dask_cluster( + n_workers, threads_per_worker) # Local cluster + self.request_consumer = Consumer({ + 'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-backend', + 'auto.offset.reset' : 'latest', + }) + def install_servicers(self): - threading.Thread(target=self.RequestListener, args=()).start() + threading.Thread( + target=self.RequestListener, + args=() + ).start() def RequestListener(self): """ listener for requests on Kafka topic. """ LOGGER.info("Request Listener is initiated ...") - # print ("Request Listener is initiated ...") - consumer = self.kafka_consumer + consumer = self.request_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: - receive_msg = consumer.poll(2.0) - if receive_msg is None: + message = consumer.poll(2.0) + if message is None: continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - # print ("Consumer error: {:}".format(receive_msg.error())) - break + elif message.error(): + if message.error().code() == KafkaError._PARTITION_EOF: + LOGGER.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}") + break + elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + LOGGER.error(f"Subscribed topic {message.topic()} does not exist. May be topic does not have any messages.") + continue + elif message.error(): + raise KafkaException(message.error()) try: - analyzer = json.loads(receive_msg.value().decode('utf-8')) - analyzer_uuid = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + analyzer = json.loads(message.value().decode('utf-8')) + analyzer_uuid = message.key().decode('utf-8') + LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: - self.StopDaskListener(analyzer_uuid) + if self.StopStreamer(analyzer_uuid): + LOGGER.info("Dask Streamer stopped.") + else: + LOGGER.warning("Failed to stop Dask Streamer. May be already terminated...") else: - self.StartDaskListener(analyzer_uuid, analyzer) + if self.StartStreamer(analyzer_uuid, analyzer): + LOGGER.info("Dask Streamer started.") + else: + LOGGER.warning("Failed to start Dask Streamer.") except Exception as e: LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - # print ("Unable to consume message from topic: {:}. 
ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - - def StartDaskListener(self, analyzer_uuid, analyzer): - kpi_list = analyzer[ 'input_kpis' ] - thresholds = analyzer[ 'thresholds' ] - window_size = analyzer[ 'window_size' ] - window_slider = analyzer[ 'window_slider'] - - LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format( - kpi_list, thresholds, window_size, window_slider)) - # print ("Received parameters: {:} - {:} - {:} - {:}".format( - # kpi_list, thresholds, window_size, window_slider)) + + + def StartStreamer(self, analyzer_uuid : str, analyzer : dict): + """ + Start the DaskStreamer with the given parameters. + """ + if analyzer_uuid in self.active_streamers: + LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid)) + return False try: - stop_event = Event() - thread = Thread( - target=DaskStreamer, - # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event), - args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event), - kwargs={ - "window_size" : window_size, - } + streamer = DaskStreamer( + key = analyzer_uuid, + input_kpis = analyzer['input_kpis' ], + output_kpis = analyzer['output_kpis' ], + thresholds = analyzer['thresholds' ], + batch_size = analyzer['batch_size_min' ], + batch_duration = analyzer['batch_duration_min'], + window_size = analyzer['window_size' ], + cluster_instance = self.cluster, + producer_instance = self.central_producer, ) - thread.start() - self.running_threads[analyzer_uuid] = (thread, stop_event) - # print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) - LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + streamer.start() + LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}") + + # Stop the streamer after the given duration + duration = analyzer['duration'] + if duration > 0: + def stop_after_duration(): + time.sleep(duration) + LOGGER.warning(f"Execution duration ({duration}) completed of Analyzer: {analyzer_uuid}") + if not self.StopStreamer(analyzer_uuid): + LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.") + + duration_thread = threading.Thread( + target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}" + ) + duration_thread.start() + + self.active_streamers[analyzer_uuid] = streamer return True except Exception as e: - # print ("Failed to initiate Analyzer backend: {:}".format(e)) - LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) + LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e)) return False - def StopDaskListener(self, analyzer_uuid): - if analyzer_uuid in self.running_threads: - try: - thread, stop_event = self.running_threads[analyzer_uuid] - stop_event.set() - thread.join() - del self.running_threads[analyzer_uuid] - # print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) - LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) + def StopStreamer(self, analyzer_uuid : str): + """ + Stop the DaskStreamer with the given analyzer_uuid. + """ + try: + if analyzer_uuid not in self.active_streamers: + LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid)) return True - except Exception as e: - LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e)) - return False - else: - # print ("Analyzer not found in active collectors. 
Analyzer Id: {:}".format(analyzer_uuid)) - LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) + LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}") + streamer = self.active_streamers[analyzer_uuid] + streamer.stop() + streamer.join() + del self.active_streamers[analyzer_uuid] + LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.") + return True + except: + LOGGER.exception("Failed to stop Dask Streamer.") + return False + + def close(self): + """ + Close the producer and cluster cleanly. + """ + if self.central_producer: + try: + self.central_producer.flush() + LOGGER.info("Kafka producer flushed and closed.") + except: + LOGGER.exception("Error closing Kafka producer") + if self.cluster: + try: + self.cluster.close() + LOGGER.info("Dask cluster closed.") + except: + LOGGER.exception("Error closing Dask cluster") + + def stop(self): + self.close() + return super().stop() diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py new file mode 100644 index 0000000000000000000000000000000000000000..256530ba78329f09327b551f0238f4dd8a1258b5 --- /dev/null +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -0,0 +1,136 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from enum import Enum +import pandas as pd + +logger = logging.getLogger(__name__) + + +class Handlers(Enum): + AGGREGATION_HANDLER = "AggregationHandler" + UNSUPPORTED_HANDLER = "UnsupportedHandler" + + @classmethod + def is_valid_handler(cls, handler_name): + return handler_name in cls._value2member_map_ + +# This method is top-level and should not be part of the class due to serialization issues. +def threshold_handler(key, aggregated_df, thresholds): + """ + Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary + on the aggregated DataFrame. + + Args: + key (str): Key for the aggregated DataFrame. + aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. + thresholds (dict): Thresholds dictionary with keys in the format '<metricName>' and values as (fail_th, raise_th). + + Returns: + pd.DataFrame: DataFrame with additional threshold columns. + """ + for metric_name, threshold_values in thresholds.items(): + # Ensure the metric column exists in the DataFrame + if metric_name not in aggregated_df.columns: + logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. 
Skipping threshold application.") + continue + + # Ensure the threshold values are valid (check for tuple specifically) + if isinstance(threshold_values, list) and len(threshold_values) == 2: + fail_th, raise_th = threshold_values + + # Add threshold columns with updated naming + aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th + aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th + else: + logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.") + return aggregated_df + +def aggregation_handler( + batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds + ): + """ + Process a batch of data and calculate aggregated values for each input KPI + and maps them to the output KPIs. """ + + logger.info(f"({batch_type_name}) Processing batch for key: {key}") + if not batch: + logger.info("Empty batch received. Skipping processing.") + return [] + else: + logger.info(f" >>>>> Processing {len(batch)} records for key: {key}") + + # Convert data into a DataFrame + df = pd.DataFrame(batch) + + # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only) + df = df[df['kpi_id'].isin(input_kpi_list)].copy() + + if df.empty: + logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.") + return [] + + # Define all possible aggregation methods + aggregation_methods = { + "min" : ('kpi_value', 'min'), + "max" : ('kpi_value', 'max'), + "avg" : ('kpi_value', 'mean'), + "first" : ('kpi_value', lambda x: x.iloc[0]), + "last" : ('kpi_value', lambda x: x.iloc[-1]), + "variance": ('kpi_value', 'var'), + "count" : ('kpi_value', 'count'), + "range" : ('kpi_value', lambda x: x.max() - x.min()), + "sum" : ('kpi_value', 'sum'), + } + + results = [] + + # Process each KPI-specific task parameter + for kpi_index, kpi_id in enumerate(input_kpi_list): + + # logger.info(f"1.Processing KPI: {kpi_id}") + kpi_task_parameters = thresholds["task_parameter"][kpi_index] + + # Get valid task parameters for this KPI + valid_task_parameters = [ + method for method in kpi_task_parameters.keys() + if method in aggregation_methods + ] + + # Select the aggregation methods based on valid task parameters + selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters} + + # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}") + kpi_df = df[df['kpi_id'] == kpi_id] + + # Check if kpi_df is not empty before applying the aggregation methods + if not kpi_df.empty: + agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index() + + # logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}") + + agg_df['kpi_id'] = output_kpi_list[kpi_index] + + # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}") + record = threshold_handler(key, agg_df, kpi_task_parameters) + + results.extend(record.to_dict(orient='records')) + else: + logger.warning(f"No data available for KPIs: {kpi_id}. 
Skipping aggregation.") + continue + if results: + return results + else: + return [] diff --git a/src/analytics/backend/service/AnalyzerHelper.py b/src/analytics/backend/service/AnalyzerHelper.py new file mode 100644 index 0000000000000000000000000000000000000000..15a45aee68341c599905983efd79737a9d4929ab --- /dev/null +++ b/src/analytics/backend/service/AnalyzerHelper.py @@ -0,0 +1,67 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from dask.distributed import Client, LocalCluster +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from confluent_kafka import Consumer, Producer + +import logging +logger = logging.getLogger(__name__) + + +class AnalyzerHelper: + def __init__(self): + pass + + @staticmethod + def initialize_dask_client(cluster_instance): + """Initialize a local Dask client.""" + if cluster_instance is None: + logger.error("Dask Cluster is not initialized. Exiting.") + return None + client = Client(cluster_instance) + logger.info(f"Dask Client Initialized: {client}") + return client + + @staticmethod + def initialize_dask_cluster(n_workers=1, threads_per_worker=2): + """Initialize a local Dask cluster""" + cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) + logger.info(f"Dask Cluster Initialized: {cluster}") + return cluster + + @staticmethod + def initialize_kafka_consumer(): # TODO: update to receive topic and group_id as parameters + """Initialize the Kafka consumer.""" + consumer_conf = { + 'bootstrap.servers': KafkaConfig.get_kafka_address(), + 'group.id': 'analytics-backend', + 'auto.offset.reset': 'latest' + } + consumer = Consumer(consumer_conf) + consumer.subscribe([KafkaTopic.VALUE.value]) + return consumer + + @staticmethod + def initialize_kafka_producer(): + """Initialize the Kafka producer.""" + return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + + @staticmethod + def delivery_report(err, msg): + if err is not None: + logger.error(f"Message delivery failed: {err}") + else: + logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py deleted file mode 100644 index 79dee7ef972a8b5546e3b38289c4fdb5b4bcc0d1..0000000000000000000000000000000000000000 --- a/src/analytics/backend/service/DaskStreaming.py +++ /dev/null @@ -1,268 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import time -import json -from confluent_kafka import Consumer, Producer, KafkaException, KafkaError -import pandas as pd -from dask.distributed import Client, LocalCluster -from common.tools.kafka.Variables import KafkaConfig, KafkaTopic - -logging.basicConfig(level=logging.INFO) -LOGGER = logging.getLogger(__name__) - -def SettingKafkaConsumerParams(): - return {'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-backend', - 'auto.offset.reset' : 'latest'} - -def GetAggregationMappings(thresholds): - agg_dict = {} - for threshold_key in thresholds.keys(): - parts = threshold_key.split('_', 1) - if len(parts) != 2: - LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.") - continue - aggregation, metric_name = parts - # Ensure that the aggregation function is valid in pandas - if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']: - LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.") - continue - agg_dict[threshold_key] = ('kpi_value', aggregation) - return agg_dict - - -def ApplyThresholds(aggregated_df, thresholds): - """ - Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary - on the aggregated DataFrame. - Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. - thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'. - Returns: pd.DataFrame: DataFrame with additional threshold columns. - """ - for threshold_key, threshold_values in thresholds.items(): - if threshold_key not in aggregated_df.columns: - - LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.") - continue - if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2: - fail_th, raise_th = threshold_values - aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th - aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th - aggregated_df["value"] = aggregated_df[threshold_key] - else: - LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.") - return aggregated_df - -def initialize_dask_client(): - """ - Initialize a local Dask cluster and client. - """ - cluster = LocalCluster(n_workers=2, threads_per_worker=2) - client = Client(cluster) - LOGGER.info(f"Dask Client Initialized: {client}") - return client, cluster - -def initialize_kafka_producer(): - return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) - -def delivery_report(err, msg): - if err is not None: - LOGGER.error(f"Message delivery failed: {err}") - else: - LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") - -def process_batch(batch, agg_mappings, thresholds, key): - """ - Process a batch of data and apply thresholds. - Args: batch (list of dict): List of messages from Kafka. - agg_mappings (dict): Mapping from threshold key to aggregation function. - thresholds (dict): Thresholds dictionary. - Returns: list of dict: Processed records ready to be sent to Kafka. - """ - if not batch: - LOGGER.info("Empty batch received. 
Skipping processing.") - return [] - - - df = pd.DataFrame(batch) - LOGGER.info(f"df {df} ") - df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') - df.dropna(subset=['time_stamp'], inplace=True) - LOGGER.info(f"df {df} ") - required_columns = {'time_stamp', 'kpi_id', 'kpi_value'} - if not required_columns.issubset(df.columns): - LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.") - return [] - if df.empty: - LOGGER.info("No data after filtering by KPI IDs. Skipping processing.") - return [] - - # Perform aggregations using named aggregation - try: - agg_dict = {key: value for key, value in agg_mappings.items()} - - df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index() - - #example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min') - - #given that threshold has 1 value - second_value_tuple = next(iter(agg_dict.values()))[1] - #in case we have multiple thresholds! - #second_values_tuples = [value[1] for value in agg_dict.values()] - if second_value_tuple=="min": - df_agg = df_agg_.min(numeric_only=True).to_frame().T - elif second_value_tuple == "max": - df_agg = df_agg_.max(numeric_only=True).to_frame().T - elif second_value_tuple == "std": - df_agg = df_agg_.sted(numeric_only=True).to_frame().T - else: - df_agg = df_agg_.mean(numeric_only=True).to_frame().T - - # Assign the first value of window_start from the original aggregated data - df_agg['window_start'] = df_agg_['window_start'].iloc[0] - - # Reorder columns to place 'window_start' first if needed - cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start'] - df_agg = df_agg[cols] - - except Exception as e: - LOGGER.error(f"Aggregation error: {e}") - return [] - - # Apply thresholds - - - df_thresholded = ApplyThresholds(df_agg, thresholds) - df_thresholded['kpi_id'] = key - df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ') - # Convert aggregated DataFrame to list of dicts - result = df_thresholded.to_dict(orient='records') - LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.") - return result - -def produce_result(result, producer, destination_topic): - for record in result: - try: - producer.produce( - destination_topic, - key=str(record.get('kpi_id', '')), - value=json.dumps(record), - callback=delivery_report - ) - except KafkaException as e: - LOGGER.error(f"Failed to produce message: {e}") - producer.flush() - LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") - -def DaskStreamer(key, kpi_list, thresholds, stop_event, - window_size="30s", time_stamp_col="time_stamp"): - client, cluster = initialize_dask_client() - consumer_conf = SettingKafkaConsumerParams() - consumer = Consumer(consumer_conf) - consumer.subscribe([KafkaTopic.VALUE.value]) - producer = initialize_kafka_producer() - - # Parse window_size to seconds - try: - window_size_td = pd.to_timedelta(window_size) - window_size_seconds = window_size_td.total_seconds() - except Exception as e: - LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}") - window_size_seconds = 30 - LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.") - - # Extract aggregation mappings from thresholds - agg_mappings = GetAggregationMappings(thresholds) - if not agg_mappings: - LOGGER.error("No valid aggregation mappings extracted from thresholds. 
Exiting streamer.") - consumer.close() - producer.flush() - client.close() - cluster.close() - return - try: - batch = [] - last_batch_time = time.time() - LOGGER.info("Starting to consume messages...") - - while not stop_event.is_set(): - msg = consumer.poll(1.0) - - if msg is None: - current_time = time.time() - if (current_time - last_batch_time) >= window_size_seconds and batch: - LOGGER.info("Time-based batch threshold reached. Processing batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - batch = [] - last_batch_time = current_time - continue - - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: - LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") - else: - LOGGER.error(f"Kafka error: {msg.error()}") - continue - - try: - message_value = json.loads(msg.value().decode('utf-8')) - except json.JSONDecodeError as e: - LOGGER.error(f"JSON decode error: {e}") - continue - - try: - message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce') - LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.") - - if pd.isna(message_timestamp): - LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.") - continue - window_start = message_timestamp.floor(window_size) - LOGGER.warning(f"window_start: {window_start}. Skipping message.") - message_value['window_start'] = window_start - except Exception as e: - LOGGER.error(f"Error processing timestamp: {e}. Skipping message.") - continue - - if message_value['kpi_id'] not in kpi_list: - LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.") - continue - - batch.append(message_value) - - current_time = time.time() - if (current_time - last_batch_time) >= window_size_seconds and batch: - LOGGER.info("Time-based batch threshold reached. Processing batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds, key) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - batch = [] - last_batch_time = current_time - - except Exception as e: - LOGGER.exception(f"Error in Dask streaming process: {e}") - finally: - # Process any remaining messages in the batch - if batch: - LOGGER.info("Processing remaining messages in the batch.") - future = client.submit(process_batch, batch, agg_mappings, thresholds) - future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) - consumer.close() - producer.flush() - LOGGER.info("Kafka consumer and producer closed.") - client.close() - cluster.close() - LOGGER.info("Dask client and cluster closed.") diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py new file mode 100644 index 0000000000000000000000000000000000000000..10917f002ed306ea408a20b92e94aed597ef1ea3 --- /dev/null +++ b/src/analytics/backend/service/Streamer.py @@ -0,0 +1,167 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import json +import threading +import logging + +from confluent_kafka import KafkaException, KafkaError +from common.tools.kafka.Variables import KafkaTopic +from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler +from analytics.backend.service.AnalyzerHelper import AnalyzerHelper + + +logger = logging.getLogger(__name__) + + +class DaskStreamer(threading.Thread): + def __init__(self, key, input_kpis, output_kpis, thresholds, + batch_size = 5, + batch_duration = None, + window_size = None, + cluster_instance = None, + producer_instance = AnalyzerHelper.initialize_kafka_producer() + ): + super().__init__() + self.key = key + self.input_kpis = input_kpis + self.output_kpis = output_kpis + self.thresholds = thresholds + self.window_size = window_size # TODO: Not implemented + self.batch_size = batch_size + self.batch_duration = batch_duration + self.running = True + self.batch = [] + + # Initialize Kafka and Dask components + self.client = AnalyzerHelper.initialize_dask_client(cluster_instance) + self.consumer = AnalyzerHelper.initialize_kafka_consumer() # Single-threaded consumer + self.producer = producer_instance + + logger.info("Dask Streamer initialized.") + + def run(self): + """Main method to start the DaskStreamer.""" + try: + logger.info("Starting Dask Streamer") + last_batch_time = time.time() + while True: + if not self.consumer: + logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.") + break + if not self.running: + logger.warning("Dask Streamer instance has been terminated. Exiting loop.") + break + if not self.client: + logger.warning("Dask client is not running. Exiting loop.") + break + message = self.consumer.poll(timeout=1.0) + if message is None: + # logger.info("No new messages received.") + continue + if message.error(): + if message.error().code() == KafkaError._PARTITION_EOF: + logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}") + elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + logger.error(f"Subscribed topic {message.topic()} does not exist. 
It may not have any messages yet.") + continue + elif message.error(): + raise KafkaException(message.error()) + else: + try: + value = json.loads(message.value()) + except json.JSONDecodeError: + logger.error(f"Failed to decode message: {message.value()}") + continue + self.batch.append(value) + + # Batch duration takes precedence over batch size + if self.batch_duration is None: + if len(self.batch) >= self.batch_size: # If batch duration is not provided, process whenever the batch reaches batch_size + logger.info(f"Processing based on batch size {self.batch_size}.") + self.task_handler_selector() + self.batch = [] + else: + # Process based on batch duration + current_time = time.time() + if (current_time - last_batch_time) >= self.batch_duration and self.batch: + logger.info(f"Processing based on batch duration {self.batch_duration}.") + self.task_handler_selector() + self.batch = [] + last_batch_time = current_time + + except Exception as e: + logger.exception(f"Error in Dask streaming process: {e}") + finally: + self.stop() + logger.info(">>> Exiting Dask Streamer...") + + def task_handler_selector(self): + """Select the task handler based on the task type.""" + logger.info(f"Batch to be processed: {self.batch}") + if Handlers.is_valid_handler(self.thresholds["task_type"]): + if self.client is not None and self.client.status == 'running': + try: + future = self.client.submit(aggregation_handler, "batch size", self.key, + self.batch, self.input_kpis, self.output_kpis, self.thresholds) + future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + except Exception as e: + logger.error(f"Failed to submit task to Dask client or unable to process future. See error for details: {e}") + else: + logger.warning("Dask client is not running. Skipping processing.") + else: + logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.") + + def produce_result(self, result, destination_topic): + """Produce results to the Kafka topic.""" + if not result: + logger.warning("Nothing to produce. Skipping.") + return + for record in result: + try: + self.producer.produce( + destination_topic, + key=str(record.get('kpi_id', '')), + value=json.dumps(record), + callback=AnalyzerHelper.delivery_report + ) + except KafkaException as e: + logger.error(f"Failed to produce message: {e}") + self.producer.flush() + logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") + + def stop(self): + """Clean up Kafka and Dask thread resources.""" + if not self.running: + logger.info("Dask Streamer is already stopped.") + return + self.running = False + logger.info("Streamer running status is set to False. Waiting 5 seconds before stopping...") + time.sleep(5) # Waiting time for running tasks to complete + if self.consumer: + try: + self.consumer.close() + logger.info("Kafka consumer closed.") + except Exception as e: + logger.error(f"Error closing Kafka consumer: {e}") + + if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running': + try: + self.client.close() + logger.info("Dask client closed.") + except Exception as e: + logger.error(f"Error closing Dask client: {e}") + +# TODO: Maybe a single streamer for all analyzers ... ?
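For reference, a minimal standalone sketch (not part of the patch) of what the new handler pipeline computes for a single input KPI: it approximates the aggregation step of aggregation_handler plus threshold_handler from AnalyzerHandlers.py, producing the records that DaskStreamer.produce_result() then serializes and publishes to KafkaTopic.ALARMS. The KPI ids, sample values, and thresholds below are made up for illustration; real values come from the analyzer request.

import pandas as pd

# Hypothetical batch of KPI samples and per-KPI task parameters ({metric: [TH_FALL, TH_RAISE]}).
batch = [
    {"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": "kpi-in-1", "kpi_value": 46.72},
    {"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": "kpi-in-1", "kpi_value": 65.22},
    {"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "kpi-in-1", "kpi_value": 54.24},
]
task_parameter = {"last": [40, 80], "variance": [300, 500]}

df = pd.DataFrame(batch)
# Aggregate the requested metrics per input KPI (mirrors the named-aggregation step in aggregation_handler).
agg_df = df.groupby("kpi_id").agg(
    last=("kpi_value", lambda x: x.iloc[-1]),
    variance=("kpi_value", "var"),
).reset_index()
agg_df["kpi_id"] = "kpi-out-1"  # map the input KPI to its corresponding output KPI

# Apply thresholds (mirrors threshold_handler): one boolean column per metric and direction.
for metric, (fall_th, raise_th) in task_parameter.items():
    agg_df[f"{metric}_TH_RAISE"] = agg_df[metric] > raise_th
    agg_df[f"{metric}_TH_FALL"] = agg_df[metric] < fall_th

# These records are what produce_result() sends, JSON-encoded and keyed by kpi_id, to the ALARMS topic.
print(agg_df.to_dict(orient="records"))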
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index 55d966dfbbae2318a5f774049b02f5b340070113..2ff1e93536e94863ba9eaaf76d35f7453ba95727 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -53,8 +53,8 @@ def create_analyzer(): _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING - _kpi_id = KpiId() # input IDs to analyze + _kpi_id = KpiId() _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7" _create_analyzer.input_kpi_ids.append(_kpi_id) @@ -63,11 +63,14 @@ def create_analyzer(): _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id = KpiId() _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter _threshold_dict = { # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py new file mode 100644 index 0000000000000000000000000000000000000000..813b4f06cdbaf1dd1c00c4d0f0dfa4b78339e09c --- /dev/null +++ b/src/analytics/backend/tests/messages_analyzer.py @@ -0,0 +1,67 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pandas as pd +from analytics.backend.service.AnalyzerHandlers import Handlers + +def get_input_kpi_list(): + return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] + +def get_output_kpi_list(): + return ["1e22f180-ba28-4641-b190-2287bf441616", "6e22f180-ba28-4641-b190-2287bf181818", 'kpi_4'] + +def get_thresholds(): + return { + "task_type": Handlers.AGGREGATION_HANDLER.value, + "task_parameter": [ + {"last": [40, 80], "variance": [300, 500]}, + {"count": [2, 4], "max": [70, 100]}, + {"min": [10, 20], "avg": [50, 70]}, + ], + } + +def get_duration(): + return 90 + +def get_batch_duration(): + return 30 + +def get_windows_size(): + return None + +def get_batch_size(): + return 5 + +def get_interval(): + return 5 + +def get_batch(): + return [ + {"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 46.72}, + {"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 65.22}, + {"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 54.24}, + {"time_stamp": "2025-01-13T08:44:16Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 57.67}, + {"time_stamp": "2025-01-13T08:44:18Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 38.6}, + {"time_stamp": "2025-01-13T08:44:20Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 38.9}, + {"time_stamp": "2025-01-13T08:44:22Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 52.44}, + {"time_stamp": "2025-01-13T08:44:24Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 47.76}, + {"time_stamp": "2025-01-13T08:44:26Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 33.71}, + {"time_stamp": "2025-01-13T08:44:28Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 64.44}, + ] + +def get_agg_df(): + data = [ + {"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616", "last": 47.76, "variance": 970.41}, + ] + return pd.DataFrame(data) diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 4aa9df5fae9849ee429361603a35b2fb8eaa4d23..1bbfaee136942bb7f14e77438683e8460f0a4f0b 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -12,148 +12,307 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import time, json -from typing import Dict +import time +import json +import pytest import logging -from threading import Event, Thread -from common.tools.kafka.Variables import KafkaTopic +import pandas as pd + +from unittest.mock import MagicMock, patch +from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \ + get_windows_size, get_batch_size, get_agg_df, get_duration + +from common.tools.kafka.Variables import KafkaTopic +from analytics.backend.service.Streamer import DaskStreamer +from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService -from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict -from .messages import create_analyzer, create_analyzer_dask -from threading import Thread, Event -from ..service.DaskStreaming import DaskStreamer -LOGGER = logging.getLogger(__name__) +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') + + +# ---- +# Test fixtures and helper functions +# ---- + +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + logger.info(f" >>>>> Starting test: {request.node.name} ") + yield + logger.info(f" <<<<< Finished test: {request.node.name} ") + +@pytest.fixture +def mock_kafka_producer(): + mock_producer = MagicMock() + mock_producer.produce = MagicMock() + mock_producer.flush = MagicMock() + return mock_producer + +@pytest.fixture +def mock_dask_cluster(): + mock_cluster = MagicMock() + mock_cluster.close = MagicMock() + return mock_cluster + +@pytest.fixture +def mock_dask_client(): + mock_client = MagicMock() + mock_client.status = 'running' + mock_client.submit = MagicMock() + return mock_client + +@pytest.fixture() +def mock_kafka_consumer(): + mock_consumer = MagicMock() + mock_consumer.subscribe = MagicMock() + return mock_consumer + +@pytest.fixture() +def mock_streamer_start(): + mock_streamer = MagicMock() + mock_streamer.start = MagicMock() + return mock_streamer ########################### -# Tests Implementation of Telemetry Backend +# funtionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods ########################### -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - - -# --- To test Dask Streamer functionality --- -# def test_StartDaskStreamer(): # Directly from the Streamer class -# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") -# stop_event = Event() -# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] -# oper_list = ['avg', 'min', 'max',] -# thresholds = { -# 'avg_value': (10.0, 90.0), -# 'min_value': (5.0, 95.0), -# 'max_value': (15.0, 85.0), -# 'latency' : (2.0, 10.0) -# } - -# # Start the DaskStreamer in a separate thread -# streamer_thread = Thread( -# target=DaskStreamer, -# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event), -# kwargs={ -# "window_size": "60s", -# 
"win_slide_duration": "30s", -# "time_stamp_col": "time_stamp" -# } -# ) -# streamer_thread.start() -# try: -# while True: -# time.sleep(10) -# except KeyboardInterrupt: -# LOGGER.info("KeyboardInterrupt received. Stopping streamer...") -# stop_event.set() -# streamer_thread.join() -# LOGGER.info("Streamer stopped gracefully.") - -# --- To test Start Streamer functionality --- -# def test_StartDaskStreamer(): -# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ") -# analyzer_obj = create_analyzer_dask() -# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj)) -# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid -# analyzer_to_generate : Dict = { -# "algo_name" : analyzer_obj.algorithm_name, -# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], -# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], -# "oper_mode" : analyzer_obj.operation_mode, -# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), -# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]), -# # "oper_list" : analyzer_obj.parameters["oper_list"], -# "window_size" : analyzer_obj.parameters["window_size"], -# "window_slider" : analyzer_obj.parameters["window_slider"], -# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] -# } -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate))) -# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate) -# assert isinstance(response, bool) -# time.sleep(100) -# LOGGER.info('Initiating StopRequestListener...') -# # AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) +@pytest.fixture +def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer), \ + patch('analytics.backend.service.Streamer.DaskStreamer.run', return_value = mock_streamer_start): + + service = AnalyticsBackendService() + yield service + service.close() -# --- To test Start Streamer functionality --- -# def test_StartSparkStreamer(): -# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") -# analyzer_obj = create_analyzer() -# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid -# analyzer_to_generate : Dict = { -# "algo_name" : analyzer_obj.algorithm_name, -# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], -# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], -# "oper_mode" : analyzer_obj.operation_mode, -# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), -# "window_size" : analyzer_obj.parameters["window_size"], -# "window_slider" : analyzer_obj.parameters["window_slider"], -# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] -# } -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = 
AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) -# assert isinstance(response, bool) +@pytest.fixture +def analyzer_data(): + return { + 'algo_name' : 'test_algorithm', + 'oper_mode' : 'test_mode', + 'input_kpis' : get_input_kpi_list(), + 'output_kpis' : get_output_kpi_list(), + 'thresholds' : get_thresholds(), + 'duration' : get_duration(), + 'batch_size_min' : get_batch_size(), + 'window_size' : get_windows_size(), + 'batch_duration_min' : get_duration(), + } -# --- To TEST StartRequestListenerFunctionality -# def test_StartRequestListener(): -# LOGGER.info('test_RunRequestListener') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# AnalyticsBackendServiceObj.stop_event = Event() -# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) -# listener_thread.start() - -# time.sleep(100) - - # AnalyticsBackendServiceObj.stop_event.set() - # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') - # listener_thread.join(timeout=10) - # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." - # LOGGER.info('Completed test_RunRequestListener') - -# To test START and STOP communication together -# def test_StopRequestListener(): -# LOGGER.info('test_RunRequestListener') -# LOGGER.info('Initiating StartRequestListener...') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) -# # LOGGER.debug(str(response_thread)) -# time.sleep(10) -# LOGGER.info('Initiating StopRequestListener...') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) +def test_start_streamer(analytics_service, analyzer_data): + analyzer_uuid = "test-analyzer-uuid" + # Start streamer + result = analytics_service.StartStreamer(analyzer_uuid, analyzer_data) + assert result is True + assert analyzer_uuid in analytics_service.active_streamers + +def test_stop_streamer(analytics_service, analyzer_data): + analyzer_uuid = "test-analyzer-uuid" + + # Start streamer for stopping it later + analytics_service.StartStreamer(analyzer_uuid, analyzer_data) + assert analyzer_uuid in analytics_service.active_streamers + + # Stop streamer + with patch('time.sleep', return_value=None): + result = analytics_service.StopStreamer(analyzer_uuid) + assert result is True + assert analyzer_uuid not in analytics_service.active_streamers + + # Verify that the streamer was stopped + assert analyzer_uuid not in analytics_service.active_streamers + +def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster): + analytics_service.close() + + mock_kafka_producer.flush.assert_called_once() + mock_dask_cluster.close.assert_called_once() + +########################### +# funtionality pytest with specific fixtures for streamer class sub methods +########################### + +@pytest.fixture +def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', 
return_value = mock_dask_client ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer): + + return DaskStreamer( + key = "test_key", + input_kpis = get_input_kpi_list(), + output_kpis = get_output_kpi_list(), + thresholds = get_thresholds(), + batch_size = get_batch_size(), + window_size = get_windows_size(), + cluster_instance = mock_dask_cluster(), + producer_instance = mock_kafka_producer(), + ) + +def test_dask_streamer_initialization(dask_streamer): + """Test if the DaskStreamer initializes correctly.""" + assert dask_streamer.key == "test_key" + assert dask_streamer.batch_size == get_batch_size() + assert dask_streamer.window_size is None + assert dask_streamer.consumer is not None + assert dask_streamer.producer is not None + assert dask_streamer.client is not None + +def test_run_stops_on_no_consumer(dask_streamer): + """Test if the run method exits when the consumer is not initialized.""" + dask_streamer.consumer = None + with patch('time.sleep', return_value=None): + dask_streamer.run() -# To independently tests the SparkListener functionality -# def test_SparkListener(): -# LOGGER.info('test_RunRequestListener') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.RunSparkStreamer( -# get_kpi_id_list(), get_operation_list(), get_threshold_dict() -# ) -# LOGGER.debug(str(response)) + assert not dask_streamer.running + +def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client): + """Test task handler selection with a valid handler.""" + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client): + + dask_streamer.task_handler_selector() + assert dask_streamer.client.status == 'running' + +def test_task_handler_selector_invalid_handler(dask_streamer): + """Test task handler selection with an invalid handler.""" + with patch('analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler', return_value=False): + dask_streamer.task_handler_selector() + assert dask_streamer.batch == [] + +def test_produce_result(dask_streamer): + """Test if produce_result sends records to Kafka.""" + result = [{"kpi_id": "kpi1", "value": 100}] + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \ + patch.object(dask_streamer.producer, 'produce') as mock_produce: + dask_streamer.produce_result(result, "test_topic") + mock_produce.assert_called_once_with( + "test_topic", + key="kpi1", + value=json.dumps({"kpi_id": "kpi1", "value": 100}), + callback=mock_delivery_report + ) + +def test_stop(dask_streamer): + """Test the cleanup method.""" + with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \ + patch.object(dask_streamer.client, 'close') as mock_client_close, \ + patch('time.sleep', return_value=0): + + # Mock the conditions required for the close calls + dask_streamer.client.status = 'running' + + dask_streamer.stop() + + mock_consumer_close.assert_called_once() + mock_client_close.assert_called_once() + +def test_run_with_valid_consumer(dask_streamer): + """Test the run method with a valid Kafka consumer.""" + with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ + patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector: + + # Simulate valid messages without errors + mock_message_1 = MagicMock() + mock_message_1.value.return_value = b'{"kpi_id": 
"kpi1", "value": 100}' + mock_message_1.error.return_value = None # No error + + mock_message_2 = MagicMock() + mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}' + mock_message_2.error.return_value = None # No error + + # Mock `poll` to return valid messages + mock_poll.side_effect = [mock_message_1, mock_message_2] + + # Run the `run` method in a limited loop + with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays + dask_streamer.running = True + dask_streamer.batch_size = 2 + + # Limit the loop by breaking it after one full processing cycle + def stop_running_after_task_handler(): + logger.info("Stopping the streamer after processing the first batch.") + dask_streamer.running = False + + mock_task_handler_selector.side_effect = stop_running_after_task_handler + dask_streamer.run() + + assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing + mock_task_handler_selector.assert_called_once() # Task handler should be called once + mock_poll.assert_any_call(timeout=1.0) # Poll should have been called at least once + +# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py +def test_aggregation_handler(): + + # Create a sample batch + batch = get_batch() + input_kpi_list = get_input_kpi_list() + output_kpi_list = get_output_kpi_list() + thresholds = get_thresholds() + + # Test aggregation_handler + aggregated_df = aggregation_handler( + "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds + ) + assert isinstance(aggregated_df, list) + assert all(isinstance(item, dict) for item in aggregated_df) + +# # Test threshold_handler +def test_threshold_handler(): + # Create a sample aggregated DataFrame + agg_df = get_agg_df() + thresholds = get_thresholds() + + # Test threshold_handler + result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0]) + + assert isinstance(result, pd.DataFrame) + assert result.shape == (1, 7) + + +########################### +# integration test of Streamer with backend service (Shouldn't be run in the CI/CD pipeline) +########################### +# This is a local machine test to check the integration of the backend service with the Streamer + +# @pytest.fixture(scope='session') +# def analyticBackend_service(): +# logger.info('Initializing AnalyticsBackendService...') + +# _service = AnalyticsBackendService() +# _service.start() + +# logger.info('Yielding AnalyticsBackendService...') +# yield _service + +# logger.info('Terminating AnalyticsBackendService...') +# _service.stop() +# logger.info('Terminated AnalyticsBackendService...') + + +# # --- "test_validate_kafka_topics" should be run before the functionality tests --- +# def test_validate_kafka_topics(): +# logger.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() # assert isinstance(response, bool) + +# def test_backend_integration_with_frontend(analyticBackend_service: AnalyticsBackendService): +# # backendServiceObject = AnalyticsBackendService() +# # backendServiceObject.install_servicers() +# logger.info(" waiting for 2 minutes for the backend service before termination ... ") +# time.sleep(300) +# logger.info(" Initiating stop collector ... ") +# status = analyticBackend_service.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666") +# analyticBackend_service.close() +# assert isinstance(status, bool) +# assert status == True +# logger.info(" Backend service terminated successfully ... 
") diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index fd5bcd185b1f9945eccf583c33af2a243fe729be..cd20503e7dbe1059b2209e4b0ccd29a229e7916e 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -46,7 +46,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartAnalyzer(self, - request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore + request : Analyzer, context: grpc.ServicerContext # type: ignore ) -> AnalyzerId: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) response = AnalyzerId() @@ -65,14 +65,18 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): """ analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid analyzer_to_generate : Dict = { - "algo_name" : analyzer_obj.algorithm_name, - "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], - "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], - "oper_mode" : analyzer_obj.operation_mode, - "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), - "window_size" : analyzer_obj.parameters["window_size"], - "window_slider" : analyzer_obj.parameters["window_slider"], - # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] + "algo_name" : analyzer_obj.algorithm_name, + "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], + "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], + "oper_mode" : analyzer_obj.operation_mode, + "duration" : analyzer_obj.duration_s, + "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), + "window_size" : analyzer_obj.parameters["window_size"], # slider window size in seconds (single batch execution time) + "window_slider" : analyzer_obj.parameters["window_slider"], # slider shift in seconds + "batch_size_min" : analyzer_obj.batch_min_size, # currently implemented + "batch_size_max" : analyzer_obj.batch_max_size, + "batch_duration_min" : analyzer_obj.batch_min_duration_s, # currently implemented + "batch_interval_max" : analyzer_obj.batch_max_duration_s } self.kafka_producer.produce( KafkaTopic.ANALYTICS_REQUEST.value, @@ -137,7 +141,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopAnalyzer(self, - request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore + request : AnalyzerId, context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) try: @@ -181,7 +185,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectAnalyzers(self, - filter : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + filter : AnalyzerFilter, context: grpc.ServicerContext # type: ignore ) -> AnalyzerList: # type: ignore LOGGER.info("At Service gRPC message: {:}".format(filter)) response = AnalyzerList() @@ -202,7 +206,5 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - # print ('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to 
topic {:}'.format(msg.topic())) - # print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py index 4df6070bedffd91402953bbbbbec16ce0118008c..326bc0be22c0d0e01ccdd79b439b82a88d06e0ad 100644 --- a/src/analytics/frontend/tests/messages.py +++ b/src/analytics/frontend/tests/messages.py @@ -20,43 +20,77 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze def create_analyzer_id(): _create_analyzer_id = AnalyzerId() - # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" - _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + # _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" return _create_analyzer_id def create_analyzer(): _create_analyzer = Analyzer() - # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" - _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" + + _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.algorithm_name = "Test_new_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING - _kpi_id = KpiId() # input IDs to analyze - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id = KpiId() + + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id = KpiId() + + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf181818" + _create_analyzer.output_kpi_ids.append(_kpi_id) + + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf441616" _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter + # _threshold_dict = { + # 'mean_value' :[20, 30], 'min_value' :[00, 10], 'max_value' :[45, 50], + # 'first_value' :[00, 10], 'last_value' :[40, 50], 'std_value' :[00, 10] + # } _threshold_dict = { - 'mean_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), - 'first_value' :(00, 10), 'last_value' :(40, 50), 'std_value':(00, 10) - } + "task_type": Handlers.AGGREGATION_HANDLER.value, + "task_parameter": [ + {"last": [40, 80], "variance": [300, 500]}, + {"count": [2, 4], "max": [70, 100]}, + {"min": [10, 20], "avg": [50, 70]}, + ], + } + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) - _create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" - _create_analyzer.parameters['window_slider'] = "5s" # should be less than window size - _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. 
No implemented yet + _create_analyzer.parameters['window_size'] = "0" # slider window size in seconds (Total time for aggeration processing) + _create_analyzer.parameters['window_slider'] = "0" # should be less than window size + _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet + # duration of the analyzer + _create_analyzer.duration_s = 90 + + # batch window size + _create_analyzer.batch_min_duration_s = 20 + _create_analyzer.batch_max_duration_s = 50 + + # batch size + _create_analyzer.batch_min_size = 5 + _create_analyzer.batch_max_size = 10 + return _create_analyzer def create_analyzer_filter(): @@ -84,3 +118,10 @@ def create_analyzer_filter(): # _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj) return _create_analyzer_filter + + +# Added for testing to remove the dependency on the backend service +from enum import Enum +class Handlers(Enum): + AGGREGATION_HANDLER = "AggregationHandler" + UNSUPPORTED_HANDLER = "UnsupportedHandler" diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 134871fb77719e4747b6fc3ae6cfd21dd317a31f..7d8a08d3ad2d82758b088a8f83342c2b3929eadd 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -78,6 +78,15 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic LOGGER.info('Closed AnalyticsFrontendClient...') +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") ########################### # Tests Implementation of Analytics Frontend @@ -89,24 +98,17 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -# ----- core funtionality test ----- -# def test_StartAnalytics(analyticsFrontend_client): -# LOGGER.info(' >>> test_StartAnalytic START: <<< ') -# response = analyticsFrontend_client.StartAnalyzer(create_analyzer()) -# LOGGER.debug(str(response)) -# assert isinstance(response, AnalyzerId) - # To test start and stop listener together def test_StartAnalyzers(analyticsFrontend_client): LOGGER.info(' >>> test_StartAnalyzers START: <<< ') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) - LOGGER.info(' --> Calling StartResponseListener... ') - class_obj = AnalyticsFrontendServiceServicerImpl() - response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid) - LOGGER.debug(response) - LOGGER.info("waiting for timer to comlete ...") - time.sleep(3) + # LOGGER.info(' --> Calling StartResponseListener... 
') + # class_obj = AnalyticsFrontendServiceServicerImpl() + # response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid) + # LOGGER.debug(response) + LOGGER.info("waiting for timer to complete ...") + time.sleep(15) LOGGER.info('--> StopAnalyzer') response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id) LOGGER.debug(str(response)) diff --git a/src/common/DeviceTypes.py b/src/common/DeviceTypes.py index 30bbd0b15e9d199294a3d7ee7826b22695d0e45c..9a982d1eb71e8b139d2a86fe1a774154239c7147 100644 --- a/src/common/DeviceTypes.py +++ b/src/common/DeviceTypes.py @@ -38,6 +38,7 @@ class DeviceTypeEnum(Enum): CLIENT = 'client' DATACENTER = 'datacenter' IP_SDN_CONTROLLER = 'ip-sdn-controller' + NCE = 'nce' MICROWAVE_RADIO_SYSTEM = 'microwave-radio-system' OPEN_LINE_SYSTEM = 'open-line-system' OPTICAL_ROADM = 'optical-roadm' @@ -52,6 +53,4 @@ class DeviceTypeEnum(Enum): # ETSI TeraFlowSDN controller TERAFLOWSDN_CONTROLLER = 'teraflowsdn' - IETF_SLICE = 'ietf-slice' - - NCE = 'nce' \ No newline at end of file + IETF_SLICE = 'ietf-slice' diff --git a/src/common/tools/context_queries/Slice.py b/src/common/tools/context_queries/Slice.py index 686f08a845df5c99fdd0cace393290419d92922c..12bd8c03c68ab0baea5b4db91b7468e2eae7f1a9 100644 --- a/src/common/tools/context_queries/Slice.py +++ b/src/common/tools/context_queries/Slice.py @@ -13,14 +13,14 @@ # limitations under the License. import logging +import grpc from typing import Optional, Tuple, Union from uuid import UUID, uuid5 - -import grpc - from common.Constants import DEFAULT_CONTEXT_NAME from common.method_wrappers.ServiceExceptions import InvalidArgumentsException from common.proto.context_pb2 import ContextId, Slice, SliceFilter, SliceId +from common.method_wrappers.ServiceExceptions import InvalidArgumentsException +from common.proto.context_pb2 import ContextId, Slice, SliceFilter, SliceId from context.client.ContextClient import ContextClient diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 21e66af137302553663d6e0a6701368bda638017..5c7501b6c07e6aaa26569e2817fca374e6b0c12e 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -41,14 +41,14 @@ class KafkaConfig(Enum): class KafkaTopic(Enum): # TODO: Later to be populated from ENV variable. 
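The TODO above notes that these topic names should eventually be populated from an environment variable instead of being hard-coded. A minimal sketch of how that could look, assuming a hypothetical KFK_TOPIC_PREFIX variable (not part of this change), with the renamed topic values that follow below:

import os
from enum import Enum

# Hypothetical sketch only: KFK_TOPIC_PREFIX is an assumed variable name,
# defaulting to the 'topic' prefix used by the hard-coded values below.
_PREFIX = os.environ.get('KFK_TOPIC_PREFIX', 'topic')

class KafkaTopicFromEnv(Enum):
    TELEMETRY_REQUEST  = _PREFIX + '_telemetry_request'
    TELEMETRY_RESPONSE = _PREFIX + '_telemetry_response'
    ANALYTICS_REQUEST  = _PREFIX + '_analytics_request'
    ANALYTICS_RESPONSE = _PREFIX + '_analytics_response'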
- REQUEST = 'topic_request' - RESPONSE = 'topic_response' + TELEMETRY_REQUEST = 'topic_telemetry_request' + TELEMETRY_RESPONSE = 'topic_telemetry_response' RAW = 'topic_raw' LABELED = 'topic_labeled' VALUE = 'topic_value' ALARMS = 'topic_alarms' - ANALYTICS_REQUEST = 'topic_request_analytics' - ANALYTICS_RESPONSE = 'topic_response_analytics' + ANALYTICS_REQUEST = 'topic_analytics_request' + ANALYTICS_RESPONSE = 'topic_analytics_response' @staticmethod def create_all_topics() -> bool: diff --git a/src/nbi/service/rest_server/nbi_plugins/ietf_l3vpn/Handlers.py b/src/nbi/service/rest_server/nbi_plugins/ietf_l3vpn/Handlers.py index b76bf0cec3a99907a182fbde4777939dfa3d4a67..0e8b8013ef5cba1305cdd040ea68efc653eefa5e 100644 --- a/src/nbi/service/rest_server/nbi_plugins/ietf_l3vpn/Handlers.py +++ b/src/nbi/service/rest_server/nbi_plugins/ietf_l3vpn/Handlers.py @@ -15,7 +15,7 @@ import logging, netaddr from typing import Dict, List, Optional, Tuple from common.Constants import DEFAULT_CONTEXT_NAME -from common.proto.context_pb2 import Service, ServiceStatusEnum, ServiceTypeEnum, Empty +from common.proto.context_pb2 import Service, ServiceStatusEnum, ServiceTypeEnum from common.tools.context_queries.Service import get_service_by_uuid from common.tools.grpc.ConfigRules import update_config_rule_custom from common.tools.grpc.Constraints import ( @@ -97,11 +97,8 @@ def update_service_endpoint( endpoint_settings_key = ENDPOINT_SETTINGS_KEY.format(device_uuid, endpoint_uuid) field_updates = {} if vlan_tag is not None: field_updates['vlan_tag' ] = (vlan_tag, True) - # if ipv4_address is not None: field_updates['ip_address' ] = (ipv4_address, True) - # if neighbor_ipv4_address is not None: field_updates['neighbor_address'] = (neighbor_ipv4_address, True) - #! neighbor_ipv4_address and ipv4_address' field swapped to manage the PE. 
Fix it later - if ipv4_address is not None: field_updates['ip_address' ] = (neighbor_ipv4_address, True) - if neighbor_ipv4_address is not None: field_updates['neighbor_address'] = (ipv4_address, True) + if ipv4_address is not None: field_updates['ip_address' ] = (ipv4_address, True) + if neighbor_ipv4_address is not None: field_updates['neighbor_address'] = (neighbor_ipv4_address, True) if ipv4_prefix_length is not None: field_updates['prefix_length' ] = (ipv4_prefix_length, True) update_config_rule_custom(config_rules, endpoint_settings_key, field_updates) @@ -116,8 +113,6 @@ def update_service_endpoint( def process_site_network_access( site_id : str, network_access : Dict, site_static_routing : Dict[Tuple[str, str], str], errors : List[Dict] ) -> None: - # client = ContextClient() - # devices = client.ListDevices(Empty()).devices endpoint_uuid = network_access['site-network-access-id'] if network_access['site-network-access-type'] != 'ietf-l3vpn-svc:multipoint': @@ -125,12 +120,6 @@ def process_site_network_access( raise NotImplementedError(MSG.format(str(network_access['site-network-access-type']))) device_uuid = network_access['device-reference'] - - # location = network_access['location-reference'] - # for device in devices: - # for cr in device.device_config.config_rules: - # if cr.WhichOneof('config_rule') != 'custom': - # continue service_uuid = network_access['vpn-attachment']['vpn-id'] access_role : str = network_access['vpn-attachment']['site-role'] diff --git a/src/nbi/service/rest_server/nbi_plugins/ietf_network_slice/ietf_slice_handler.py b/src/nbi/service/rest_server/nbi_plugins/ietf_network_slice/ietf_slice_handler.py index 6c52a43988d041048c2d3f5efa927e0f5a90284b..80ce4c6b78d446ff1e08a750f236e0c143e1ba57 100644 --- a/src/nbi/service/rest_server/nbi_plugins/ietf_network_slice/ietf_slice_handler.py +++ b/src/nbi/service/rest_server/nbi_plugins/ietf_network_slice/ietf_slice_handler.py @@ -1,3 +1,17 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import json import logging import uuid diff --git a/src/telemetry/backend/collector_api/_Collector.py b/src/telemetry/backend/collector_api/_Collector.py new file mode 100644 index 0000000000000000000000000000000000000000..ec4ba943c90de8a8d683d1e7a9dd9d48865b5edf --- /dev/null +++ b/src/telemetry/backend/collector_api/_Collector.py @@ -0,0 +1,236 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +from typing import Any, Iterator, List, Optional, Tuple, Union + +# Special resource names to request to the collector to retrieve the specified +# configuration/structural resources. +# These resource names should be used with GetConfig() method. +RESOURCE_ENDPOINTS = '__endpoints__' +RESOURCE_INTERFACES = '__interfaces__' +RESOURCE_NETWORK_INSTANCES = '__network_instances__' +RESOURCE_ROUTING_POLICIES = '__routing_policies__' +RESOURCE_SERVICES = '__services__' +RESOURCE_ACL = '__acl__' +RESOURCE_INVENTORY = '__inventory__' + + +class _Collector: + def __init__(self, name : str, address: str, port: int, **settings) -> None: + """ Initialize Collector. + Parameters: + address : str + The address of the device + port : int + The port of the device + **settings + Extra settings required by the collector. + """ + self._name = name + self._address = address + self._port = port + self._settings = settings + + @property + def name(self): return self._name + + @property + def address(self): return self._address + + @property + def port(self): return self._port + + @property + def settings(self): return self._settings + + def Connect(self) -> bool: + """ Connect to the Device. + Returns: + succeeded : bool + Boolean variable indicating if connection succeeded. + """ + raise NotImplementedError() + + def Disconnect(self) -> bool: + """ Disconnect from the Device. + Returns: + succeeded : bool + Boolean variable indicating if disconnection succeeded. + """ + raise NotImplementedError() + + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + """ Retrieve initial configuration of entire device. + Returns: + values : List[Tuple[str, Any]] + List of tuples (resource key, resource value) for + resource keys. + """ + raise NotImplementedError() + + def GetConfig(self, resource_keys: List[str] = []) -> \ + List[Tuple[str, Union[Any, None, Exception]]]: + """ Retrieve running configuration of entire device or + selected resource keys. + Parameters: + resource_keys : List[str] + List of keys pointing to the resources to be retrieved. + Returns: + values : List[Tuple[str, Union[Any, None, Exception]]] + List of tuples (resource key, resource value) for + resource keys requested. If a resource is found, + the appropriate value type must be retrieved. + If a resource is not found, None must be retrieved as + value for that resource. In case of Exception, + the Exception must be retrieved as value. + """ + raise NotImplementedError() + + def SetConfig(self, resources: List[Tuple[str, Any]]) -> \ + List[Union[bool, Exception]]: + """ Create/Update configuration for a list of resources. + Parameters: + resources : List[Tuple[str, Any]] + List of tuples, each containing a resource_key pointing the + resource to be modified, and a resource_value containing + the new value to be set. + Returns: + results : List[Union[bool, Exception]] + List of results for resource key changes requested. + Return values must be in the same order as the + resource keys requested. If a resource is properly set, + True must be retrieved; otherwise, the Exception that is + raised during the processing must be retrieved. + """ + raise NotImplementedError() + + def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \ + List[Union[bool, Exception]]: + """ Delete configuration for a list of resources. 
+ Parameters: + resources : List[Tuple[str, Any]] + List of tuples, each containing a resource_key pointing the + resource to be modified, and a resource_value containing + possible additionally required values to locate + the value to be removed. + Returns: + results : List[Union[bool, Exception]] + List of results for resource key deletions requested. + Return values must be in the same order as the resource keys + requested. If a resource is properly deleted, True must be + retrieved; otherwise, the Exception that is raised during + the processing must be retrieved. + """ + raise NotImplementedError() + + def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> \ + List[Union[bool, Exception]]: + """ Subscribe to state information of entire device or + selected resources. Subscriptions are incremental. + Collector should keep track of requested resources. + Parameters: + subscriptions : List[Tuple[str, float, float]] + List of tuples, each containing a resource_key pointing the + resource to be subscribed, a sampling_duration, and a + sampling_interval (both in seconds with float + representation) defining, respectively, for how long + monitoring should last, and the desired monitoring interval + for the resource specified. + Returns: + results : List[Union[bool, Exception]] + List of results for resource key subscriptions requested. + Return values must be in the same order as the resource keys + requested. If a resource is properly subscribed, + True must be retrieved; otherwise, the Exception that is + raised during the processing must be retrieved. + """ + raise NotImplementedError() + + def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) \ + -> List[Union[bool, Exception]]: + """ Unsubscribe from state information of entire device + or selected resources. Subscriptions are incremental. + Collector should keep track of requested resources. + Parameters: + subscriptions : List[str] + List of tuples, each containing a resource_key pointing the + resource to be subscribed, a sampling_duration, and a + sampling_interval (both in seconds with float + representation) defining, respectively, for how long + monitoring should last, and the desired monitoring interval + for the resource specified. + Returns: + results : List[Union[bool, Exception]] + List of results for resource key un-subscriptions requested. + Return values must be in the same order as the resource keys + requested. If a resource is properly unsubscribed, + True must be retrieved; otherwise, the Exception that is + raised during the processing must be retrieved. + """ + raise NotImplementedError() + + def GetState( + self, blocking=False, terminate : Optional[threading.Event] = None + ) -> Iterator[Tuple[float, str, Any]]: + """ Retrieve last collected values for subscribed resources. + Operates as a generator, so this method should be called once and will + block until values are available. When values are available, + it should yield each of them and block again until new values are + available. When the collector is destroyed, GetState() can return instead + of yield to terminate the loop. + Terminate enables to request interruption of the generation. 
+ Examples: + # keep looping waiting for extra samples (generator loop) + terminate = threading.Event() + i = 0 + for timestamp,resource_key,resource_value in my_collector.GetState(blocking=True, terminate=terminate): + process(timestamp, resource_key, resource_value) + i += 1 + if i == 10: terminate.set() + + # just retrieve accumulated samples + samples = my_collector.GetState(blocking=False, terminate=terminate) + # or (as classical loop) + i = 0 + for timestamp,resource_key,resource_value in my_collector.GetState(blocking=False, terminate=terminate): + process(timestamp, resource_key, resource_value) + i += 1 + if i == 10: terminate.set() + Parameters: + blocking : bool + Select the collector behaviour. In both cases, the collector will + first retrieve the samples accumulated and available in the + internal queue. Then, if blocking, the collector does not + terminate the loop and waits for additional samples to come, + thus behaving as a generator. If non-blocking, the collector + terminates the loop and returns. Non-blocking behaviour can + be used for periodically polling the collector, while blocking + can be used when a separate thread is in charge of + collecting the samples produced by the collector. + terminate : threading.Event + Signals the interruption of the GetState method as soon as + possible. + Returns: + results : Iterator[Tuple[float, str, Any]] + Sequences of state sample. Each State sample contains a + float Unix-like timestamps of the samples in seconds with up + to microsecond resolution, the resource_key of the sample, + and its resource_value. + Only resources with an active subscription must be + retrieved. Interval and duration of the sampling process are + specified when creating the subscription using method + SubscribeState(). Order of values yielded is arbitrary. + """ + raise NotImplementedError() diff --git a/src/telemetry/backend/collector_api/__init__.py b/src/telemetry/backend/collector_api/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..023830645e0fcb60e3f8583674a954810af222f2 --- /dev/null +++ b/src/telemetry/backend/collector_api/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/collectors/__init__.py b/src/telemetry/backend/collectors/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..023830645e0fcb60e3f8583674a954810af222f2 --- /dev/null +++ b/src/telemetry/backend/collectors/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/collectors/emulated/EmulatedCollector.py b/src/telemetry/backend/collectors/emulated/EmulatedCollector.py new file mode 100644 index 0000000000000000000000000000000000000000..90be013368c5aa80dcb52c2394e8b74f9d74b6f4 --- /dev/null +++ b/src/telemetry/backend/collectors/emulated/EmulatedCollector.py @@ -0,0 +1,450 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytz +import queue +import logging +import uuid +import json +from anytree import Node, Resolver +from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.executors.pool import ThreadPoolExecutor +from datetime import datetime, timedelta +from typing import Any, Iterator, List, Tuple, Union, Optional +from telemetry.backend.collector_api._Collector import _Collector +from .EmulatedHelper import EmulatedCollectorHelper +from .SyntheticMetricsGenerator import SyntheticMetricsGenerator + + +class EmulatedCollector(_Collector): + """ + EmulatedCollector is a class that simulates a network collector for testing purposes. + It provides functionalities to manage configurations, state subscriptions, and synthetic data generation. 
+ """ + def __init__(self, address: str, port: int, **settings): + super().__init__('emulated_collector', address, port, **settings) + self._initial_config = Node('root') # Tree structure for initial config + self._running_config = Node('root') # Tree structure for running config + self._subscriptions = Node('subscriptions') # Tree for state subscriptions + self._resolver = Resolver() # For path resolution in tree structures + self._out_samples = queue.Queue() # Queue to hold synthetic state samples + self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples) # Placeholder for synthetic data generator + self._scheduler = BackgroundScheduler(daemon=True) + self._scheduler.configure( + jobstores = {'default': MemoryJobStore()}, + executors = {'default': ThreadPoolExecutor(max_workers=1)}, + timezone = pytz.utc + ) + self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) + self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) + self._helper_methods = EmulatedCollectorHelper() + + self.logger = logging.getLogger(__name__) + self.connected = False # To track connection state + self.logger.info("EmulatedCollector initialized") + + def Connect(self) -> bool: + self.logger.info(f"Connecting to {self.address}:{self.port}") + self.connected = True + self._scheduler.start() + self.logger.info(f"Successfully connected to {self.address}:{self.port}") + return True + + def Disconnect(self) -> bool: + self.logger.info(f"Disconnecting from {self.address}:{self.port}") + if not self.connected: + self.logger.warning("Collector is not connected. Nothing to disconnect.") + return False + self._scheduler.shutdown() + self.connected = False + self.logger.info(f"Successfully disconnected from {self.address}:{self.port}") + return True + + def _require_connection(self): + if not self.connected: + raise RuntimeError("Collector is not connected. Please connect before performing operations.") + + def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + self._require_connection() + results = [] + for resource_key, duration, interval in subscriptions: + resource_key = self._helper_methods.validate_resource_key(resource_key) # Validate the endpoint name + self.logger.info(f"1. Subscribing to {resource_key} with duration {duration}s and interval {interval}s") + try: + self._resolver.get(self._running_config, resource_key) # Verify if the resource key exists in the running configuration + self.logger.info(f"Resource key {resource_key} exists in the configuration.") + resource_value = json.loads(self._resolver.get(self._running_config, resource_key).value) + if resource_value is not None: + sample_type_ids = resource_value['sample_types'] + self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}") + if len(sample_type_ids) == 0: + self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") + results.append(False) + continue + else: + self.logger.warning(f"No sample types found for {resource_key}. 
Skipping subscription.") + results.append(False) + continue + # Add the job to the scheduler + job_id = f"{resource_key}-{uuid.uuid4()}" + self._scheduler.add_job( + self._generate_sample, + 'interval', + seconds=interval, + args=[resource_key, sample_type_ids], + id=job_id, + replace_existing=True, + end_date=datetime.now(pytz.utc) + timedelta(seconds=duration) + ) + self.logger.info(f"Job added to scheduler for resource key {resource_key} with duration {duration}s and interval {interval}s") + results.append(True) + except Exception as e: + self.logger.error(f"Failed to verify resource key or add job: {e}") + results.append(e) + return results + + def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + self._require_connection() + results = [] + for resource_key, _, _ in subscriptions: + resource_key = self._helper_methods.validate_resource_key(resource_key) + try: + # Check if job exists + job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] + if not job_ids: + self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.") + results.append(False) + continue + # Remove jobs + for job_id in job_ids: + self._scheduler.remove_job(job_id) + + self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") + results.append(True) + except Exception as e: + self.logger.exception(f"Failed to unsubscribe from {resource_key}") + results.append(e) + return results + + def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]: + self._require_connection() + start_time = datetime.now(pytz.utc) + duration = 10 # Duration of the subscription in seconds (as an example) + + while True: + try: + if terminate and not terminate.empty(): + self.logger.info("Termination signal received, stopping GetState") + break + + elapsed_time = (datetime.now(pytz.utc) - start_time).total_seconds() + if elapsed_time >= duration: + self.logger.info("Duration expired, stopping GetState") + break + + sample = self._out_samples.get(block=blocking, timeout=1 if blocking else 0.1) + self.logger.info(f"Retrieved state sample: {sample}") + yield sample + except queue.Empty: + if not blocking: + self.logger.info("No more samples in queue, exiting GetState") + return None + + def _generate_sample(self, resource_key: str, sample_type_ids : List[int]): + # Simulate generating a sample for the resource key + self.logger.debug(f"Executing _generate_sample for resource: {resource_key}") + sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids) + self._out_samples.put(sample) + +# ------------- Event Listeners (START)----------------- + + def _listener_job_removed_from_subscription_tree(self, event): + if event.job_id: + # Extract the resource key from the job ID + resource_key = event.job_id.split('-')[0] + resource_key = self._helper_methods.validate_resource_key(resource_key) + + # Remove the subscription from the tree + try: + subscription_path = resource_key.split('/') + parent = self._subscriptions + for part in subscription_path: + parent = next((child for child in parent.children if child.name == part), None) + if not parent: + raise ValueError(f"Subscription path '{resource_key}' not found in tree.") + if parent: + parent.parent.children = tuple(child for child in parent.parent.children if child != parent) + self.logger.warning(f"Automatically removed subscription from subscription_tree for 
{resource_key} after job termination by listener. Maybe due to timeout.") + except Exception as e: + self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}") + + def _listener_job_added_to_subscription_tree(self, event): + try: + job_id = event.job_id + if job_id: + resource_key = job_id.split('-')[0] # Extract resource key from job ID + resource_key = self._helper_methods.validate_resource_key(resource_key) + subscription_path = resource_key.split('/') + parent = self._subscriptions + for part in subscription_path: + node = next((child for child in parent.children if child.name == part), None) + if not node: + node = Node(part, parent=parent) + parent = node + parent.value = { + "job_id": job_id + } + self.logger.info(f"Automatically added subscription for {resource_key} to the subscription_tree by listener.") + except Exception as e: + self.logger.exception("Failed to add subscription to the tree") + +# ------------- Event Listeners (END)----------------- + +#------------------------------------------------------------------------------------- +# ------- The below methods are kept for debugging purposes (test-case) only --------- +#------------------------------------------------------------------------------------- + +# This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_collector()). + def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]: # For debugging purposes. + self._require_connection() + results = [] + + # if not isinstance(resources, dict): + # self.logger.error("Invalid configuration format: resources must be a dictionary.") + # raise ValueError("Invalid configuration format. Must be a dictionary.") + if 'config_rules' not in resources or not isinstance(resources['config_rules'], list): + self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.") + raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.") + + for rule in resources['config_rules']: + try: + if 'action' not in rule or 'custom' not in rule: + raise ValueError(f"Invalid rule format: {rule}") + + action = rule['action'] + custom = rule['custom'] + resource_key = custom.get('resource_key') + resource_value = custom.get('resource_value') + + if not resource_key: + raise ValueError(f"Resource key is missing in rule: {rule}") + + if resource_value is None: + raise ValueError(f"Resource value is None for key: {resource_key}") + if not resource_key: + raise ValueError(f"Resource key is missing in rule: {rule}") + + if action == 1: # Set action + resource_path = self._helper_methods._parse_resource_key(resource_key) + # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}") + parent = self._running_config + + for part in resource_path[:-1]: + if '[' in part and ']' in part: + base, index = part.split('[', 1) + index = index.rstrip(']') + parent = self._helper_methods._find_or_create_node(index, self._helper_methods._find_or_create_node(base, parent)) + # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}") + elif resource_path[-1] != 'settings': + # self.logger.info(f"2b. 
Creating node: {part}") + parent = self._helper_methods._find_or_create_node(part, parent) + + final_part = resource_path[-1] + if final_part in ['address', 'port']: + self._helper_methods._create_or_update_node(final_part, parent, resource_value) + self.logger.info(f"Configured: {resource_key} = {resource_value}") + + if resource_key.startswith("_connect/settings"): + parent = self._helper_methods._find_or_create_node("_connect", self._running_config) + settings_node = self._helper_methods._find_or_create_node("settings", parent) + settings_node.value = None # Ensure settings node has None value + endpoints_node = self._helper_methods._find_or_create_node("endpoints", settings_node) + + for endpoint in resource_value.get("endpoints", []): + uuid = endpoint.get("uuid") + uuid = uuid.replace('/', '_') if uuid else None + if uuid: + # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}") + self._helper_methods._create_or_update_node(uuid, endpoints_node, endpoint) + self.logger.info(f"Configured endpoint: {uuid} : {endpoint}") + + elif resource_key.startswith("/interface"): + interface_parent = self._helper_methods._find_or_create_node("interface", self._running_config) + name = resource_value.get("name") + name = name.replace('/', '_') if name else None + if name: + self._helper_methods._create_or_update_node(name, interface_parent, resource_value) + self.logger.info(f"Configured interface: {name} : {resource_value}") + # self.logger.info(f"4. Configured interface: {name}") + + results.append(True) + else: + raise ValueError(f"Unsupported action '{action}' in rule: {rule}") + + if resource_value is None: + raise ValueError(f"Resource value is None for key: {resource_key}") + + except Exception as e: + self.logger.exception(f"Failed to apply rule: {rule}") + results.append(e) + + return results + +#----------------------------------- +# ------- EXTRA Methods ------------ +#----------------------------------- + + # def log_active_jobs(self): # For debugging purposes. + # """ + # Logs the IDs of all active jobs. + # This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger. + # """ + # self._require_connection() + # jobs = self._scheduler.get_jobs() + # self.logger.info(f"Active jobs: {[job.id for job in jobs]}") + + # def print_config_tree(self): # For debugging purposes. + # """ + # Reads the configuration using GetConfig and prints it as a hierarchical tree structure. + # """ + # self._require_connection() + + # def print_tree(node, indent=""): + # """ + # Recursively prints the configuration tree. + + # Args: + # node (Node): The current node to print. + # indent (str): The current indentation level. 
+ # """ + # if node.name != "root": # Skip the root node's name + # value = getattr(node, "value", None) + # print(f"{indent}- {node.name}: {json.loads(value) if value else ''}") + + # for child in node.children: + # print_tree(child, indent + " ") + + # print("Configuration Tree:") + # print_tree(self._running_config) + + + # def GetInitialConfig(self) -> List[Tuple[str, Any]]: # comment + # self._require_connection() + # results = [] + # for node in self._initial_config.descendants: + # value = getattr(node, "value", None) + # results.append((node.name, json.loads(value) if value else None)) + # self.logger.info("Retrieved initial configurations") + # return results + + # def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]: # comment + # """ + # Retrieves the configuration for the specified resource keys. + # If no keys are provided, returns the full configuration tree. + + # Args: + # resource_keys (List[str]): A list of keys specifying the configuration to retrieve. + + # Returns: + # List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value, + # subtree, or an exception. + # """ + # self._require_connection() + # results = [] + + # try: + # if not resource_keys: + # # If no specific keys are provided, return the full configuration tree + + # full_tree = self._helper_methods._generate_subtree(self._running_config) + # # full_tree = self._generate_subtree(self._running_config) + # return [("full_configuration", full_tree)] + + # for key in resource_keys: + # try: + # # Parse the resource key + # resource_path = self._helper_methods.(key) + # self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}") + + # # Navigate to the node corresponding to the key + # parent = self._running_config + # for part in resource_path: + # parent = self._find_or_raise_node(part, parent) + + # # Check if the node has a value + # value = getattr(parent, "value", None) + # if value: + # # If a value exists, return it + # results.append((key, json.loads(value))) + # else: + # # If no value, return the subtree of this node + # subtree = self._helper_methods._generate_subtree(parent) + # # subtree = self._generate_subtree(parent) + # results.append((key, subtree)) + + # except Exception as e: + # self.logger.exception(f"Failed to retrieve configuration for key: {key}") + # results.append((key, e)) + + # except Exception as e: + # self.logger.exception("Failed to retrieve configurations") + # results.append(("Error", e)) + + # return results + + # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: # comment + # self._require_connection() + # results = [] + + # for key in resources: + # try: + # # Parse resource key into parts, handling brackets correctly + # resource_path = self._helper_methods.(key) + + # parent = self._running_config + # for part in resource_path: + # parent = self._find_or_raise_node(part, parent) + + # # Delete the final node + # node_to_delete = parent + # parent = node_to_delete.parent + # parent.children = tuple(child for child in parent.children if child != node_to_delete) + # self.logger.info(f"Deleted configuration for key: {key}") + + # # Handle endpoints structure + # if "interface" in key and "settings" in key: + # interface_name = key.split('[')[-1].split(']')[0] + # endpoints_parent = self._find_or_raise_node("_connect", self._running_config) + # endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent) + # 
endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None) + # if endpoint_to_delete: + # endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete) + # self.logger.info(f"Removed endpoint entry for interface '{interface_name}'") + + # # Check if parent has no more children and is not the root + # while parent and parent.name != "root" and not parent.children: + # node_to_delete = parent + # parent = node_to_delete.parent + # parent.children = tuple(child for child in parent.children if child != node_to_delete) + # self.logger.info(f"Deleted empty parent node: {node_to_delete.name}") + + # results.append(True) + # except Exception as e: + # self.logger.exception(f"Failed to delete configuration for key: {key}") + # results.append(e) + + # return results + diff --git a/src/telemetry/backend/collectors/emulated/EmulatedHelper.py b/src/telemetry/backend/collectors/emulated/EmulatedHelper.py new file mode 100644 index 0000000000000000000000000000000000000000..ebfb7d49fdb7ceafa00986e763e56dd59e445609 --- /dev/null +++ b/src/telemetry/backend/collectors/emulated/EmulatedHelper.py @@ -0,0 +1,166 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from anytree import Node +import json +from typing import Any, List + + +class EmulatedCollectorHelper: + """ + Helper class for the emulated collector. + """ + def __init__(self): + pass + + def validate_resource_key(self, key: str) -> str: + """ + Splits the input string into two parts: + - The first part is '_connect/settings/endpoints/'. + - The second part is the remaining string after the first part, with '/' replaced by '_'. + + Args: + key (str): The input string to process. + + Returns: + str: A single string with the processed result. + """ + prefix = '_connect/settings/endpoints/' + if not key.startswith(prefix): + raise ValueError(f"The input path '{key}' does not start with the expected prefix: {prefix}") + second_part = key[len(prefix):] + second_part_processed = second_part.replace('/', '_') + validated_key = prefix + second_part_processed + return validated_key + +#-------------------------------------------------------------------------------------- +# ------- Below function is kept for debugging purposes (test-cases) only ------------- +#-------------------------------------------------------------------------------------- + +# This below methods can be commented but are called by the SetConfig method in EmulatedCollector.py + + def _find_or_create_node(self, name: str, parent: Node) -> Node: + """ + Finds or creates a node with the given name under the specified parent. + + Args: + name (str): The name of the node to find or create. + parent (Node): The parent node. + + Returns: + Node: The found or created node. 
+ """ + node = next((child for child in parent.children if child.name == name), None) + if not node: + node = Node(name, parent=parent) + return node + + + def _create_or_update_node(self, name: str, parent: Node, value: Any): + """ + Creates or updates a node with the given name and value under the specified parent. + + Args: + name (str): The name of the node. + parent (Node): The parent node. + value (Any): The value to set on the node. + """ + node = next((child for child in parent.children if child.name == name), None) + if node: + node.value = json.dumps(value) + else: + Node(name, parent=parent, value=json.dumps(value)) + + + def _parse_resource_key(self, resource_key: str) -> List[str]: + """ + Parses the resource key into parts, correctly handling brackets. + + Args: + resource_key (str): The resource key to parse. + + Returns: + List[str]: A list of parts from the resource key. + """ + resource_path = [] + current_part = "" + in_brackets = False + + if not resource_key.startswith('/interface'): + for char in resource_key.strip('/'): + if char == '[': + in_brackets = True + current_part += char + elif char == ']': + in_brackets = False + current_part += char + elif char == '/' and not in_brackets: + resource_path.append(current_part) + current_part = "" + else: + current_part += char + if current_part: + resource_path.append(current_part) + return resource_path + else: + resource_path = resource_key.strip('/').split('/', 1) + if resource_path[1] == 'settings': + return resource_path + else: + resource_path = [resource_key.strip('/').split('[')[0].strip('/'), resource_key.strip('/').split('[')[1].split(']')[0].replace('/', '_')] + return resource_path + + +#----------------------------------- +# ------- EXTRA Methods ------------ +#----------------------------------- + + # def _generate_subtree(self, node: Node) -> dict: + # """ + # Generates a subtree of the configuration tree starting from the specified node. + + # Args: + # node (Node): The node from which to generate the subtree. + + # Returns: + # dict: The subtree as a dictionary. + # """ + # subtree = {} + # for child in node.children: + # if child.children: + # subtree[child.name] = self._generate_subtree(child) + # else: + # value = getattr(child, "value", None) + # subtree[child.name] = json.loads(value) if value else None + # return subtree + + + # def _find_or_raise_node(self, name: str, parent: Node) -> Node: + # """ + # Finds a node with the given name under the specified parent or raises an exception if not found. + + # Args: + # name (str): The name of the node to find. + # parent (Node): The parent node. + + # Returns: + # Node: The found node. + + # Raises: + # ValueError: If the node is not found. + # """ + # node = next((child for child in parent.children if child.name == name), None) + # if not node: + # raise ValueError(f"Node '{name}' not found under parent '{parent.name}'.") + # return node diff --git a/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py b/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py new file mode 100644 index 0000000000000000000000000000000000000000..a01e2c0e659f1eea6383030daeafef11c83d7a45 --- /dev/null +++ b/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py @@ -0,0 +1,129 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import random +import logging +import queue +import time + +LOGGER = logging.getLogger(__name__) + +class SyntheticMetricsGenerator(): + """ + This collector class generates synthetic network metrics based on the current network state. + The metrics include packet_in, packet_out, bytes_in, bytes_out, packet_loss (percentage), packet_drop_count, byte_drop_count, and latency. + The network state can be 'good', 'moderate', or 'poor', and it affects the generated metrics accordingly. + """ + def __init__(self, metric_queue=None, network_state="good"): + LOGGER.info("Initiaitng Emulator") + super().__init__() + self.metric_queue = metric_queue if metric_queue is not None else queue.Queue() + self.network_state = network_state + self.running = True + self.set_initial_parameter_values() # update this method to set the initial values for the parameters + + def set_initial_parameter_values(self): + self.bytes_per_pkt = random.uniform(65, 150) + self.states = ["good", "moderate", "poor"] + self.state_probabilities = { + "good" : [0.9, 0.1, 0.0], + "moderate": [0.2, 0.7, 0.1], + "poor" : [0.0, 0.3, 0.7] + } + if self.network_state == "good": + self.packet_in = random.uniform(700, 900) + elif self.network_state == "moderate": + self.packet_in = random.uniform(300, 700) + else: + self.packet_in = random.uniform(100, 300) + + def generate_synthetic_data_point(self, resource_key, sample_type_ids): + """ + Generates a synthetic data point based on the current network state. + + Parameters: + resource_key (str): The key associated with the resource for which the data point is generated. + + Returns: + tuple: A tuple containing the timestamp, resource key, and a dictionary of generated metrics. + """ + if self.network_state == "good": + packet_loss = random.uniform(0.01, 0.1) + random_noise = random.uniform(1,10) + latency = random.uniform(5, 25) + elif self.network_state == "moderate": + packet_loss = random.uniform(0.1, 1) + random_noise = random.uniform(10, 40) + latency = random.uniform(25, 100) + elif self.network_state == "poor": + packet_loss = random.uniform(1, 3) + random_noise = random.uniform(40, 100) + latency = random.uniform(100, 300) + else: + raise ValueError("Invalid network state. 
Must be 'good', 'moderate', or 'poor'.") + + period = 60 * 60 * random.uniform(10, 100) + amplitude = random.uniform(50, 100) + sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in + packet_in = sin_wave + ((sin_wave/100) * random_noise) + packet_out = packet_in - ((packet_in / 100) * packet_loss) + bytes_in = packet_in * self.bytes_per_pkt + bytes_out = packet_out * self.bytes_per_pkt + packet_drop_count = packet_in * (packet_loss / 100) + byte_drop_count = packet_drop_count * self.bytes_per_pkt + + state_prob = self.state_probabilities[self.network_state] + self.network_state = random.choices(self.states, state_prob)[0] + LOGGER.debug(self.network_state) + + generated_samples = { + "packet_in" : int(packet_in), "packet_out" : int(packet_out), "bytes_in" : float(bytes_in), + "bytes_out" : float(bytes_out), "packet_loss": float(packet_loss), "packet_drop_count" : int(packet_drop_count), + "latency" : float(latency), "byte_drop_count": float(byte_drop_count) + } + requested_metrics = self.metric_id_mapper(sample_type_ids, generated_samples) + # generated_samples = {metric: generated_samples[metric] for metric in requested_metrics} + + return (time.time(), resource_key, requested_metrics) + + def metric_id_mapper(self, sample_type_ids, metric_dict): + """ + Maps the sample type IDs to the corresponding generated metric values. + + Parameters: + sample_type_ids (list): A list of sample type IDs. + + Returns: + list: A list of metric values for the requested sample type IDs. + """ + metric_values = [] + for sample_type_id in sample_type_ids: + if sample_type_id == 102: + metric_values.append(metric_dict["packet_in"]) + elif sample_type_id == 101: + metric_values.append(metric_dict["packet_out"]) + elif sample_type_id == 103: + metric_values.append(metric_dict["packet_drop_count"]) + elif sample_type_id == 202: + metric_values.append(metric_dict["bytes_in"]) + elif sample_type_id == 201: + metric_values.append(metric_dict["bytes_out"]) + elif sample_type_id == 203: + metric_values.append(metric_dict["byte_drop_count"]) + elif sample_type_id == 701: + metric_values.append(metric_dict["latency"]) + else: + raise ValueError(f"Invalid sample type ID: {sample_type_id}") + return metric_values diff --git a/src/telemetry/backend/collectors/emulated/__init__.py b/src/telemetry/backend/collectors/emulated/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..023830645e0fcb60e3f8583674a954810af222f2 --- /dev/null +++ b/src/telemetry/backend/collectors/emulated/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/requirements.in b/src/telemetry/backend/requirements.in index effd1752af0d1a2d00312ff4935676c24964c784..2843bdbf68defcc1a972b49bfa12a8107b696aaa 100644 --- a/src/telemetry/backend/requirements.in +++ b/src/telemetry/backend/requirements.in @@ -12,4 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License.
+anytree==2.8.0 confluent-kafka==2.3.* +numpy==2.0.1 +APScheduler==3.10.1 diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index fe5792a023a926d41a71b0dc7614d7e2b093507b..a1f17df3cb65a6bd13ffb8e96a6a07b536200825 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -12,21 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +import queue import json import time -import random import logging import threading -from typing import Any, Dict -from datetime import datetime, timezone -# from common.proto.context_pb2 import Empty -from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import Consumer as KafkaConsumer -from confluent_kafka import KafkaError +from typing import Any, Dict +from datetime import datetime, timezone +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum -from common.Settings import get_service_port_grpc -from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from common.method_wrappers.Decorator import MetricsPool +from common.Settings import get_service_port_grpc +from common.method_wrappers.Decorator import MetricsPool +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService LOGGER = logging.getLogger(__name__) @@ -35,7 +34,7 @@ METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') class TelemetryBackendService(GenericGrpcService): """ Class listens for request on Kafka topic, fetches requested metrics from device. - Produces metrics on both RESPONSE and VALUE kafka topics. + Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics. 
""" def __init__(self, cls_name : str = __name__) -> None: LOGGER.info('Init TelemetryBackendService') @@ -45,7 +44,9 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) - self.running_threads = {} + self.running_threads = {} + self.emulatorCollector = None + self.metric_queue = queue.Queue() def install_servicers(self): threading.Thread(target=self.RequestListener).start() @@ -57,7 +58,7 @@ class TelemetryBackendService(GenericGrpcService): LOGGER.info('Telemetry backend request listener is running ...') # print ('Telemetry backend request listener is running ...') consumer = self.kafka_consumer - consumer.subscribe([KafkaTopic.REQUEST.value]) + consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value]) while True: receive_msg = consumer.poll(2.0) if receive_msg is None: @@ -66,93 +67,83 @@ class TelemetryBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - # print("Consumer error: {}".format(receive_msg.error())) + LOGGER.error("Consumer error: {}".format(receive_msg.error())) break try: collector = json.loads(receive_msg.value().decode('utf-8')) collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - # print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) if collector['duration'] == -1 and collector['interval'] == -1: self.TerminateCollectorBackend(collector_id) else: - self.RunInitiateCollectorBackend(collector_id, collector) + threading.Thread(target=self.InitiateCollectorBackend, + args=(collector_id, collector)).start() except Exception as e: - LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) - # print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) + LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) - def TerminateCollectorBackend(self, collector_id): - if collector_id in self.running_threads: - thread, stop_event = self.running_threads[collector_id] - stop_event.set() - thread.join() - # print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) - del self.running_threads[collector_id] - self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. - else: - # print ('Backend collector {:} not found'.format(collector_id)) - LOGGER.warning('Backend collector {:} not found'.format(collector_id)) - - def RunInitiateCollectorBackend(self, collector_id: str, collector: str): - stop_event = threading.Event() - thread = threading.Thread(target=self.InitiateCollectorBackend, - args=(collector_id, collector, stop_event)) - self.running_threads[collector_id] = (thread, stop_event) - thread.start() - - def InitiateCollectorBackend(self, collector_id, collector, stop_event): + def InitiateCollectorBackend(self, collector_id, collector): """ Method receives collector request and initiates collecter backend. 
""" - # print("Initiating backend for collector: ", collector_id) - LOGGER.info("Initiating backend for collector: {:s}".format(str(collector_id))) - start_time = time.time() - while not stop_event.is_set(): - if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend - print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) - self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. - break - self.ExtractKpiValue(collector_id, collector['kpi_id']) - time.sleep(collector['interval']) + LOGGER.info("Initiating backend for collector: (Not Implemented... In progress ) {:s}".format(str(collector_id))) + # start_time = time.time() + # self.emulatorCollector = NetworkMetricsEmulator( + # duration = collector['duration'], + # interval = collector['interval'], + # metric_queue = self.metric_queue + # ) + # self.emulatorCollector.start() + # self.running_threads[collector_id] = self.emulatorCollector - def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + # while self.emulatorCollector.is_alive(): + # if not self.metric_queue.empty(): + # metric_value = self.metric_queue.get() + # LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value)) + # self.GenerateKpiValue(collector_id, collector['kpi_id'] , metric_value) + # time.sleep(1) + # self.TerminateCollectorBackend(collector_id) + + def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ - Method to write kpi Termination signat on RESPONSE Kafka topic + Method to write kpi value on VALUE Kafka topic """ producer = self.kafka_producer kpi_value : Dict = { + "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "kpi_id" : kpi_id, - "kpi_value" : measured_kpi_value, + "kpi_value" : measured_kpi_value } producer.produce( - KafkaTopic.RESPONSE.value, # TODO: to the topic ... + KafkaTopic.VALUE.value, key = collector_id, value = json.dumps(kpi_value), callback = self.delivery_callback ) producer.flush() - def ExtractKpiValue(self, collector_id: str, kpi_id: str): - """ - Method to extract kpi value. - """ - measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device - # print ("Measured Kpi value: {:}".format(measured_kpi_value)) - self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value) + def TerminateCollectorBackend(self, collector_id): + LOGGER.debug("Terminating collector backend...") + if collector_id in self.running_threads: + thread = self.running_threads[collector_id] + thread.stop() + del self.running_threads[collector_id] + LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id)) + self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. 
+ else: + LOGGER.warning('Backend collector {:} not found'.format(collector_id)) - def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ - Method to write kpi value on RESPONSE Kafka topic + Method to write the KPI termination signal on the TELEMETRY_RESPONSE Kafka topic """ producer = self.kafka_producer kpi_value : Dict = { - "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "kpi_id" : kpi_id, - "kpi_value" : measured_kpi_value + "kpi_value" : measured_kpi_value, } producer.produce( - KafkaTopic.VALUE.value, # TODO: to the topic ... + KafkaTopic.TELEMETRY_RESPONSE.value, key = collector_id, value = json.dumps(kpi_value), callback = self.delivery_callback @@ -160,14 +151,9 @@ class TelemetryBackendService(GenericGrpcService): producer.flush() def delivery_callback(self, err, msg): - """ - Callback function to handle message delivery status. - Args: err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ - if err: - LOGGER.error('Message delivery failed: {:}'.format(err)) + if err: + LOGGER.error('Message delivery failed: {:s}'.format(str(err))) # print(f'Message delivery failed: {err}') - #else: - # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - # # print(f'Message delivered to topic {msg.topic()}') + # else: + # LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) + # print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/messages.py b/src/telemetry/backend/tests/messages.py index 0dc5506ab42d67d67a88cf8976409472213fe098..f6a2bb247f28d10654746e0c75b6ed1973382e38 100644 --- a/src/telemetry/backend/tests/messages.py +++ b/src/telemetry/backend/tests/messages.py @@ -12,4 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid +import random +from common.proto import telemetry_frontend_pb2 +# from common.proto.kpi_sample_types_pb2 import KpiSampleType +# from common.proto.kpi_manager_pb2 import KpiId + +def create_collector_request(): + _create_collector_request = telemetry_frontend_pb2.Collector() + _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) + # _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" + _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_collector_request.duration_s = float(random.randint(8, 16)) + # _create_collector_request.duration_s = -1 + _create_collector_request.interval_s = float(random.randint(2, 4)) + return _create_collector_request diff --git a/src/telemetry/backend/tests/messages_emulated.py b/src/telemetry/backend/tests/messages_emulated.py new file mode 100644 index 0000000000000000000000000000000000000000..e081fb3febc163d40742d3e690cc0e2b3d4b7d61 --- /dev/null +++ b/src/telemetry/backend/tests/messages_emulated.py @@ -0,0 +1,61 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +# Configure logging to ensure logs appear on the console +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +def create_test_configuration(): + return { + "config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": 8080}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": { + "endpoints": [ + {"uuid": "eth0", "type": "ethernet", "sample_types": [101, 102]}, + {"uuid": "eth1", "type": "ethernet", "sample_types": []}, + {"uuid": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]} + ] + }}}, + {"action": 1, "custom": {"resource_key": "/interface[eth0]/settings", "resource_value": { + "name": "eth0", "enabled": True + }}}, + {"action": 1, "custom": {"resource_key": "/interface[eth1]/settings", "resource_value": { + "name": "eth1", "enabled": False + }}}, + {"action": 1, "custom": {"resource_key": "/interface[13/1/2]/settings", "resource_value": { + "name": "13/1/2", "enabled": True + }}} + ] + } + +def create_specific_config_keys(): + keys_to_return = ["_connect/settings/endpoints/eth1", "/interface/[13/1/2]/settings", "_connect/address"] + return keys_to_return + +def create_config_for_delete(): + keys_to_delete = ["_connect/settings/endpoints/eth0", "/interface/[eth1]", "_connect/port"] + return keys_to_delete + +def create_test_subscriptions(): + return [("_connect/settings/endpoints/eth1", 10, 2), + ("_connect/settings/endpoints/13/1/2", 15, 3), + ("_connect/settings/endpoints/eth0", 8, 2)] + +def create_unscubscribe_subscriptions(): + return [("_connect/settings/endpoints/eth1", 10, 2), + ("_connect/settings/endpoints/13/1/2", 15, 3), + ("_connect/settings/endpoints/eth0", 8, 2)] diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index 5307cd9fe51f3bb15f5dd4915bfc601318db9551..e75b33ca58c6bf27c5d2e1c2012dc31de5274ad3 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -13,10 +13,11 @@ # limitations under the License. 
import logging -import threading +import time +from typing import Dict from common.tools.kafka.Variables import KafkaTopic from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService - +from .messages import create_collector_request LOGGER = logging.getLogger(__name__) @@ -34,4 +35,19 @@ def test_validate_kafka_topics(): # def test_RunRequestListener(): # LOGGER.info('test_RunRequestListener') # TelemetryBackendServiceObj = TelemetryBackendService() -# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() \ No newline at end of file +# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() + +def test_RunInitiateCollectorBackend(): + LOGGER.debug(">>> RunInitiateCollectorBackend <<<") + collector_obj = create_collector_request() + collector_id = collector_obj.collector_id.collector_id.uuid + collector_dict : Dict = { + "kpi_id" : collector_obj.kpi_id.kpi_id.uuid, + "duration": collector_obj.duration_s, + "interval": collector_obj.interval_s + } + TeleObj = TelemetryBackendService() + TeleObj.InitiateCollectorBackend(collector_id, collector_dict) + time.sleep(20) + + LOGGER.debug("--- Execution Finished Successfully ---") diff --git a/src/telemetry/backend/tests/test_emulated.py b/src/telemetry/backend/tests/test_emulated.py new file mode 100644 index 0000000000000000000000000000000000000000..feb5b1f7f92de4016f3bcb8eff8e17b843bf0c3e --- /dev/null +++ b/src/telemetry/backend/tests/test_emulated.py @@ -0,0 +1,108 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import logging +import time +import pytest +from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector +from telemetry.backend.tests.messages_emulated import ( + create_test_configuration, + create_specific_config_keys, + create_config_for_delete, + create_test_subscriptions, +) + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +@pytest.fixture +def setup_collector(): + """Sets up an EmulatedCollector instance for testing.""" + yield EmulatedCollector(address="127.0.0.1", port=8080) + +@pytest.fixture +def connected_configured_collector(setup_collector): + collector = setup_collector # EmulatedCollector(address="127.0.0.1", port=8080) + collector.Connect() + collector.SetConfig(create_test_configuration()) + yield collector + collector.Disconnect() + +def test_connect(setup_collector): + logger.info(">>> test_connect <<<") + collector = setup_collector + assert collector.Connect() is True + assert collector.connected is True + +def test_disconnect(setup_collector): + logger.info(">>> test_disconnect <<<") + collector = setup_collector + collector.Connect() + assert collector.Disconnect() is True + assert collector.connected is False + +# def test_set_config(setup_collector): +# logger.info(">>> test_set_config <<<") +# collector = setup_collector +# collector.Connect() + +# config = create_test_configuration() + +# results = collector.SetConfig(config) +# assert all(result is True for result in results) + +# def test_get_config(connected_configured_collector): +# logger.info(">>> test_get_config <<<") +# resource_keys = create_specific_config_keys() +# results = connected_configured_collector.GetConfig(resource_keys) + +# for key, value in results: +# assert key in create_specific_config_keys() +# assert value is not None + +# def test_delete_config(connected_configured_collector): +# logger.info(">>> test_delete_config <<<") +# resource_keys = create_config_for_delete() + +# results = connected_configured_collector.DeleteConfig(resource_keys) +# assert all(result is True for result in results) + +def test_subscribe_state(connected_configured_collector): + logger.info(">>> test_subscribe_state <<<") + subscriptions = create_test_subscriptions() + + results = connected_configured_collector.SubscribeState(subscriptions) + # logger.info(f"Subscribed result: {results}.") + assert results == [False, True, True] # all(result is True for result in results) + +def test_unsubscribe_state(connected_configured_collector): + logger.info(">>> test_unsubscribe_state <<<") + subscriptions = create_test_subscriptions() + + connected_configured_collector.SubscribeState(subscriptions) + results = connected_configured_collector.UnsubscribeState(subscriptions) + assert results == [False, True, True] # all(result is True for result in results) + +def test_get_state(connected_configured_collector): + logger.info(">>> test_get_state <<<") + subscriptions = create_test_subscriptions() + + connected_configured_collector.SubscribeState(subscriptions) + logger.info(f"Subscribed to state: {subscriptions}. 
waiting for 12 seconds ...") + time.sleep(12) + + state_iterator = connected_configured_collector.GetState(blocking=False) + states = list(state_iterator) + + assert len(states) > 0 diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index ce55f80510b3ff678ead1be82bda9101cc8e7e17..f74e97ffd4998ca0b3255ca4e1ebe496ebc6737b 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -74,7 +74,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): "interval": collector_obj.interval_s } self.kafka_producer.produce( - KafkaTopic.REQUEST.value, + KafkaTopic.TELEMETRY_REQUEST.value, key = collector_uuid, value = json.dumps(collector_to_generate), callback = self.delivery_callback @@ -110,7 +110,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): "interval": -1 } self.kafka_producer.produce( - KafkaTopic.REQUEST.value, + KafkaTopic.TELEMETRY_REQUEST.value, key = collector_uuid, value = json.dumps(collector_to_stop), callback = self.delivery_callback @@ -168,7 +168,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): """ listener for response on Kafka topic. """ - self.kafka_consumer.subscribe([KafkaTopic.RESPONSE.value]) + self.kafka_consumer.subscribe([KafkaTopic.TELEMETRY_RESPONSE.value]) while True: receive_msg = self.kafka_consumer.poll(2.0) if receive_msg is None: @@ -196,7 +196,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): if kpi_id == "-1" and kpi_value == -1: # print ("Backend termination confirmation for collector id: ", collector_id) - LOGGER.info("Backend termination confirmation for collector id: ", collector_id) + LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id)) else: - LOGGER.info("Backend termination confirmation for collector id: ", collector_id) + LOGGER.info("KPI Value: Collector Id: {:}, Kpi Id: {:}, Value: {:}".format(collector_id, kpi_id, kpi_value)) # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value)
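Usage sketch (illustrative only, not part of the patch): the snippet below shows one way the SyntheticMetricsGenerator added in this diff could be driven directly. The import path follows the new file location; the resource key "eth0", the chosen sample type IDs, and the polling loop are assumptions for demonstration. The IDs are the ones handled by metric_id_mapper (101/102 packets out/in, 201/202 bytes out/in, 701 latency).

import time
from telemetry.backend.collectors.emulated.SyntheticMetricsGenerator import SyntheticMetricsGenerator

# Hypothetical driver: create a generator in the 'good' state and poll a few samples.
generator = SyntheticMetricsGenerator(network_state="good")
sample_type_ids = [101, 102, 201, 202, 701]  # packets out/in, bytes out/in, latency

for _ in range(3):
    # Returns (timestamp, resource_key, values); values follow the order of sample_type_ids.
    timestamp, resource_key, values = generator.generate_synthetic_data_point("eth0", sample_type_ids)
    print(timestamp, resource_key, values)
    time.sleep(1)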