diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh
index 99a789c9fda481942b904a2e805202642123835c..722779824a8c91b5d7cff0337c10c4ea1327a1b6 100755
--- a/scripts/run_tests_locally-analytics-backend.sh
+++ b/scripts/run_tests_locally-analytics-backend.sh
@@ -18,6 +18,7 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh
index e30d30da623b2d0eee3d925d69a846b4b1f516a3..e74eb4ec198d77688f62004931c69eac31e60f0c 100755
--- a/scripts/run_tests_locally-analytics-frontend.sh
+++ b/scripts/run_tests_locally-analytics-frontend.sh
@@ -18,6 +18,7 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
diff --git a/scripts/run_tests_locally-kpi-value-API.sh b/scripts/run_tests_locally-kpi-value-API.sh
index 3953d2a89c6fbe2bd3546e648246b9b018e5fdb0..96ac558bad5f0bf6bc6f5ee90a26cd11fda69273 100755
--- a/scripts/run_tests_locally-kpi-value-API.sh
+++ b/scripts/run_tests_locally-kpi-value-API.sh
@@ -19,8 +19,9 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
-KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
+# KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
+# export KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
+export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \
     kpi_value_api/tests/test_kpi_value_api.py
diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile
index ef49657cdc270153b5de416cf46bef35bf2c04e6..17adcd3ab1df5704cc7ef0c5a19b3cfb1539ee22 100644
--- a/src/analytics/backend/Dockerfile
+++ b/src/analytics/backend/Dockerfile
@@ -53,15 +53,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
 RUN rm *.proto
 RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
 
-# Install Java (required for PySpark)
-RUN apt-get update && \
-    apt-get install -y default-jdk && \
-    apt-get clean
-
-# Set JAVA_HOME environment variable
-ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
-ENV PATH=$JAVA_HOME/bin:$PATH
-
 # Create component sub-folders, get specific Python packages
 RUN mkdir -p /var/teraflow/analytics/backend
 WORKDIR /var/teraflow/analytics/backend
diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in
index 9df678fe819f33d479b8f5090ca9ac4eb1f4047c..360d94f4668b19feba305df76a65ef70b26e091f 100644
--- a/src/analytics/backend/requirements.in
+++ b/src/analytics/backend/requirements.in
@@ -12,5 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-pyspark==3.5.2
+dask==2024.1.0
+distributed==2024.1.0
+pandas==2.2.3
 confluent-kafka==2.3.*
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 96b1f52428a2e612f7869ca90e878f4774cf6b8a..f9c3b86a2eafe8a82691fa992027b026a1e9aa18 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -12,18 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+import time
 import json
 import logging
 import threading
 from common.tools.service.GenericGrpcService import GenericGrpcService
-from analytics.backend.service.SparkStreaming import SparkStreamer
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import KafkaError
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_port_grpc
-
+from threading import Thread, Event
+from .DaskStreaming import DaskStreamer
 
 LOGGER = logging.getLogger(__name__)
 
@@ -40,44 +40,6 @@ class AnalyticsBackendService(GenericGrpcService):
             'group.id'          : 'analytics-frontend',
             'auto.offset.reset' : 'latest'})
 
-    def StartSparkStreamer(self, analyzer_uuid, analyzer):
-        kpi_list      = analyzer['input_kpis']
-        oper_list     = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
-        thresholds    = analyzer['thresholds']
-        window_size   = analyzer['window_size']
-        window_slider = analyzer['window_slider']
-        # print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
-        #     kpi_list, oper_list, thresholds, window_size, window_slider))
-        # LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
-        #     kpi_list, oper_list, thresholds, window_size, window_slider))
-        try:
-            stop_event = threading.Event()
-            thread = threading.Thread(target=SparkStreamer,
-                args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
-                      window_size, window_slider, None ))
-            self.running_threads[analyzer_uuid] = (thread, stop_event)
-            thread.start()
-            print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            return True
-        except Exception as e:
-            print       ("Failed to initiate Analyzer backend: {:}".format(e))
-            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
-            return False
-
-    def StopRequestListener(self, threadInfo: tuple):
-        try:
-            thread, stop_event = threadInfo
-            stop_event.set()
-            thread.join()
-            print      ("Terminating Analytics backend RequestListener")
-            LOGGER.info("Terminating Analytics backend RequestListener")
-            return True
-        except Exception as e:
-            print       ("Failed to terminate analytics backend {:}".format(e))
-            LOGGER.error("Failed to terminate analytics backend {:}".format(e))
-            return False
-
     def install_servicers(self):
         threading.Thread(target=self.RequestListener, args=()).start()
 
@@ -97,7 +59,8 @@ class AnalyticsBackendService(GenericGrpcService):
                 if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                     continue
                 else:
-                    print("Consumer error: {}".format(receive_msg.error()))
+                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
+                    print       ("Consumer error: {:}".format(receive_msg.error()))
                     break
             try:
                 analyzer      = json.loads(receive_msg.value().decode('utf-8'))
@@ -106,14 +69,44 @@ class AnalyticsBackendService(GenericGrpcService):
                 print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
 
                 if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
-                    self.TerminateAnalyzerBackend(analyzer_uuid)
+                    self.StopDaskListener(analyzer_uuid)
                 else:
-                    self.StartSparkStreamer(analyzer_uuid, analyzer)
+                    self.StartDaskListener(analyzer_uuid, analyzer)
             except Exception as e:
                 LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
                 print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - def TerminateAnalyzerBackend(self, analyzer_uuid): + def StartDaskListener(self, analyzer_uuid, analyzer): + kpi_list = analyzer[ 'input_kpis' ] + thresholds = analyzer[ 'thresholds' ] + window_size = analyzer[ 'window_size' ] + window_slider = analyzer[ 'window_slider'] + + LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format( + kpi_list, thresholds, window_size, window_slider)) + print ("Received parameters: {:} - {:} - {:} - {:}".format( + kpi_list, thresholds, window_size, window_slider)) + try: + stop_event = Event() + thread = Thread( + target=DaskStreamer, + # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event), + args=(analyzer_uuid, kpi_list, thresholds, stop_event), + kwargs={ + "window_size" : window_size, + } + ) + thread.start() + self.running_threads[analyzer_uuid] = (thread, stop_event) + print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + return True + except Exception as e: + print ("Failed to initiate Analyzer backend: {:}".format(e)) + LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) + return False + + def StopDaskListener(self, analyzer_uuid): if analyzer_uuid in self.running_threads: try: thread, stop_event = self.running_threads[analyzer_uuid] @@ -128,5 +121,4 @@ class AnalyticsBackendService(GenericGrpcService): return False else: print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) - LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) - # generate confirmation towards frontend + LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py new file mode 100644 index 0000000000000000000000000000000000000000..f09da9949fd0f745f80f782273024f9175db820c --- /dev/null +++ b/src/analytics/backend/service/DaskStreaming.py @@ -0,0 +1,233 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import json +from confluent_kafka import Consumer, Producer, KafkaException, KafkaError +import pandas as pd +from dask.distributed import Client, LocalCluster +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + +def SettingKafkaConsumerParams(): + return {'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-backend', + 'auto.offset.reset' : 'latest'} + +def GetAggregationMappings(thresholds): + agg_dict = {} + for threshold_key in thresholds.keys(): + parts = threshold_key.split('_', 1) + if len(parts) != 2: + LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. 
Skipping.") + continue + aggregation, metric_name = parts + # Ensure that the aggregation function is valid in pandas + if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']: + LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.") + continue + agg_dict[threshold_key] = ('kpi_value', aggregation) + return agg_dict + +def ApplyThresholds(aggregated_df, thresholds): + """ + Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary + on the aggregated DataFrame. + Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. + thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'. + Returns: pd.DataFrame: DataFrame with additional threshold columns. + """ + for threshold_key, threshold_values in thresholds.items(): + if threshold_key not in aggregated_df.columns: + LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.") + continue + if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2: + fail_th, raise_th = threshold_values + aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th + aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th + else: + LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.") + return aggregated_df + +def initialize_dask_client(): + """ + Initialize a local Dask cluster and client. + """ + cluster = LocalCluster(n_workers=2, threads_per_worker=2) + client = Client(cluster) + LOGGER.info(f"Dask Client Initialized: {client}") + return client, cluster + +def initialize_kafka_producer(): + return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + +def delivery_report(err, msg): + if err is not None: + LOGGER.error(f"Message delivery failed: {err}") + else: + LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") + +def process_batch(batch, agg_mappings, thresholds): + """ + Process a batch of data and apply thresholds. + Args: batch (list of dict): List of messages from Kafka. + agg_mappings (dict): Mapping from threshold key to aggregation function. + thresholds (dict): Thresholds dictionary. + Returns: list of dict: Processed records ready to be sent to Kafka. + """ + if not batch: + LOGGER.info("Empty batch received. Skipping processing.") + return [] + + df = pd.DataFrame(batch) + df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') + df.dropna(subset=['time_stamp'], inplace=True) + required_columns = {'time_stamp', 'kpi_id', 'kpi_value'} + if not required_columns.issubset(df.columns): + LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.") + return [] + if df.empty: + LOGGER.info("No data after filtering by KPI IDs. 
Skipping processing.") + return [] + + # Perform aggregations using named aggregation + try: + agg_dict = {key: value for key, value in agg_mappings.items()} + df_agg = df.groupby(['window_start', 'kpi_id']).agg(**agg_dict).reset_index() + except Exception as e: + LOGGER.error(f"Aggregation error: {e}") + return [] + + # Apply thresholds + df_thresholded = ApplyThresholds(df_agg, thresholds) + df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ') + # Convert aggregated DataFrame to list of dicts + result = df_thresholded.to_dict(orient='records') + LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.") + + return result + +def produce_result(result, producer, destination_topic): + for record in result: + try: + producer.produce( + destination_topic, + key=str(record.get('kpi_id', '')), + value=json.dumps(record), + callback=delivery_report + ) + except KafkaException as e: + LOGGER.error(f"Failed to produce message: {e}") + producer.flush() + LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") + +def DaskStreamer(key, kpi_list, thresholds, stop_event, + window_size="30s", time_stamp_col="time_stamp"): + client, cluster = initialize_dask_client() + consumer_conf = SettingKafkaConsumerParams() + consumer = Consumer(consumer_conf) + consumer.subscribe([KafkaTopic.VALUE.value]) + producer = initialize_kafka_producer() + + # Parse window_size to seconds + try: + window_size_td = pd.to_timedelta(window_size) + window_size_seconds = window_size_td.total_seconds() + except Exception as e: + LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}") + window_size_seconds = 30 + LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.") + + # Extract aggregation mappings from thresholds + agg_mappings = GetAggregationMappings(thresholds) + if not agg_mappings: + LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.") + consumer.close() + producer.flush() + client.close() + cluster.close() + return + try: + batch = [] + last_batch_time = time.time() + LOGGER.info("Starting to consume messages...") + + while not stop_event.is_set(): + msg = consumer.poll(1.0) + + if msg is None: + current_time = time.time() + if (current_time - last_batch_time) >= window_size_seconds and batch: + LOGGER.info("Time-based batch threshold reached. Processing batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + batch = [] + last_batch_time = current_time + continue + + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") + else: + LOGGER.error(f"Kafka error: {msg.error()}") + continue + + try: + message_value = json.loads(msg.value().decode('utf-8')) + except json.JSONDecodeError as e: + LOGGER.error(f"JSON decode error: {e}") + continue + + try: + message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce') + if pd.isna(message_timestamp): + LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.") + continue + window_start = message_timestamp.floor(window_size) + message_value['window_start'] = window_start + except Exception as e: + LOGGER.error(f"Error processing timestamp: {e}. 
Skipping message.") + continue + + if message_value['kpi_id'] not in kpi_list: + LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.") + continue + + batch.append(message_value) + + current_time = time.time() + if (current_time - last_batch_time) >= window_size_seconds and batch: + LOGGER.info("Time-based batch threshold reached. Processing batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + batch = [] + last_batch_time = current_time + + except Exception as e: + LOGGER.exception(f"Error in Dask streaming process: {e}") + finally: + # Process any remaining messages in the batch + if batch: + LOGGER.info("Processing remaining messages in the batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + consumer.close() + producer.flush() + LOGGER.info("Kafka consumer and producer closed.") + client.close() + cluster.close() + LOGGER.info("Dask client and cluster closed.") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index e5faaa1f5903d97d5201b46119eff931f420696b..cdc6c34428a72e5fcf90db3c5656a33c2bb29008 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -16,7 +16,7 @@ import uuid import json from common.proto.kpi_manager_pb2 import KpiId from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, - Analyzer ) + Analyzer, AnalyzerId ) def get_kpi_id_list(): return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] @@ -38,6 +38,13 @@ def get_threshold_dict(): op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict } +def create_analyzer_id(): + _create_analyzer_id = AnalyzerId() + # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + return _create_analyzer_id + def create_analyzer(): _create_analyzer = Analyzer() @@ -70,4 +77,38 @@ def create_analyzer(): _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. 
-    return _create_analyzer
\ No newline at end of file
+    return _create_analyzer
+
+def create_analyzer_dask():
+    _create_analyzer = Analyzer()
+    _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
+    _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+
+    _kpi_id = KpiId()
+    # input IDs to analyze
+    # _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # output IDs after analysis
+    _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    # parameter
+
+    _threshold_dict = {
+        'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),#}
+        'first_value'  :(00, 50), 'last_value'  :(50, 100), 'std_value'  :(0, 90)}
+    _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
+    _create_analyzer.parameters['oper_list']       = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()])
+    _create_analyzer.parameters['window_size']     = "10s"          # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
+    _create_analyzer.parameters['window_slider']   = "5s"           # should be less than window size
+    _create_analyzer.parameters['store_aggregate'] = str(False)     # TRUE to store. No implemented yet
+    return _create_analyzer
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index c2e7fbe3100e9904b1128ac5349566c3bb7c8f51..470729160c75fd7491e58191f534db9f4da61806 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -19,7 +19,9 @@ import threading
 from common.tools.kafka.Variables import KafkaTopic
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
 from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
-from .messages import create_analyzer
+from .messages import create_analyzer, create_analyzer_dask
+from threading import Thread, Event
+from ..service.DaskStreaming import DaskStreamer
 
 LOGGER = logging.getLogger(__name__)
 
@@ -29,35 +31,108 @@ LOGGER = logging.getLogger(__name__)
 ###########################
 
 # --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)
+
+
+# --- To test Dask Streamer functionality ---
+# def test_StartDaskStreamer():   # Directly from the Streamer class
+#     LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
+#     stop_event = Event()
+#     kpi_list   = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
+#     oper_list  = ['avg', 'min', 'max',]
+#     thresholds = {
+#         'avg_value': (10.0, 90.0),
+#         'min_value': (5.0, 95.0),
+#         'max_value': (15.0, 85.0),
+#         'latency'  : (2.0, 10.0)
+#     }
+
+#     # Start the DaskStreamer in a separate thread
+#     streamer_thread = Thread(
+#         target=DaskStreamer,
+#         args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
+#         kwargs={
+#             "window_size": "60s",
+#             "win_slide_duration": "30s",
+#             "time_stamp_col": "time_stamp"
+#         }
+#     )
+#     streamer_thread.start()
+#     try:
+#         while True:
+#             time.sleep(10)
+#     except KeyboardInterrupt:
+#         LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
Stopping streamer...") +# stop_event.set() +# streamer_thread.join() +# LOGGER.info("Streamer stopped gracefully.") # --- To test Start Streamer functionality --- -def test_StartSparkStreamer(): - LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") - analyzer_obj = create_analyzer() - analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid - analyzer_to_generate : Dict = { - "algo_name" : analyzer_obj.algorithm_name, - "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], - "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], - "oper_mode" : analyzer_obj.operation_mode, - "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), - "window_size" : analyzer_obj.parameters["window_size"], - "window_slider" : analyzer_obj.parameters["window_slider"], - # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] - } - AnalyticsBackendServiceObj = AnalyticsBackendService() - response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) - assert isinstance(response, bool) +# def test_StartDaskStreamer(): +# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ") +# analyzer_obj = create_analyzer_dask() +# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj)) +# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid +# analyzer_to_generate : Dict = { +# "algo_name" : analyzer_obj.algorithm_name, +# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], +# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], +# "oper_mode" : analyzer_obj.operation_mode, +# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), +# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]), +# # "oper_list" : analyzer_obj.parameters["oper_list"], +# "window_size" : analyzer_obj.parameters["window_size"], +# "window_slider" : analyzer_obj.parameters["window_slider"], +# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] +# } +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate))) +# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate) +# assert isinstance(response, bool) +# time.sleep(100) +# LOGGER.info('Initiating StopRequestListener...') +# # AnalyticsBackendServiceObj = AnalyticsBackendService() +# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid) +# LOGGER.debug(str(response)) +# assert isinstance(response, bool) -# --- To TEST StartRequestListenerFunctionality -# def test_StartRequestListener(): -# LOGGER.info('test_RunRequestListener') +# --- To test Start Streamer functionality --- +# def test_StartSparkStreamer(): +# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") +# analyzer_obj = create_analyzer() +# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid +# analyzer_to_generate : Dict = { +# "algo_name" : analyzer_obj.algorithm_name, +# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], +# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], +# "oper_mode" : analyzer_obj.operation_mode, +# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), +# "window_size" : analyzer_obj.parameters["window_size"], +# "window_slider" : analyzer_obj.parameters["window_slider"], +# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] +# } # AnalyticsBackendServiceObj = AnalyticsBackendService() -# 
+#     response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
+#     assert isinstance(response, bool)
+
+# --- To TEST StartRequestListenerFunctionality
+def test_StartRequestListener():
+    LOGGER.info('test_RunRequestListener')
+    AnalyticsBackendServiceObj = AnalyticsBackendService()
+    AnalyticsBackendServiceObj.stop_event = Event()
+    listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
+    listener_thread.start()
+
+    time.sleep(100)
+
+    # AnalyticsBackendServiceObj.stop_event.set()
+    # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
+    # listener_thread.join(timeout=10)
+    # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
+    LOGGER.info('Completed test_RunRequestListener')
 
 # To test START and STOP communication together
 # def test_StopRequestListener():
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index eb25c33b03744d627efae0a436e5bdce4553b4af..e2d39585e434b58c0d48d0061e105a5ebaabe6b9 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -49,11 +49,12 @@ def create_analyzer():
     _create_analyzer.output_kpi_ids.append(_kpi_id)
     # parameter
     _threshold_dict = {
-        # 'avg_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
-        'first_value'   :(00, 10), 'last_value'  :(40, 50), 'stdev_value':(00, 10)}
+        'mean_value'    :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
+        'first_value'   :(00, 10), 'last_value'  :(40, 50), 'std_value':(00, 10)
+        }
     _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
-    _create_analyzer.parameters['window_size']   = "60 seconds"    # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
-    _create_analyzer.parameters['window_slider'] = "30 seconds"    # should be less than window size
+    _create_analyzer.parameters['window_size']   = "10s"           # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
+    _create_analyzer.parameters['window_slider'] = "5s"            # should be less than window size
     _create_analyzer.parameters['store_aggregate'] = str(False)    # TRUE to store. No implemented yet
 
     return _create_analyzer
diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
index 0e2d49300ccf1cffed7f09435136055b8c70615e..706e180d5cf65107f5899315cdf75867beb81608 100644
--- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
+++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
@@ -46,7 +46,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
         self.scheduler      = BackgroundScheduler()
         self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
-                                             'group.id'          : 'analytics-frontend',
+                                             'group.id'          : 'kpi-value-api-frontend',
                                              'auto.offset.reset' : 'latest'})
 
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
@@ -152,9 +152,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
         for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid):
             response.start_timestamp.timestamp = datetime.strptime(
-                value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
-            response.end_timestamp.timestamp = datetime.strptime(
-                value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
+                value["window_start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
             response.kpi_id.kpi_id.uuid  = value['kpi_id']
             for key, threshold in value.items():
                 if key not in ['kpi_id', 'window']:
@@ -183,10 +181,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
                 key, value = self.result_queue.get()  # Wait until a result is available
                 LOGGER.info("In while true ...")
                 yield key, value  # Yield the result to the calling function
-        except KeyboardInterrupt:
-            LOGGER.warning("Listener stopped manually.")
+        except Exception as e:
+            LOGGER.warning("Listener stopped. Error: {:}".format(e))
         finally:
-            self.StopListener()
+            self.scheduler.shutdown()
 
     def response_listener(self, filter_key=None):
         """
@@ -196,23 +194,24 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
         consumer = self.kafka_consumer
         consumer.subscribe([self.listener_topic])
 
-        msg = consumer.poll(2.0)
-        if msg is None:
-            return
-        elif msg.error():
-            if msg.error().code() != KafkaError._PARTITION_EOF:
-                LOGGER.error(f"Kafka error: {msg.error()}")
-            return
-        try:
-            key = msg.key().decode('utf-8') if msg.key() else None
-            if filter_key is not None and key == filter_key:
-                value = json.loads(msg.value().decode('utf-8'))
-                LOGGER.info(f"Received key: {key}, value: {value}")
-                self.result_queue.put((key, value))
-            else:
-                LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
-        except Exception as e:
-            LOGGER.error(f"Error processing Kafka message: {e}")
+        while True:
+            msg = consumer.poll(1.0)
+            if msg is None:
+                continue
+            elif msg.error():
+                if msg.error().code() != KafkaError._PARTITION_EOF:
+                    LOGGER.error(f"Kafka error: {msg.error()}")
+                break
+            try:
+                key = msg.key().decode('utf-8') if msg.key() else None
+                if filter_key is not None and key == filter_key:
+                    value = json.loads(msg.value().decode('utf-8'))
+                    LOGGER.info(f"Received key: {key}, value: {value}")
+                    self.result_queue.put((key, value))
+                else:
+                    LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
+            except Exception as e:
+                LOGGER.error(f"Error processing Kafka message: {e}")
 
     def delivery_callback(self, err, msg):
         if err:
             LOGGER.debug('Message delivery failed: {:}'.format(err))
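Reviewer note: the patch replaces the Spark streamer with a Dask-based one whose alarm logic hinges on the '<aggregation>_<metricName>' threshold convention (see GetAggregationMappings() and ApplyThresholds() in the new DaskStreaming.py). The following minimal, standalone Python sketch illustrates that convention with pandas only; the DataFrame contents and threshold values are invented for illustration and are not taken from the patch.

import pandas as pd

# Illustrative thresholds: each key is '<aggregation>_<metricName>' mapped to a (fall, raise) pair,
# mirroring the dictionaries passed to DaskStreamer via the analyzer 'thresholds' parameter.
thresholds = {'mean_latency': (20, 30), 'max_latency': (45, 50)}

# As in GetAggregationMappings(): each key becomes a named aggregation over 'kpi_value'.
agg_dict = {key: ('kpi_value', key.split('_', 1)[0]) for key in thresholds}

# A toy batch of KPI samples grouped into one window (made-up values).
batch = pd.DataFrame({
    'window_start': ['2024-01-01T00:00:00Z'] * 4,
    'kpi_id'      : ['kpi-1', 'kpi-1', 'kpi-2', 'kpi-2'],
    'kpi_value'   : [10.0, 60.0, 25.0, 28.0],
})

df_agg = batch.groupby(['window_start', 'kpi_id']).agg(**agg_dict).reset_index()

# As in ApplyThresholds(): values below the first element flag *_THRESHOLD_FALL,
# values above the second flag *_THRESHOLD_RAISE.
for key, (fall_th, raise_th) in thresholds.items():
    df_agg[f"{key}_THRESHOLD_FALL"]  = df_agg[key] < fall_th
    df_agg[f"{key}_THRESHOLD_RAISE"] = df_agg[key] > raise_th

print(df_agg.to_dict(orient='records'))

In the service itself these per-window records are what produce_result() serializes to JSON and publishes on the alarms topic (KafkaTopic.ALARMS), one message per kpi_id and window.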