diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 0a9800d9e5839205e1e45f84e4c8bdafbe93f32f..d4efc084e5f1ea2376e71ef6a15bc9b972c5ac1d 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -39,4 +39,14 @@ enum KpiSampleType { KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605; KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; + +// output KPIs + KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101; + KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102; + KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103; + KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201; + KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202; + KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203; + + KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701; } diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index eab275324d9f2cf87b9cf839eeebf446faba1506..11ce1b377b9b616203d0943a36101ccd7ace940f 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -17,17 +17,14 @@ import json import logging import threading -import pytz from common.tools.service.GenericGrpcService import GenericGrpcService from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import Consumer from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from threading import Thread, Event from analytics.backend.service.Streamer import DaskStreamer -from common.proto.analytics_frontend_pb2 import Analyzer -from datetime import datetime, timedelta +from analytics.backend.service.AnalyzerHelper import AnalyzerHelper LOGGER = logging.getLogger(__name__) @@ -35,16 +32,25 @@ logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class AnalyticsBackendService(GenericGrpcService): """ - Class listens for ... + AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService. + It listens to the Kafka topic for the requests and starts/stops the DaskStreamer accordingly. + It also initializes the Kafka producer and Dask cluster for the streamer. 
""" - def __init__(self, cls_name : str = __name__) -> None: + def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1 + ) -> None: LOGGER.info('Init AnalyticsBackendService') port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND) super().__init__(port, cls_name=cls_name) self.active_streamers = {} - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', - 'auto.offset.reset' : 'latest'}) + self.central_producer = AnalyzerHelper.initialize_kafka_producer() # Multi-threaded producer + self.cluster = AnalyzerHelper.initialize_dask_cluster( + n_workers, threads_per_worker) # Local cluster + self.request_consumer = Consumer({ + 'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-backend', + 'auto.offset.reset' : 'latest', + }) + def install_servicers(self): threading.Thread( @@ -58,7 +64,7 @@ class AnalyticsBackendService(GenericGrpcService): """ LOGGER.info("Request Listener is initiated ...") # print ("Request Listener is initiated ...") - consumer = self.kafka_consumer + consumer = self.request_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: receive_msg = consumer.poll(2.0) @@ -69,23 +75,27 @@ class AnalyticsBackendService(GenericGrpcService): continue else: LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - # print ("Consumer error: {:}".format(receive_msg.error())) break try: analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: - self.StopStreamer(analyzer_uuid) + if self.StopStreamer(analyzer_uuid): + LOGGER.info("Dask Streamer stopped.") + else: + LOGGER.error("Failed to stop Dask Streamer.") else: - self.StartStreamer(analyzer_uuid, analyzer) + if self.StartStreamer(analyzer_uuid, analyzer): + LOGGER.info("Dask Streamer started.") + else: + LOGGER.error("Failed to start Dask Streamer.") except Exception as e: LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - def StartStreamer(self, analyzer_uuid : str, analyzer : json): + def StartStreamer(self, analyzer_uuid : str, analyzer : dict): """ Start the DaskStreamer with the given parameters. 
""" @@ -94,28 +104,30 @@ class AnalyticsBackendService(GenericGrpcService): return False try: streamer = DaskStreamer( - analyzer_uuid, - analyzer['input_kpis' ], - analyzer['output_kpis'], - analyzer['thresholds' ], - analyzer['batch_size' ], - analyzer['window_size'], + key = analyzer_uuid, + input_kpis = analyzer['input_kpis' ], + output_kpis = analyzer['output_kpis'], + thresholds = analyzer['thresholds' ], + batch_size = analyzer['batch_size' ], + window_size = analyzer['window_size'], + cluster_instance = self.cluster, + producer_instance = self.central_producer, ) streamer.start() - logging.info(f"Streamer started with analyzer Id: {analyzer_uuid}") + LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}") # Stop the streamer after the given duration - if analyzer['duration'] is not None: + if analyzer['duration'] > 0: def stop_after_duration(): time.sleep(analyzer['duration']) - logging.info(f"Stopping streamer with analyzer: {analyzer_uuid}") - streamer.stop() + LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}") + if not self.StopStreamer(analyzer_uuid): + LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.") duration_thread = threading.Thread(target=stop_after_duration, daemon=True) duration_thread.start() self.active_streamers[analyzer_uuid] = streamer - LOGGER.info("Dask Streamer started.") return True except Exception as e: LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e)) @@ -129,14 +141,30 @@ class AnalyticsBackendService(GenericGrpcService): if analyzer_uuid not in self.active_streamers: LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid)) return False - LOGGER.info(f"Stopping streamer with key: {analyzer_uuid}") + LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}") streamer = self.active_streamers[analyzer_uuid] streamer.stop() streamer.join() del self.active_streamers[analyzer_uuid] - LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been stopped.") + LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.") return True except Exception as e: LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e)) return False + def close(self): # TODO: Is this function needed? + """ + Close the producer and cluster cleanly. 
+ """ + if self.central_producer: + try: + self.central_producer.flush() + LOGGER.info("Kafka producer flushed and closed.") + except Exception as e: + LOGGER.error(f"Error closing Kafka producer: {e}") + if self.cluster: + try: + self.cluster.close() + LOGGER.info("Dask cluster closed.") + except Exception as e: + LOGGER.error(f"Error closing Dask cluster: {e}") diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py index 0e23bab69bd5e1136ca6c0ce9fee321632160bcd..c8a9c838aae6eb249fc581bd9d090a8121ba63ac 100644 --- a/src/analytics/backend/service/AnalyzerHandlers.py +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -16,11 +16,11 @@ import logging from enum import Enum import pandas as pd - logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') -class AnalyzerHandlers(Enum): + +class Handlers(Enum): AGGREGATION_HANDLER = "AggregationHandler" UNSUPPORTED_HANDLER = "UnsupportedHandler" @@ -56,7 +56,7 @@ def threshold_handler(key, aggregated_df, thresholds): aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th else: - logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a tuple of length 2. Skipping threshold application.") + logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.") return aggregated_df def aggregation_handler( @@ -79,6 +79,10 @@ def aggregation_handler( # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only) df = df[df['kpi_id'].isin(input_kpi_list)].copy() + if df.empty: + logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.") + return [] + # Define all possible aggregation methods aggregation_methods = { "min" : ('kpi_value', 'min'), @@ -108,7 +112,6 @@ def aggregation_handler( selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters} # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}") - kpi_df = df[df['kpi_id'] == kpi_id] # Check if kpi_df is not empty before applying the aggregation methods @@ -119,9 +122,6 @@ def aggregation_handler( agg_df['kpi_id'] = output_kpi_list[kpi_index] - # if agg_df.empty: - # logger.warning(f"No data available for KPI: {kpi_id}. Skipping threshold application.") - # continue # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}") result = threshold_handler(key, agg_df, kpi_task_parameters) @@ -129,3 +129,4 @@ def aggregation_handler( else: logger.warning(f"No data available for KPIs: {kpi_id}. 
Skipping aggregation.") continue + return [] diff --git a/src/analytics/backend/service/AnalyzerHelper.py b/src/analytics/backend/service/AnalyzerHelper.py index 26d6e5fb9cca45b93a33038e8658cab51bab8ad5..15dde6e62dc9fa193f1307c3e3268c1fefa8a912 100644 --- a/src/analytics/backend/service/AnalyzerHelper.py +++ b/src/analytics/backend/service/AnalyzerHelper.py @@ -15,12 +15,11 @@ from dask.distributed import Client, LocalCluster from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from confluent_kafka import Consumer, Producer, KafkaException, KafkaError +from confluent_kafka import Consumer, Producer import logging -logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class AnalyzerHelper: @@ -28,15 +27,24 @@ class AnalyzerHelper: pass @staticmethod - def initialize_dask_client(n_workers=1, threads_per_worker=1): - """Initialize a local Dask cluster and client.""" - cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) - client = Client(cluster) + def initialize_dask_client(cluster_instance): + """Initialize a local Dask client.""" + if cluster_instance is None: + logger.error("Dask Cluster is not initialized. Exiting.") + return None + client = Client(cluster_instance) logger.info(f"Dask Client Initialized: {client}") - return client, cluster + return client + + @staticmethod + def initialize_dask_cluster(n_workers=1, threads_per_worker=2): + """Initialize a local Dask cluster""" + cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) + logger.info(f"Dask Cluster Initialized: {cluster}") + return cluster @staticmethod - def initialize_kafka_consumer(): + def initialize_kafka_consumer(): # TODO: update to receive topic and group_id as parameters """Initialize the Kafka consumer.""" consumer_conf = { 'bootstrap.servers': KafkaConfig.get_kafka_address(), diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index 35d35b36e622724344a233da33f86a588b04aad0..54ca70f5f31ae37820a270c3f4bcfa231f06c5c9 100644 --- a/src/analytics/backend/service/Streamer.py +++ b/src/analytics/backend/service/Streamer.py @@ -12,22 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging import time import json -from confluent_kafka import KafkaException, KafkaError -from common.tools.kafka.Variables import KafkaTopic -from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler -from .AnalyzerHelper import AnalyzerHelper import threading +import logging + +from confluent_kafka import KafkaException, KafkaError +from common.tools.kafka.Variables import KafkaTopic +from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler +from analytics.backend.service.AnalyzerHelper import AnalyzerHelper + logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s') class DaskStreamer(threading.Thread): - def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5, - window_size=None, n_workers=1, threads_per_worker=1): + def __init__(self, key, input_kpis, output_kpis, thresholds, + batch_size = 5, + window_size = None, + cluster_instance = None, + producer_instance = AnalyzerHelper.initialize_kafka_producer() + ): super().__init__() self.key = key self.input_kpis = input_kpis @@ -35,15 +41,14 @@ class DaskStreamer(threading.Thread): self.thresholds = thresholds self.window_size = window_size self.batch_size = batch_size - self.n_workers = n_workers - self.threads_per_worker = threads_per_worker self.running = True self.batch = [] # Initialize Kafka and Dask components - self.client, self.cluster = AnalyzerHelper.initialize_dask_client(n_workers, threads_per_worker) - self.consumer = AnalyzerHelper.initialize_kafka_consumer() - self.producer = AnalyzerHelper.initialize_kafka_producer() + self.client = AnalyzerHelper.initialize_dask_client(cluster_instance) + self.consumer = AnalyzerHelper.initialize_kafka_consumer() # Single-threaded consumer + self.producer = producer_instance + logger.info("Dask Streamer initialized.") def run(self): @@ -56,9 +61,12 @@ class DaskStreamer(threading.Thread): logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.") break if not self.running: - logger.warning("Dask Streamer is not running. Exiting loop.") + logger.warning("Dask Streamer instance has been terminated. Exiting loop.") + break + if not self.client: + logger.warning("Dask client is not running. 
Exiting loop.") break - message = self.consumer.poll(timeout=2.0) # Poll for new messages after 2 sceonds + message = self.consumer.poll(timeout=2.0) if message is None: # logger.info("No new messages received.") continue @@ -74,11 +82,10 @@ class DaskStreamer(threading.Thread): logger.error(f"Failed to decode message: {message.value()}") continue self.batch.append(value) - # logger.info(f"Received message: {value}") - # Window size has a priority over batch size + # Window size has a precedence over batch size if self.window_size is None: - if len(self.batch) >= self.batch_size: # If batch size is not provided, process continue with default batch size + if len(self.batch) >= self.batch_size: # If batch size is not provided, process continue with the default batch size logger.info(f"Processing based on batch size {self.batch_size}.") self.task_handler_selector() self.batch = [] @@ -94,15 +101,20 @@ class DaskStreamer(threading.Thread): except Exception as e: logger.exception(f"Error in Dask streaming process: {e}") finally: + self.stop() logger.info(">>> Exiting Dask Streamer...") def task_handler_selector(self): """Select the task handler based on the task type.""" - if AnalyzerHandlers.is_valid_handler(self.thresholds["task_type"]): - if self.client.status == 'running': - future = self.client.submit(aggregation_handler, "batch size", self.key, - self.batch, self.input_kpis, self.output_kpis, self.thresholds) - future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + logger.info(f"Batch to be processed: {self.batch}") + if Handlers.is_valid_handler(self.thresholds["task_type"]): + if self.client is not None and self.client.status == 'running': + try: + future = self.client.submit(aggregation_handler, "batch size", self.key, + self.batch, self.input_kpis, self.output_kpis, self.thresholds) + future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + except Exception as e: + logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") else: logger.warning("Dask client is not running. Skipping processing.") else: @@ -110,6 +122,9 @@ class DaskStreamer(threading.Thread): def produce_result(self, result, destination_topic): """Produce results to the Kafka topic.""" + if not result: + logger.warning("Nothing to produce. Skipping.") + return for record in result: try: self.producer.produce( @@ -124,9 +139,13 @@ class DaskStreamer(threading.Thread): logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") def stop(self): - """Clean up Kafka and Dask resources.""" - logger.info("Shutting down resources...") + """Clean up Kafka and Dask thread resources.""" + if not self.running: + logger.info("Dask Streamer is already stopped.") + return self.running = False + logger.info("Streamer running status is set to False. 
Waiting 5 seconds before stopping...") + time.sleep(5) # Waiting time for running tasks to complete if self.consumer: try: self.consumer.close() @@ -134,13 +153,6 @@ class DaskStreamer(threading.Thread): except Exception as e: logger.error(f"Error closing Kafka consumer: {e}") - if self.producer: - try: - self.producer.flush() - logger.info("Kafka producer flushed and closed.") - except Exception as e: - logger.error(f"Error closing Kafka producer: {e}") - if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running': try: self.client.close() @@ -148,11 +160,4 @@ class DaskStreamer(threading.Thread): except Exception as e: logger.error(f"Error closing Dask client: {e}") - if self.cluster is not None and hasattr(self.cluster, 'close'): - try: - self.cluster.close(timeout=5) - logger.info("Dask cluster closed.") - except Exception as e: - logger.error(f"Timeout error while closing Dask cluster: {e}") - - +# TODO: Maybe a single streamer for all analyzers ... ? diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py index 6a303d474071928456b7f152f78ec20e25bd9ff3..4a119d948864d74fc650983e96406fedd3b73e6a 100644 --- a/src/analytics/backend/tests/messages_analyzer.py +++ b/src/analytics/backend/tests/messages_analyzer.py @@ -13,7 +13,7 @@ # limitations under the License. import pandas as pd -from analytics.backend.service.AnalyzerHandlers import AnalyzerHandlers +from analytics.backend.service.AnalyzerHandlers import Handlers def get_input_kpi_list(): return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] @@ -23,16 +23,16 @@ def get_output_kpi_list(): def get_thresholds(): return { - "task_type": AnalyzerHandlers.AGGREGATION_HANDLER.value, + "task_type": Handlers.AGGREGATION_HANDLER.value, "task_parameter": [ - {"last": (40, 80), "variance": (300, 500)}, - {"count": (2, 4), "max": (70, 100)}, - {"min": (10, 20), "avg": (50, 70)}, + {"last": [40, 80], "variance": [300, 500]}, + {"count": [2, 4], "max": [70, 100]}, + {"min": [10, 20], "avg": [50, 70]}, ], } def get_duration(): - return 60 + return 40 def get_windows_size(): return None diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 09be90e4c02b7231adb4724052df2ee36738852d..3be34ee9fdd81ab95044b85017785b221406b281 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -13,107 +13,177 @@ # limitations under the License. 
import time +import json import pytest import logging import pandas as pd -from unittest.mock import MagicMock, patch -from common.tools.kafka.Variables import KafkaTopic -from analytics.backend.service.Streamer import DaskStreamer + +from unittest.mock import MagicMock, patch from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \ - get_windows_size, get_batch_size, get_agg_df -from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler + get_windows_size, get_batch_size, get_agg_df, get_duration + +from common.tools.kafka.Variables import KafkaTopic +from analytics.backend.service.Streamer import DaskStreamer +from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService + logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s') +# ---- +# Test fixtures and helper functions +# ---- + @pytest.fixture(autouse=True) def log_all_methods(request): ''' This fixture logs messages before and after each test function runs, indicating the start and end of the test. The autouse=True parameter ensures that this logging happens automatically for all tests in the module. ''' - logger.info(f" >>> Starting test: {request.node.name} >>> ") + logger.info(f" >>>>> Starting test: {request.node.name} ") yield - logger.info(f" <<< Finished test: {request.node.name} <<< ") + logger.info(f" <<<<< Finished test: {request.node.name} ") +@pytest.fixture +def mock_kafka_producer(): + mock_producer = MagicMock() + mock_producer.produce = MagicMock() + mock_producer.flush = MagicMock() + return mock_producer -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - logger.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +@pytest.fixture +def mock_dask_cluster(): + mock_cluster = MagicMock() + mock_cluster.close = MagicMock() + return mock_cluster + +@pytest.fixture +def mock_dask_client(): + mock_client = MagicMock() + mock_client.status = 'running' + mock_client.submit = MagicMock() + return mock_client + +@pytest.fixture() +def mock_kafka_consumer(): + mock_consumer = MagicMock() + mock_consumer.subscribe = MagicMock() + return mock_consumer + +@pytest.fixture() +def mock_streamer_start(): + mock_streamer = MagicMock() + mock_streamer.start = MagicMock() + return mock_streamer ########################### -# integration test of Streamer with backend service +# functionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods ########################### -def test_backend_integration_with_analyzer(): - backendServiceObject = AnalyticsBackendService() - backendServiceObject.install_servicers() - logger.info(" waiting for 2 minutes for the backend service before termination ... ") - time.sleep(120) - backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666") - logger.info(" Backend service terminated successfully ... 
") +@pytest.fixture +def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer), \ + patch('analytics.backend.service.Streamer.DaskStreamer.run', return_value = mock_streamer_start): + + service = AnalyticsBackendService() + yield service + service.close() + +@pytest.fixture +def analyzer_data(): + return { + 'algo_name' : 'test_algorithm', + 'oper_mode' : 'test_mode', + 'input_kpis' : ['kpi1', 'kpi2'], + 'output_kpis': ['kpi3'], + 'thresholds' : {'kpi1': 0.5}, + 'batch_size' : 10, + 'window_size': 5, + 'duration' : 20, + } + +def test_start_streamer(analytics_service, analyzer_data): + analyzer_uuid = "test-analyzer-uuid" + # Start streamer + result = analytics_service.StartStreamer(analyzer_uuid, analyzer_data) + assert result is True + assert analyzer_uuid in analytics_service.active_streamers + +def test_stop_streamer(analytics_service, analyzer_data): + analyzer_uuid = "test-analyzer-uuid" + + # Start streamer for stopping it later + analytics_service.StartStreamer(analyzer_uuid, analyzer_data) + assert analyzer_uuid in analytics_service.active_streamers + # Stop streamer + result = analytics_service.StopStreamer(analyzer_uuid) + assert result is True + assert analyzer_uuid not in analytics_service.active_streamers + + # Verify that the streamer was stopped + assert analyzer_uuid not in analytics_service.active_streamers + +def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster): + analytics_service.close() + + mock_kafka_producer.flush.assert_called_once() + mock_dask_cluster.close.assert_called_once() ########################### -# funtionality pytest for analyzer sub methods +# funtionality pytest with specific fixtures for streamer class sub methods ########################### @pytest.fixture -def dask_streamer(): - with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client') as mock_dask_client, \ - patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer') as mock_kafka_consumer, \ - patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer') as mock_kafka_producer: - - mock_dask_client.return_value = (MagicMock(), MagicMock()) - mock_kafka_consumer.return_value = MagicMock() - mock_kafka_producer.return_value = MagicMock() +def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \ + patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer): return DaskStreamer( - key="test_key", - 
input_kpis=get_input_kpi_list(), - output_kpis=get_output_kpi_list(), - thresholds=get_thresholds(), - batch_size=get_batch_size(), - window_size=get_windows_size(), - n_workers=3, - threads_per_worker=1 + key = "test_key", + input_kpis = get_input_kpi_list(), + output_kpis = get_output_kpi_list(), + thresholds = get_thresholds(), + batch_size = get_batch_size(), + window_size = get_windows_size(), + cluster_instance = mock_dask_cluster, + producer_instance = mock_kafka_producer, ) -def test_initialization(dask_streamer): +def test_dask_streamer_initialization(dask_streamer): """Test if the DaskStreamer initializes correctly.""" - assert dask_streamer.key == "test_key" - assert dask_streamer.batch_size == get_batch_size() + assert dask_streamer.key == "test_key" + assert dask_streamer.batch_size == get_batch_size() assert dask_streamer.window_size is None - assert dask_streamer.n_workers == 3 - assert dask_streamer.consumer is not None - assert dask_streamer.producer is not None - assert dask_streamer.client is not None - assert dask_streamer.cluster is not None - + assert dask_streamer.consumer is not None + assert dask_streamer.producer is not None + assert dask_streamer.client is not None def test_run_stops_on_no_consumer(dask_streamer): """Test if the run method exits when the consumer is not initialized.""" dask_streamer.consumer = None with patch('time.sleep', return_value=None): dask_streamer.run() + assert not dask_streamer.running -def test_task_handler_selector_valid_handler(dask_streamer): +def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client): """Test task handler selection with a valid handler.""" - with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=True), \ - patch.object(dask_streamer.client, 'submit', return_value=MagicMock()) as mock_submit, \ - patch.object(dask_streamer.client, 'status', 'running'): + with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client): dask_streamer.task_handler_selector() - mock_submit.assert_called_once() + assert dask_streamer.client.status == 'running' def test_task_handler_selector_invalid_handler(dask_streamer): """Test task handler selection with an invalid handler.""" - with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=False): + with patch('analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler', return_value=False): dask_streamer.task_handler_selector() assert dask_streamer.batch == [] @@ -126,31 +196,27 @@ def test_produce_result(dask_streamer): mock_produce.assert_called_once_with( "test_topic", key="kpi1", - value='{"kpi_id": "kpi1", "value": 100}', + value=json.dumps({"kpi_id": "kpi1", "value": 100}), callback=mock_delivery_report ) -def test_cleanup(dask_streamer): +def test_stop(dask_streamer): """Test the cleanup method.""" - with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \ - patch.object(dask_streamer.producer, 'flush') as mock_producer_flush, \ - patch.object(dask_streamer.client, 'close') as mock_client_close, \ - patch.object(dask_streamer.cluster, 'close', MagicMock()) as mock_cluster_close: + with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \ + patch.object(dask_streamer.client, 'close') as mock_client_close, \ + patch('time.sleep', return_value=0): # Mock the conditions required for the close calls dask_streamer.client.status = 'running' - 
dask_streamer.cluster.close = MagicMock() - dask_streamer.cleanup() + dask_streamer.stop() mock_consumer_close.assert_called_once() - mock_producer_flush.assert_called_once() mock_client_close.assert_called_once() - dask_streamer.cluster.close.assert_called_once() def test_run_with_valid_consumer(dask_streamer): """Test the run method with a valid Kafka consumer.""" - with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ + with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \ patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector: # Simulate valid messages without errors @@ -167,33 +233,29 @@ def test_run_with_valid_consumer(dask_streamer): # Run the `run` method in a limited loop with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays - dask_streamer.running = True # Ensure the streamer runs - dask_streamer.batch_size = 2 # Set a small batch size for the test + dask_streamer.running = True + dask_streamer.batch_size = 2 # Limit the loop by breaking it after one full processing cycle - def stop_running_after_task_handler(*args, **kwargs): + def stop_running_after_task_handler(): logger.info("Stopping the streamer after processing the first batch.") dask_streamer.running = False mock_task_handler_selector.side_effect = stop_running_after_task_handler - - # Execute the method dask_streamer.run() - # Assertions assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing mock_task_handler_selector.assert_called_once() # Task handler should be called once - mock_poll.assert_any_call(timeout=2.0) # Poll should have been called - + mock_poll.assert_any_call(timeout=2.0) # Poll should have been called at least once -# add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py +# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py def test_aggregation_handler(): # Create a sample batch - batch = get_batch() - input_kpi_list = get_input_kpi_list() + batch = get_batch() + input_kpi_list = get_input_kpi_list() output_kpi_list = get_output_kpi_list() - thresholds = get_thresholds() + thresholds = get_thresholds() # Test aggregation_handler aggregated_df = aggregation_handler( @@ -202,13 +264,38 @@ def test_aggregation_handler(): assert isinstance(aggregated_df, list) assert all(isinstance(item, dict) for item in aggregated_df) -# Test threshold_handler +# # Test threshold_handler def test_threshold_handler(): # Create a sample aggregated DataFrame - agg_df = get_agg_df() + agg_df = get_agg_df() thresholds = get_thresholds() # Test threshold_handler result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0]) + assert isinstance(result, pd.DataFrame) assert result.shape == (1, 7) + + +########################### +# integration test of Streamer with backend service (Shouldn't be run in the CI/CD pipeline) +########################### +# This is a local machine test to check the integration of the backend service with the Streamer + +# --- "test_validate_kafka_topics" should be run before the functionality tests --- +# def test_validate_kafka_topics(): +# logger.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) + +# def test_backend_integration_with_analyzer(): +# backendServiceObject = AnalyticsBackendService() +# backendServiceObject.install_servicers() +# logger.info(" waiting for 2 minutes for the 
backend service before termination ... ") +# time.sleep(150) +# logger.info(" Initiating stop collector ... ") +# status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666") +# backendServiceObject.close() +# assert isinstance(status, bool) +# assert status == True +# logger.info(" Backend service terminated successfully ... ")
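
For reference, the sketch below (not part of the patch) shows how the refactored pieces are intended to interact: AnalyticsBackendService owns a shared Dask LocalCluster and a central Kafka producer, and each StartStreamer() call spawns a DaskStreamer thread that reuses them. It mirrors the commented-out local integration test above; the analyzer UUID, KPI UUIDs, and threshold values are placeholders, and running it assumes a reachable Kafka broker and the usual service environment.

# Illustrative usage sketch only; not part of this patch.
# Placeholder UUIDs/values; requires a reachable Kafka broker, as in the
# commented-out integration test above.
import time
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService

service = AnalyticsBackendService(n_workers=1, threads_per_worker=1)

analyzer = {
    "algo_name"  : "aggregation",     # only inspected by the request listener (None/None means stop)
    "oper_mode"  : 1,
    "input_kpis" : ["input-kpi-uuid"],
    "output_kpis": ["output-kpi-uuid"],
    "thresholds" : {"task_type": "AggregationHandler",   # Handlers.AGGREGATION_HANDLER.value
                    "task_parameter": [{"avg": [50, 70]}]},
    "batch_size" : 5,
    "window_size": None,              # no window: process on batch_size
    "duration"   : 30,                # > 0 schedules an automatic StopStreamer()
}

if service.StartStreamer("analyzer-uuid-placeholder", analyzer):
    time.sleep(35)                                        # let 'duration' elapse
    service.StopStreamer("analyzer-uuid-placeholder")     # returns False if already stopped
service.close()                                           # flush central producer, close Dask cluster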