Commit 04e1cf6c authored by Waleed Akbar

Updated Analytics backend Service and streamer class

- Added new output KPI types.
- Refactored AnalyzerHandlers.
- Updated streamer interaction.
- Added more test cases.
parent cf640e65
2 merge requests: !359 Release TeraFlowSDN 5.0, !317 Resolve "(CTTC) Analytics Module Enhancements"
......@@ -39,4 +39,14 @@ enum KpiSampleType {
KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605;
KPISAMPLETYPE_SERVICE_LATENCY_MS = 701;
// output KPIs
KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101;
KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102;
KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103;
KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201;
KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202;
KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203;
KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701;
}
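For reference, the new *_AGG_OUTPUT sample types mirror existing input types; from the values visible above (KPISAMPLETYPE_SERVICE_LATENCY_MS = 701 vs. KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701), the aggregated-output value appears to be the input value plus 1000. A minimal Python sketch of that assumed mapping; the offset is a convention inferred from the enum, not something defined in the .proto itself.

# Sketch of the assumed mapping between input KpiSampleType values and their
# *_AGG_OUTPUT counterparts. The +1000 offset is inferred from the enum above
# (e.g. 701 -> 1701); it is not stated anywhere in the .proto.
AGG_OUTPUT_OFFSET = 1000  # assumed convention

def to_agg_output_sample_type(input_sample_type: int) -> int:
    """Return the assumed aggregated-output counterpart of an input sample type."""
    return input_sample_type + AGG_OUTPUT_OFFSET

assert to_agg_output_sample_type(701) == 1701  # SERVICE_LATENCY_MS -> SERVICE_LATENCY_MS_AGG_OUTPUT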
......@@ -17,17 +17,14 @@ import json
import logging
import threading
import pytz
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Consumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from threading import Thread, Event
from analytics.backend.service.Streamer import DaskStreamer
from common.proto.analytics_frontend_pb2 import Analyzer
from datetime import datetime, timedelta
from analytics.backend.service.AnalyzerHelper import AnalyzerHelper
LOGGER = logging.getLogger(__name__)
......@@ -35,16 +32,25 @@ logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s')
class AnalyticsBackendService(GenericGrpcService):
"""
Class listens for ...
AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService.
It listens to the Kafka topic for the requests and starts/stops the DaskStreamer accordingly.
It also initializes the Kafka producer and Dask cluster for the streamer.
"""
def __init__(self, cls_name : str = __name__) -> None:
def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1
) -> None:
LOGGER.info('Init AnalyticsBackendService')
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
super().__init__(port, cls_name=cls_name)
self.active_streamers = {}
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
self.central_producer = AnalyzerHelper.initialize_kafka_producer() # Multi-threaded producer
self.cluster = AnalyzerHelper.initialize_dask_cluster(
n_workers, threads_per_worker) # Local cluster
self.request_consumer = Consumer({
'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-backend',
'auto.offset.reset' : 'latest',
})
def install_servicers(self):
threading.Thread(
......@@ -58,7 +64,7 @@ class AnalyticsBackendService(GenericGrpcService):
"""
LOGGER.info("Request Listener is initiated ...")
# print ("Request Listener is initiated ...")
consumer = self.kafka_consumer
consumer = self.request_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while True:
receive_msg = consumer.poll(2.0)
......@@ -69,23 +75,27 @@ class AnalyticsBackendService(GenericGrpcService):
continue
else:
LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
# print ("Consumer error: {:}".format(receive_msg.error()))
break
try:
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
# print ('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.StopStreamer(analyzer_uuid)
if self.StopStreamer(analyzer_uuid):
LOGGER.info("Dask Streamer stopped.")
else:
LOGGER.error("Failed to stop Dask Streamer.")
else:
self.StartStreamer(analyzer_uuid, analyzer)
if self.StartStreamer(analyzer_uuid, analyzer):
LOGGER.info("Dask Streamer started.")
else:
LOGGER.error("Failed to start Dask Streamer.")
except Exception as e:
LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
def StartStreamer(self, analyzer_uuid : str, analyzer : json):
def StartStreamer(self, analyzer_uuid : str, analyzer : dict):
"""
Start the DaskStreamer with the given parameters.
"""
......@@ -94,28 +104,30 @@ class AnalyticsBackendService(GenericGrpcService):
return False
try:
streamer = DaskStreamer(
analyzer_uuid,
analyzer['input_kpis' ],
analyzer['output_kpis'],
analyzer['thresholds' ],
analyzer['batch_size' ],
analyzer['window_size'],
key = analyzer_uuid,
input_kpis = analyzer['input_kpis' ],
output_kpis = analyzer['output_kpis'],
thresholds = analyzer['thresholds' ],
batch_size = analyzer['batch_size' ],
window_size = analyzer['window_size'],
cluster_instance = self.cluster,
producer_instance = self.central_producer,
)
streamer.start()
logging.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
# Stop the streamer after the given duration
if analyzer['duration'] is not None:
if analyzer['duration'] > 0:
def stop_after_duration():
time.sleep(analyzer['duration'])
logging.info(f"Stopping streamer with analyzer: {analyzer_uuid}")
streamer.stop()
LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}")
if not self.StopStreamer(analyzer_uuid):
LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")
duration_thread = threading.Thread(target=stop_after_duration, daemon=True)
duration_thread.start()
self.active_streamers[analyzer_uuid] = streamer
LOGGER.info("Dask Streamer started.")
return True
except Exception as e:
LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e))
......@@ -129,14 +141,30 @@ class AnalyticsBackendService(GenericGrpcService):
if analyzer_uuid not in self.active_streamers:
LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
return False
LOGGER.info(f"Stopping streamer with key: {analyzer_uuid}")
LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}")
streamer = self.active_streamers[analyzer_uuid]
streamer.stop()
streamer.join()
del self.active_streamers[analyzer_uuid]
LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been stopped.")
LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.")
return True
except Exception as e:
LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
return False
def close(self): # TODO: Is this function needed?
"""
Close the producer and cluster cleanly.
"""
if self.central_producer:
try:
self.central_producer.flush()
LOGGER.info("Kafka producer flushed and closed.")
except Exception as e:
LOGGER.error(f"Error closing Kafka producer: {e}")
if self.cluster:
try:
self.cluster.close()
LOGGER.info("Dask cluster closed.")
except Exception as e:
LOGGER.error(f"Error closing Dask cluster: {e}")
......@@ -16,11 +16,11 @@ import logging
from enum import Enum
import pandas as pd
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
class AnalyzerHandlers(Enum):
class Handlers(Enum):
AGGREGATION_HANDLER = "AggregationHandler"
UNSUPPORTED_HANDLER = "UnsupportedHandler"
......@@ -56,7 +56,7 @@ def threshold_handler(key, aggregated_df, thresholds):
aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th
else:
logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a tuple of length 2. Skipping threshold application.")
logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.")
return aggregated_df
def aggregation_handler(
......@@ -79,6 +79,10 @@ def aggregation_handler(
# Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only)
df = df[df['kpi_id'].isin(input_kpi_list)].copy()
if df.empty:
logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.")
return []
# Define all possible aggregation methods
aggregation_methods = {
"min" : ('kpi_value', 'min'),
......@@ -108,7 +112,6 @@ def aggregation_handler(
selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
# logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}")
kpi_df = df[df['kpi_id'] == kpi_id]
# Check if kpi_df is not empty before applying the aggregation methods
......@@ -119,9 +122,6 @@ def aggregation_handler(
agg_df['kpi_id'] = output_kpi_list[kpi_index]
# if agg_df.empty:
# logger.warning(f"No data available for KPI: {kpi_id}. Skipping threshold application.")
# continue
# logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
result = threshold_handler(key, agg_df, kpi_task_parameters)
......@@ -129,3 +129,4 @@ def aggregation_handler(
else:
logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
continue
return []
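To make the threshold convention above concrete: each task_parameter entry maps an aggregation name to a two-element [fail_th, raise_th] list, and threshold_handler appends boolean <metric>_TH_RAISE / <metric>_TH_FALL columns to the aggregated DataFrame. A small pandas sketch of that comparison follows, assuming the pair unpacks as (fail_th, raise_th); the authoritative unpacking lives in threshold_handler itself.

import pandas as pd

# One aggregated metric column, e.g. "last", with thresholds {"last": [40, 80]}.
aggregated_df = pd.DataFrame({'last': [35.0, 60.0, 95.0]})
fail_th, raise_th = [40, 80]    # assumed order of the two-element list

aggregated_df['last_TH_RAISE'] = aggregated_df['last'] > raise_th
aggregated_df['last_TH_FALL']  = aggregated_df['last'] < fail_th
print(aggregated_df)
#    last  last_TH_RAISE  last_TH_FALL
# 0  35.0          False          True
# 1  60.0          False         False
# 2  95.0           True         False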
......@@ -15,12 +15,11 @@
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
from confluent_kafka import Consumer, Producer
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s')
class AnalyzerHelper:
......@@ -28,15 +27,24 @@ class AnalyzerHelper:
pass
@staticmethod
def initialize_dask_client(n_workers=1, threads_per_worker=1):
"""Initialize a local Dask cluster and client."""
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
client = Client(cluster)
def initialize_dask_client(cluster_instance):
"""Initialize a local Dask client."""
if cluster_instance is None:
logger.error("Dask Cluster is not initialized. Exiting.")
return None
client = Client(cluster_instance)
logger.info(f"Dask Client Initialized: {client}")
return client, cluster
return client
@staticmethod
def initialize_dask_cluster(n_workers=1, threads_per_worker=2):
"""Initialize a local Dask cluster"""
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
logger.info(f"Dask Cluster Initialized: {cluster}")
return cluster
@staticmethod
def initialize_kafka_consumer():
def initialize_kafka_consumer(): # TODO: update to receive topic and group_id as parameters
"""Initialize the Kafka consumer."""
consumer_conf = {
'bootstrap.servers': KafkaConfig.get_kafka_address(),
......
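The refactor above separates cluster creation (initialize_dask_cluster) from client creation (initialize_dask_client) so that a single LocalCluster owned by AnalyticsBackendService can be shared by several streamers, each holding its own Client. A minimal sketch of that ownership pattern with plain dask.distributed objects, independent of the TFS helper methods:

from dask.distributed import Client, LocalCluster

# One cluster, owned by the long-lived service (see initialize_dask_cluster above).
cluster = LocalCluster(n_workers=1, threads_per_worker=2)

# Each streamer attaches its own client to the shared cluster.
client_a = Client(cluster)
client_b = Client(cluster)

# A streamer closes only its own client when it stops ...
client_a.close()
client_b.close()
# ... while the service closes the shared cluster once, on shutdown (see close()).
cluster.close()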
......@@ -12,22 +12,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
from confluent_kafka import KafkaException, KafkaError
from common.tools.kafka.Variables import KafkaTopic
from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler
from .AnalyzerHelper import AnalyzerHelper
import threading
import logging
from confluent_kafka import KafkaException, KafkaError
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler
from analytics.backend.service.AnalyzerHelper import AnalyzerHelper
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=' %(levelname)s - %(message)s')
class DaskStreamer(threading.Thread):
def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5,
window_size=None, n_workers=1, threads_per_worker=1):
def __init__(self, key, input_kpis, output_kpis, thresholds,
batch_size = 5,
window_size = None,
cluster_instance = None,
producer_instance = AnalyzerHelper.initialize_kafka_producer()
):
super().__init__()
self.key = key
self.input_kpis = input_kpis
......@@ -35,15 +41,14 @@ class DaskStreamer(threading.Thread):
self.thresholds = thresholds
self.window_size = window_size
self.batch_size = batch_size
self.n_workers = n_workers
self.threads_per_worker = threads_per_worker
self.running = True
self.batch = []
# Initialize Kafka and Dask components
self.client, self.cluster = AnalyzerHelper.initialize_dask_client(n_workers, threads_per_worker)
self.consumer = AnalyzerHelper.initialize_kafka_consumer()
self.producer = AnalyzerHelper.initialize_kafka_producer()
self.client = AnalyzerHelper.initialize_dask_client(cluster_instance)
self.consumer = AnalyzerHelper.initialize_kafka_consumer() # Single-threaded consumer
self.producer = producer_instance
logger.info("Dask Streamer initialized.")
def run(self):
......@@ -56,9 +61,12 @@ class DaskStreamer(threading.Thread):
logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.")
break
if not self.running:
logger.warning("Dask Streamer is not running. Exiting loop.")
logger.warning("Dask Streamer instance has been terminated. Exiting loop.")
break
if not self.client:
logger.warning("Dask client is not running. Exiting loop.")
break
message = self.consumer.poll(timeout=2.0) # Poll for new messages after 2 seconds
message = self.consumer.poll(timeout=2.0)
if message is None:
# logger.info("No new messages received.")
continue
......@@ -74,11 +82,10 @@ class DaskStreamer(threading.Thread):
logger.error(f"Failed to decode message: {message.value()}")
continue
self.batch.append(value)
# logger.info(f"Received message: {value}")
# Window size has a priority over batch size
# Window size takes precedence over batch size
if self.window_size is None:
if len(self.batch) >= self.batch_size: # If batch size is not provided, process continue with default batch size
if len(self.batch) >= self.batch_size: # If window size is not provided, processing continues with the default batch size
logger.info(f"Processing based on batch size {self.batch_size}.")
self.task_handler_selector()
self.batch = []
......@@ -94,15 +101,20 @@ class DaskStreamer(threading.Thread):
except Exception as e:
logger.exception(f"Error in Dask streaming process: {e}")
finally:
self.stop()
logger.info(">>> Exiting Dask Streamer...")
def task_handler_selector(self):
"""Select the task handler based on the task type."""
if AnalyzerHandlers.is_valid_handler(self.thresholds["task_type"]):
if self.client.status == 'running':
future = self.client.submit(aggregation_handler, "batch size", self.key,
self.batch, self.input_kpis, self.output_kpis, self.thresholds)
future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
logger.info(f"Batch to be processed: {self.batch}")
if Handlers.is_valid_handler(self.thresholds["task_type"]):
if self.client is not None and self.client.status == 'running':
try:
future = self.client.submit(aggregation_handler, "batch size", self.key,
self.batch, self.input_kpis, self.output_kpis, self.thresholds)
future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
except Exception as e:
logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
else:
logger.warning("Dask client is not running. Skipping processing.")
else:
......@@ -110,6 +122,9 @@ class DaskStreamer(threading.Thread):
def produce_result(self, result, destination_topic):
"""Produce results to the Kafka topic."""
if not result:
logger.warning("Nothing to produce. Skipping.")
return
for record in result:
try:
self.producer.produce(
......@@ -124,9 +139,13 @@ class DaskStreamer(threading.Thread):
logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def stop(self):
"""Clean up Kafka and Dask resources."""
logger.info("Shutting down resources...")
"""Clean up Kafka and Dask thread resources."""
if not self.running:
logger.info("Dask Streamer is already stopped.")
return
self.running = False
logger.info("Streamer running status is set to False. Waiting 5 seconds before stopping...")
time.sleep(5) # Waiting time for running tasks to complete
if self.consumer:
try:
self.consumer.close()
......@@ -134,13 +153,6 @@ class DaskStreamer(threading.Thread):
except Exception as e:
logger.error(f"Error closing Kafka consumer: {e}")
if self.producer:
try:
self.producer.flush()
logger.info("Kafka producer flushed and closed.")
except Exception as e:
logger.error(f"Error closing Kafka producer: {e}")
if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running':
try:
self.client.close()
......@@ -148,11 +160,4 @@ class DaskStreamer(threading.Thread):
except Exception as e:
logger.error(f"Error closing Dask client: {e}")
if self.cluster is not None and hasattr(self.cluster, 'close'):
try:
self.cluster.close(timeout=5)
logger.info("Dask cluster closed.")
except Exception as e:
logger.error(f"Timeout error while closing Dask cluster: {e}")
# TODO: Maybe a single streamer for all analyzers?
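As a small illustration of the batching rule in run() above: when window_size is set it takes precedence, otherwise the batch is flushed once batch_size records have accumulated. The sketch below uses a print in place of task_handler_selector() and simplifies the window bookkeeping, which is not visible in this excerpt.

import time

def should_flush(batch, batch_size, window_size, window_start, now):
    """Window size takes precedence over batch size (assumed simplification)."""
    if window_size is not None:
        return (now - window_start) >= window_size
    return len(batch) >= batch_size

batch, window_start = [], time.time()
for value in ({'kpi_value': 1}, {'kpi_value': 2}, {'kpi_value': 3}):
    batch.append(value)
    if should_flush(batch, batch_size=2, window_size=None,
                    window_start=window_start, now=time.time()):
        print(f"Processing batch of {len(batch)} records")  # task_handler_selector() stand-in
        batch, window_start = [], time.time()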
......@@ -13,7 +13,7 @@
# limitations under the License.
import pandas as pd
from analytics.backend.service.AnalyzerHandlers import AnalyzerHandlers
from analytics.backend.service.AnalyzerHandlers import Handlers
def get_input_kpi_list():
return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
......@@ -23,16 +23,16 @@ def get_output_kpi_list():
def get_thresholds():
return {
"task_type": AnalyzerHandlers.AGGREGATION_HANDLER.value,
"task_type": Handlers.AGGREGATION_HANDLER.value,
"task_parameter": [
{"last": (40, 80), "variance": (300, 500)},
{"count": (2, 4), "max": (70, 100)},
{"min": (10, 20), "avg": (50, 70)},
{"last": [40, 80], "variance": [300, 500]},
{"count": [2, 4], "max": [70, 100]},
{"min": [10, 20], "avg": [50, 70]},
],
}
def get_duration():
return 60
return 40
def get_windows_size():
return None
......
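The helpers above feed aggregation_handler, which selects pandas aggregations by name from the task_parameter keys (the "min": ('kpi_value', 'min') entry is visible in the handler). A minimal sketch of that selection with one KPI, assuming "avg" maps to pandas 'mean'; the handler's full aggregation_methods dict is elided in this diff.

import pandas as pd

kpi_df = pd.DataFrame({'kpi_id': ['kpi_3'] * 3, 'kpi_value': [10.0, 20.0, 30.0]})
kpi_task_parameters = {"min": [10, 20], "avg": [50, 70]}   # third entry of get_thresholds()

aggregation_methods = {                 # subset; mirrors aggregation_handler
    "min": ('kpi_value', 'min'),
    "max": ('kpi_value', 'max'),
    "avg": ('kpi_value', 'mean'),       # assumed mapping for "avg"
}
selected = {m: aggregation_methods[m] for m in kpi_task_parameters if m in aggregation_methods}
agg_df = kpi_df.groupby('kpi_id').agg(**selected).reset_index()
print(agg_df)   # one row for 'kpi_3' with its 'min' and 'avg' values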
......@@ -13,107 +13,177 @@
# limitations under the License.
import time
import json
import pytest
import logging
import pandas as pd
from unittest.mock import MagicMock, patch
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.Streamer import DaskStreamer
from unittest.mock import MagicMock, patch
from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
get_windows_size, get_batch_size, get_agg_df
from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
get_windows_size, get_batch_size, get_agg_df, get_duration
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.Streamer import DaskStreamer
from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
# ----
# Test fixtures and helper functions
# ----
@pytest.fixture(autouse=True)
def log_all_methods(request):
'''
This fixture logs messages before and after each test function runs, indicating the start and end of the test.
The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
'''
logger.info(f" >>> Starting test: {request.node.name} >>> ")
logger.info(f" >>>>> Starting test: {request.node.name} ")
yield
logger.info(f" <<< Finished test: {request.node.name} <<< ")
logger.info(f" <<<<< Finished test: {request.node.name} ")
@pytest.fixture
def mock_kafka_producer():
mock_producer = MagicMock()
mock_producer.produce = MagicMock()
mock_producer.flush = MagicMock()
return mock_producer
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
logger.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
@pytest.fixture
def mock_dask_cluster():
mock_cluster = MagicMock()
mock_cluster.close = MagicMock()
return mock_cluster
@pytest.fixture
def mock_dask_client():
mock_client = MagicMock()
mock_client.status = 'running'
mock_client.submit = MagicMock()
return mock_client
@pytest.fixture()
def mock_kafka_consumer():
mock_consumer = MagicMock()
mock_consumer.subscribe = MagicMock()
return mock_consumer
@pytest.fixture()
def mock_streamer_start():
mock_streamer = MagicMock()
mock_streamer.start = MagicMock()
return mock_streamer
###########################
# integration test of Streamer with backend service
# functionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods
###########################
def test_backend_integration_with_analyzer():
backendServiceObject = AnalyticsBackendService()
backendServiceObject.install_servicers()
logger.info(" waiting for 2 minutes for the backend service before termination ... ")
time.sleep(120)
backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
logger.info(" Backend service terminated successfully ... ")
@pytest.fixture
def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start):
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer), \
patch('analytics.backend.service.Streamer.DaskStreamer.run', return_value = mock_streamer_start):
service = AnalyticsBackendService()
yield service
service.close()
@pytest.fixture
def analyzer_data():
return {
'algo_name' : 'test_algorithm',
'oper_mode' : 'test_mode',
'input_kpis' : ['kpi1', 'kpi2'],
'output_kpis': ['kpi3'],
'thresholds' : {'kpi1': 0.5},
'batch_size' : 10,
'window_size': 5,
'duration' : 20,
}
def test_start_streamer(analytics_service, analyzer_data):
analyzer_uuid = "test-analyzer-uuid"
# Start streamer
result = analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
assert result is True
assert analyzer_uuid in analytics_service.active_streamers
def test_stop_streamer(analytics_service, analyzer_data):
analyzer_uuid = "test-analyzer-uuid"
# Start streamer for stopping it later
analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
assert analyzer_uuid in analytics_service.active_streamers
# Stop streamer
result = analytics_service.StopStreamer(analyzer_uuid)
assert result is True
assert analyzer_uuid not in analytics_service.active_streamers
# Verify that the streamer was stopped
assert analyzer_uuid not in analytics_service.active_streamers
def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster):
analytics_service.close()
mock_kafka_producer.flush.assert_called_once()
mock_dask_cluster.close.assert_called_once()
###########################
# functionality pytest for analyzer sub-methods
# functionality pytest with specific fixtures for streamer class sub-methods
###########################
@pytest.fixture
def dask_streamer():
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client') as mock_dask_client, \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer') as mock_kafka_consumer, \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer') as mock_kafka_producer:
mock_dask_client.return_value = (MagicMock(), MagicMock())
mock_kafka_consumer.return_value = MagicMock()
mock_kafka_producer.return_value = MagicMock()
def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer):
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer):
return DaskStreamer(
key="test_key",
input_kpis=get_input_kpi_list(),
output_kpis=get_output_kpi_list(),
thresholds=get_thresholds(),
batch_size=get_batch_size(),
window_size=get_windows_size(),
n_workers=3,
threads_per_worker=1
key = "test_key",
input_kpis = get_input_kpi_list(),
output_kpis = get_output_kpi_list(),
thresholds = get_thresholds(),
batch_size = get_batch_size(),
window_size = get_windows_size(),
cluster_instance = mock_dask_cluster,
producer_instance = mock_kafka_producer,
)
def test_initialization(dask_streamer):
def test_dask_streamer_initialization(dask_streamer):
"""Test if the DaskStreamer initializes correctly."""
assert dask_streamer.key == "test_key"
assert dask_streamer.batch_size == get_batch_size()
assert dask_streamer.key == "test_key"
assert dask_streamer.batch_size == get_batch_size()
assert dask_streamer.window_size is None
assert dask_streamer.n_workers == 3
assert dask_streamer.consumer is not None
assert dask_streamer.producer is not None
assert dask_streamer.client is not None
assert dask_streamer.cluster is not None
assert dask_streamer.consumer is not None
assert dask_streamer.producer is not None
assert dask_streamer.client is not None
def test_run_stops_on_no_consumer(dask_streamer):
"""Test if the run method exits when the consumer is not initialized."""
dask_streamer.consumer = None
with patch('time.sleep', return_value=None):
dask_streamer.run()
assert not dask_streamer.running
def test_task_handler_selector_valid_handler(dask_streamer):
def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client):
"""Test task handler selection with a valid handler."""
with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=True), \
patch.object(dask_streamer.client, 'submit', return_value=MagicMock()) as mock_submit, \
patch.object(dask_streamer.client, 'status', 'running'):
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client):
dask_streamer.task_handler_selector()
mock_submit.assert_called_once()
assert dask_streamer.client.status == 'running'
def test_task_handler_selector_invalid_handler(dask_streamer):
"""Test task handler selection with an invalid handler."""
with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=False):
with patch('analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler', return_value=False):
dask_streamer.task_handler_selector()
assert dask_streamer.batch == []
......@@ -126,31 +196,27 @@ def test_produce_result(dask_streamer):
mock_produce.assert_called_once_with(
"test_topic",
key="kpi1",
value='{"kpi_id": "kpi1", "value": 100}',
value=json.dumps({"kpi_id": "kpi1", "value": 100}),
callback=mock_delivery_report
)
def test_cleanup(dask_streamer):
def test_stop(dask_streamer):
"""Test the cleanup method."""
with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \
patch.object(dask_streamer.producer, 'flush') as mock_producer_flush, \
patch.object(dask_streamer.client, 'close') as mock_client_close, \
patch.object(dask_streamer.cluster, 'close', MagicMock()) as mock_cluster_close:
with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \
patch.object(dask_streamer.client, 'close') as mock_client_close, \
patch('time.sleep', return_value=0):
# Mock the conditions required for the close calls
dask_streamer.client.status = 'running'
dask_streamer.cluster.close = MagicMock()
dask_streamer.cleanup()
dask_streamer.stop()
mock_consumer_close.assert_called_once()
mock_producer_flush.assert_called_once()
mock_client_close.assert_called_once()
dask_streamer.cluster.close.assert_called_once()
def test_run_with_valid_consumer(dask_streamer):
"""Test the run method with a valid Kafka consumer."""
with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \
with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \
patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
# Simulate valid messages without errors
......@@ -167,33 +233,29 @@ def test_run_with_valid_consumer(dask_streamer):
# Run the `run` method in a limited loop
with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays
dask_streamer.running = True # Ensure the streamer runs
dask_streamer.batch_size = 2 # Set a small batch size for the test
dask_streamer.running = True
dask_streamer.batch_size = 2
# Limit the loop by breaking it after one full processing cycle
def stop_running_after_task_handler(*args, **kwargs):
def stop_running_after_task_handler():
logger.info("Stopping the streamer after processing the first batch.")
dask_streamer.running = False
mock_task_handler_selector.side_effect = stop_running_after_task_handler
# Execute the method
dask_streamer.run()
# Assertions
assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing
mock_task_handler_selector.assert_called_once() # Task handler should be called once
mock_poll.assert_any_call(timeout=2.0) # Poll should have been called
mock_poll.assert_any_call(timeout=2.0) # Poll should have been called at least once
# add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
def test_aggregation_handler():
# Create a sample batch
batch = get_batch()
input_kpi_list = get_input_kpi_list()
batch = get_batch()
input_kpi_list = get_input_kpi_list()
output_kpi_list = get_output_kpi_list()
thresholds = get_thresholds()
thresholds = get_thresholds()
# Test aggregation_handler
aggregated_df = aggregation_handler(
......@@ -202,13 +264,38 @@ def test_aggregation_handler():
assert isinstance(aggregated_df, list)
assert all(isinstance(item, dict) for item in aggregated_df)
# Test threshold_handler
# # Test threshold_handler
def test_threshold_handler():
# Create a sample aggregated DataFrame
agg_df = get_agg_df()
agg_df = get_agg_df()
thresholds = get_thresholds()
# Test threshold_handler
result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
assert isinstance(result, pd.DataFrame)
assert result.shape == (1, 7)
###########################
# integration test of Streamer with backend service (Shouldn't be run in the CI/CD pipeline)
###########################
# This is a local machine test to check the integration of the backend service with the Streamer
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
# def test_validate_kafka_topics():
# logger.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
# def test_backend_integration_with_analyzer():
# backendServiceObject = AnalyticsBackendService()
# backendServiceObject.install_servicers()
# logger.info(" waiting for 2 minutes for the backend service before termination ... ")
# time.sleep(150)
# logger.info(" Initiating stop collector ... ")
# status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
# backendServiceObject.close()
# assert isinstance(status, bool)
# assert status == True
# logger.info(" Backend service terminated successfully ... ")