diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in
index 9df678fe819f33d479b8f5090ca9ac4eb1f4047c..04ab95c2df65c09e67ce7386ab2277da884b39f3 100644
--- a/src/analytics/backend/requirements.in
+++ b/src/analytics/backend/requirements.in
@@ -12,5 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-pyspark==3.5.2
+dask==2024.9.0
+distributed==2024.9.0
+pandas==2.2.3
 confluent-kafka==2.3.*
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 96b1f52428a2e612f7869ca90e878f4774cf6b8a..367d91d97f5c3676d0e17dbfaa80f40abc09e9c1 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -12,18 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-
+import time
 import json
 import logging
 import threading
 from common.tools.service.GenericGrpcService import GenericGrpcService
-from analytics.backend.service.SparkStreaming import SparkStreamer
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import KafkaError
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_port_grpc
-
+from threading import Thread, Event
+from .DaskStreaming import DaskStreamer

 LOGGER = logging.getLogger(__name__)

@@ -40,44 +40,6 @@ class AnalyticsBackendService(GenericGrpcService):
             'group.id'          : 'analytics-frontend',
             'auto.offset.reset' : 'latest'})

-    def StartSparkStreamer(self, analyzer_uuid, analyzer):
-        kpi_list      = analyzer['input_kpis']
-        oper_list     = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
-        thresholds    = analyzer['thresholds']
-        window_size   = analyzer['window_size']
-        window_slider = analyzer['window_slider']
-        # print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
-        #     kpi_list, oper_list, thresholds, window_size, window_slider))
-        # LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
-        #     kpi_list, oper_list, thresholds, window_size, window_slider))
-        try:
-            stop_event = threading.Event()
-            thread = threading.Thread(target=SparkStreamer,
-                                      args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
-                                            window_size, window_slider, None ))
-            self.running_threads[analyzer_uuid] = (thread, stop_event)
-            thread.start()
-            print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            return True
-        except Exception as e:
-            print       ("Failed to initiate Analyzer backend: {:}".format(e))
-            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
-            return False
-
-    def StopRequestListener(self, threadInfo: tuple):
-        try:
-            thread, stop_event = threadInfo
-            stop_event.set()
-            thread.join()
-            print      ("Terminating Analytics backend RequestListener")
-            LOGGER.info("Terminating Analytics backend RequestListener")
-            return True
-        except Exception as e:
-            print       ("Failed to terminate analytics backend {:}".format(e))
-            LOGGER.error("Failed to terminate analytics backend {:}".format(e))
-            return False
-
     def install_servicers(self):
         threading.Thread(target=self.RequestListener, args=()).start()

@@ -97,8 +59,9 @@ class AnalyticsBackendService(GenericGrpcService):
                 if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                     continue
                 else:
-                    print("Consumer error: {}".format(receive_msg.error()))
-                    break
+                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
+                    print       ("Consumer error: {:}".format(receive_msg.error()))
+                    continue
             try:
                 analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                 analyzer_uuid = receive_msg.key().decode('utf-8')
@@ -106,14 +69,44 @@ class AnalyticsBackendService(GenericGrpcService):
                 print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))

                 if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
-                    self.TerminateAnalyzerBackend(analyzer_uuid)
+                    self.StopDaskListener(analyzer_uuid)
                 else:
-                    self.StartSparkStreamer(analyzer_uuid, analyzer)
+                    self.StartDaskListener(analyzer_uuid, analyzer)
             except Exception as e:
                 LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
                 print         ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))

-    def TerminateAnalyzerBackend(self, analyzer_uuid):
+    def StartDaskListener(self, analyzer_uuid, analyzer):
+        kpi_list      = analyzer[ 'input_kpis'   ]
+        thresholds    = analyzer[ 'thresholds'   ]
+        window_size   = analyzer[ 'window_size'  ]
+        window_slider = analyzer[ 'window_slider']
+
+        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
+            kpi_list, thresholds, window_size, window_slider))
+        print        ("Received parameters: {:} - {:} - {:} - {:}".format(
+            kpi_list, thresholds, window_size, window_slider))
+        try:
+            stop_event = Event()
+            thread = Thread(
+                target=DaskStreamer,
+                # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
+                args=(analyzer_uuid, kpi_list, thresholds, stop_event),
+                kwargs={
+                    "window_size" : window_size,
+                }
+            )
+            thread.start()
+            self.running_threads[analyzer_uuid] = (thread, stop_event)
+            print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            return True
+        except Exception as e:
+            print       ("Failed to initiate Analyzer backend: {:}".format(e))
+            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
+            return False
+
+    def StopDaskListener(self, analyzer_uuid):
         if analyzer_uuid in self.running_threads:
             try:
                 thread, stop_event = self.running_threads[analyzer_uuid]
@@ -128,5 +121,4 @@ class AnalyticsBackendService(GenericGrpcService):
                 return False
         else:
             print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
-            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
-            # generate confirmation towards frontend
+            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
Skipping.") + continue + aggregation, metric_name = parts + # Ensure that the aggregation function is valid in pandas + if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']: + LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.") + continue + agg_dict[threshold_key] = ('kpi_value', aggregation) + return agg_dict + +def ApplyThresholds(aggregated_df, thresholds): + """ + Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary + on the aggregated DataFrame. + Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. + thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'. + Returns: pd.DataFrame: DataFrame with additional threshold columns. + """ + for threshold_key, threshold_values in thresholds.items(): + if threshold_key not in aggregated_df.columns: + LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.") + continue + if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2: + fail_th, raise_th = threshold_values + aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th + aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th + else: + LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.") + return aggregated_df + +def initialize_dask_client(): + """ + Initialize a local Dask cluster and client. + """ + cluster = LocalCluster(n_workers=2, threads_per_worker=2) + client = Client(cluster) + LOGGER.info(f"Dask Client Initialized: {client}") + return client, cluster + +def initialize_kafka_producer(): + return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + +def delivery_report(err, msg): + if err is not None: + LOGGER.error(f"Message delivery failed: {err}") + else: + LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") + +def process_batch(batch, agg_mappings, thresholds): + """ + Process a batch of data and apply thresholds. + Args: batch (list of dict): List of messages from Kafka. + agg_mappings (dict): Mapping from threshold key to aggregation function. + thresholds (dict): Thresholds dictionary. + Returns: list of dict: Processed records ready to be sent to Kafka. + """ + if not batch: + LOGGER.info("Empty batch received. Skipping processing.") + return [] + + df = pd.DataFrame(batch) + df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') + df.dropna(subset=['time_stamp'], inplace=True) + required_columns = {'time_stamp', 'kpi_id', 'kpi_value'} + if not required_columns.issubset(df.columns): + LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.") + return [] + if df.empty: + LOGGER.info("No data after filtering by KPI IDs. 
Skipping processing.") + return [] + + # Perform aggregations using named aggregation + try: + agg_dict = {key: value for key, value in agg_mappings.items()} + df_agg = df.groupby(['window_start', 'kpi_id']).agg(**agg_dict).reset_index() + except Exception as e: + LOGGER.error(f"Aggregation error: {e}") + return [] + + # Apply thresholds + df_thresholded = ApplyThresholds(df_agg, thresholds) + df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ') + # Convert aggregated DataFrame to list of dicts + result = df_thresholded.to_dict(orient='records') + LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.") + + return result + +def produce_result(result, producer, destination_topic): + for record in result: + try: + producer.produce( + destination_topic, + key=str(record.get('kpi_id', '')), + value=json.dumps(record), + callback=delivery_report + ) + except KafkaException as e: + LOGGER.error(f"Failed to produce message: {e}") + producer.flush() + LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") + +def DaskStreamer(key, kpi_list, thresholds, stop_event, + window_size="30s", time_stamp_col="time_stamp"): + client, cluster = initialize_dask_client() + consumer_conf = SettingKafkaConsumerParams() + consumer = Consumer(consumer_conf) + consumer.subscribe([KafkaTopic.VALUE.value]) + producer = initialize_kafka_producer() + + # Parse window_size to seconds + try: + window_size_td = pd.to_timedelta(window_size) + window_size_seconds = window_size_td.total_seconds() + except Exception as e: + LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}") + window_size_seconds = 30 + LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.") + + # Extract aggregation mappings from thresholds + agg_mappings = GetAggregationMappings(thresholds) + if not agg_mappings: + LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.") + consumer.close() + producer.flush() + client.close() + cluster.close() + return + try: + batch = [] + last_batch_time = time.time() + LOGGER.info("Starting to consume messages...") + + while not stop_event.is_set(): + msg = consumer.poll(1.0) + + if msg is None: + current_time = time.time() + if (current_time - last_batch_time) >= window_size_seconds and batch: + LOGGER.info("Time-based batch threshold reached. Processing batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + batch = [] + last_batch_time = current_time + continue + + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") + else: + LOGGER.error(f"Kafka error: {msg.error()}") + continue + + try: + message_value = json.loads(msg.value().decode('utf-8')) + except json.JSONDecodeError as e: + LOGGER.error(f"JSON decode error: {e}") + continue + + try: + message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce') + if pd.isna(message_timestamp): + LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.") + continue + window_start = message_timestamp.floor(window_size) + message_value['window_start'] = window_start + except Exception as e: + LOGGER.error(f"Error processing timestamp: {e}. 
Skipping message.") + continue + + if message_value['kpi_id'] not in kpi_list: + LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.") + continue + + batch.append(message_value) + + current_time = time.time() + if (current_time - last_batch_time) >= window_size_seconds and batch: + LOGGER.info("Time-based batch threshold reached. Processing batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + batch = [] + last_batch_time = current_time + + except Exception as e: + LOGGER.exception(f"Error in Dask streaming process: {e}") + finally: + # Process any remaining messages in the batch + if batch: + LOGGER.info("Processing remaining messages in the batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + consumer.close() + producer.flush() + LOGGER.info("Kafka consumer and producer closed.") + client.close() + cluster.close() + LOGGER.info("Dask client and cluster closed.") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index e5faaa1f5903d97d5201b46119eff931f420696b..a1d4d0629ab7c8729fdfaf9a9253cb5f698186d7 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -16,7 +16,7 @@ import uuid import json from common.proto.kpi_manager_pb2 import KpiId from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, - Analyzer ) + Analyzer, AnalyzerId ) def get_kpi_id_list(): return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] @@ -38,6 +38,13 @@ def get_threshold_dict(): op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict } +def create_analyzer_id(): + _create_analyzer_id = AnalyzerId() + # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + return _create_analyzer_id + def create_analyzer(): _create_analyzer = Analyzer() @@ -70,4 +77,38 @@ def create_analyzer(): _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. 
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py
index e5faaa1f5903d97d5201b46119eff931f420696b..a1d4d0629ab7c8729fdfaf9a9253cb5f698186d7 100644
--- a/src/analytics/backend/tests/messages.py
+++ b/src/analytics/backend/tests/messages.py
@@ -16,7 +16,7 @@ import uuid
 import json
 from common.proto.kpi_manager_pb2        import KpiId
 from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
-                                                  Analyzer )
+                                                  Analyzer, AnalyzerId )

 def get_kpi_id_list():
     return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
@@ -38,6 +38,13 @@ def get_threshold_dict():
         op + '_value': threshold_dict[op+'_value'] for op in get_operation_list()
         if op + '_value' in threshold_dict
     }

+def create_analyzer_id():
+    _create_analyzer_id = AnalyzerId()
+    # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    return _create_analyzer_id
+
 def create_analyzer():
     _create_analyzer = Analyzer()

@@ -70,4 +77,38 @@ def create_analyzer():
     _create_analyzer.parameters['window_slider']   = "30 seconds"    # should be less than window size
     _create_analyzer.parameters['store_aggregate'] = str(False)      # TRUE to store. No implemented yet

-    return _create_analyzer
\ No newline at end of file
+    return _create_analyzer
+
+def create_analyzer_dask():
+    _create_analyzer = Analyzer()
+    # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
+    _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+
+    _kpi_id = KpiId()
+    # input IDs to analyze
+    _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # output IDs after analysis
+    _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    # parameter
+
+    _threshold_dict = {
+        'mean_PKS_TX'   :(20, 30), 'min_value'   :(00, 10),  'max_value'  :(45, 50),#}
+        'first_value'   :(00, 50), 'last_value'  :(50, 100), 'std_value'  :(0, 90)}
+    _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
+    _create_analyzer.parameters['oper_list']       = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()])
+    _create_analyzer.parameters['window_size']     = "10s"           # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
+    _create_analyzer.parameters['window_slider']   = "5s"            # should be less than window size
+    _create_analyzer.parameters['store_aggregate'] = str(False)      # TRUE to store. No implemented yet
+    return _create_analyzer
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index c2e7fbe3100e9904b1128ac5349566c3bb7c8f51..48ce867478b6f40b6942ee8df13deef2117b5d55 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -19,7 +19,9 @@ import threading
 from common.tools.kafka.Variables import KafkaTopic
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
 from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
-from .messages import create_analyzer
+from .messages import create_analyzer, create_analyzer_dask
+from threading import Thread, Event
+from ..service.DaskStreaming import DaskStreamer

 LOGGER = logging.getLogger(__name__)

@@ -29,35 +31,108 @@ LOGGER = logging.getLogger(__name__)
 ###########################

 # --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)
+
+
+# --- To test Dask Streamer functionality ---
+# def test_StartDaskStreamer():   # Directly from the Streamer class
+#     LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
+#     stop_event = Event()
+#     kpi_list   = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
+#     oper_list  = ['avg', 'min', 'max',]
+#     thresholds = {
+#         'avg_value': (10.0, 90.0),
+#         'min_value': (5.0, 95.0),
+#         'max_value': (15.0, 85.0),
+#         'latency'  : (2.0, 10.0)
+#     }
+
+#     # Start the DaskStreamer in a separate thread
+#     streamer_thread = Thread(
+#         target=DaskStreamer,
+#         args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
+#         kwargs={
+#             "window_size": "60s",
+#             "win_slide_duration": "30s",
+#             "time_stamp_col": "time_stamp"
+#         }
+#     )
+#     streamer_thread.start()
+#     try:
+#         while True:
+#             time.sleep(10)
+#     except KeyboardInterrupt:
+#         LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
+#         stop_event.set()
+#         streamer_thread.join()
+#         LOGGER.info("Streamer stopped gracefully.")

 # --- To test Start Streamer functionality ---
-def test_StartSparkStreamer():
-    LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
-    analyzer_obj = create_analyzer()
-    analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
-    analyzer_to_generate : Dict = {
-        "algo_name"       : analyzer_obj.algorithm_name,
-        "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
-        "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
-        "oper_mode"       : analyzer_obj.operation_mode,
-        "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
-        "window_size"     : analyzer_obj.parameters["window_size"],
-        "window_slider"   : analyzer_obj.parameters["window_slider"],
-        # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
-    }
-    AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
-    assert isinstance(response, bool)
+# def test_StartDaskStreamer():
+#     LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ")
+#     analyzer_obj = create_analyzer_dask()
+#     # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
+#     analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
+#     analyzer_to_generate : Dict = {
+#         "algo_name"       : analyzer_obj.algorithm_name,
+#         "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
+#         "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
+#         "oper_mode"       : analyzer_obj.operation_mode,
+#         "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
+#         "oper_list"       : json.loads(analyzer_obj.parameters["oper_list"]),
+#         # "oper_list"       : analyzer_obj.parameters["oper_list"],
+#         "window_size"     : analyzer_obj.parameters["window_size"],
+#         "window_slider"   : analyzer_obj.parameters["window_slider"],
+#         # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
+#     }
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
+#     response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
+#     assert isinstance(response, bool)
+#     time.sleep(100)
+#     LOGGER.info('Initiating StopRequestListener...')
+#     # AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, bool)

-# --- To TEST StartRequestListenerFunctionality
+# --- To test Start Streamer functionality ---
+# def test_StartSparkStreamer():
+#     LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
+#     analyzer_obj = create_analyzer()
+#     analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
+#     analyzer_to_generate : Dict = {
+#         "algo_name"       : analyzer_obj.algorithm_name,
+#         "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
+#         "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
+#         "oper_mode"       : analyzer_obj.operation_mode,
+#         "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
+#         "window_size"     : analyzer_obj.parameters["window_size"],
+#         "window_slider"   : analyzer_obj.parameters["window_slider"],
+#         # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
+#     }
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
+#     assert isinstance(response, bool)
+
+# --- To TEST StartRequestListenerFunctionality
 # def test_StartRequestListener():
 #     LOGGER.info('test_RunRequestListener')
 #     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     threading.Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()).start()
+#     AnalyticsBackendServiceObj.stop_event = Event()
+#     listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
+#     listener_thread.start()
+
+#     time.sleep(100)
+
+#     # AnalyticsBackendServiceObj.stop_event.set()
+#     # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
+#     # listener_thread.join(timeout=10)
+#     # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
+#     LOGGER.info('Completed test_RunRequestListener')

 # To test START and STOP communication together
 # def test_StopRequestListener():
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index eb25c33b03744d627efae0a436e5bdce4553b4af..e2d39585e434b58c0d48d0061e105a5ebaabe6b9 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -49,11 +49,12 @@ def create_analyzer():
     _create_analyzer.output_kpi_ids.append(_kpi_id)
     # parameter
     _threshold_dict = {
-        # 'avg_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
-        'first_value'   :(00, 10), 'last_value'  :(40, 50), 'stdev_value':(00, 10)}
+        'mean_value'    :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
+        'first_value'   :(00, 10), 'last_value'  :(40, 50), 'std_value'   :(00, 10)
+        }
     _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
-    _create_analyzer.parameters['window_size']     = "60 seconds"    # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
-    _create_analyzer.parameters['window_slider']   = "30 seconds"    # should be less than window size
+    _create_analyzer.parameters['window_size']     = "10s"           # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
+    _create_analyzer.parameters['window_slider']   = "5s"            # should be less than window size
     _create_analyzer.parameters['store_aggregate'] = str(False)      # TRUE to store. No implemented yet

     return _create_analyzer
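For reference, a minimal sketch of driving the new DaskStreamer directly, mirroring what AnalyticsBackendService.StartDaskListener() does with a thread and a stop Event; the UUID and threshold values are illustrative, and a reachable Kafka broker with the VALUE and ALARMS topics is assumed.

# Illustrative sketch (not part of the patch): starting and stopping DaskStreamer the way StartDaskListener does.
from threading import Thread, Event
from analytics.backend.service.DaskStreaming import DaskStreamer

stop_event = Event()
thread = Thread(
    target=DaskStreamer,
    args=("my-analyzer-uuid",                                   # analyzer id, used as the result key
          ["6e22f180-ba28-4641-b190-2287bf448888"],             # input KPI ids to keep
          {"mean_value": (20, 30), "max_value": (45, 50)},      # '<aggregation>_<metric>' thresholds
          stop_event),
    kwargs={"window_size": "10s"},                              # must be parseable by pd.to_timedelta
)
thread.start()
# ... later, when the frontend sends a termination request (algo_name/oper_mode set to None):
stop_event.set()
thread.join()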