Commit bd059d3d authored by Lluis Gifre Renom

Merge branch 'feat/247-cttc-analytics-module-enhancements' into 'develop'

Resolve "(CTTC) Analytics Module Enhancements"

See merge request !317
parents 4e7a76d8 5d18e008
Related merge requests: !359 Release TeraFlowSDN 5.0, !317 Resolve "(CTTC) Analytics Module Enhancements"
@@ -39,4 +39,14 @@ enum KpiSampleType {
KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO = 605;
KPISAMPLETYPE_SERVICE_LATENCY_MS = 701;
// output KPIs
KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101;
KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102;
KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103;
KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201;
KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202;
KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203;
KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701;
}
@@ -18,8 +18,11 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
analytics/backend/tests/test_backend.py
@@ -21,5 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
analytics/frontend/tests/test_frontend.py
@@ -16,32 +16,46 @@ import time
import json
import logging
import threading
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Consumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from threading import Thread, Event
from .DaskStreaming import DaskStreamer
from analytics.backend.service.Streamer import DaskStreamer
from analytics.backend.service.AnalyzerHelper import AnalyzerHelper
LOGGER = logging.getLogger(__name__)
class AnalyticsBackendService(GenericGrpcService):
"""
Class listens for ...
AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService.
It listens to the Kafka topic for the requests and starts/stops the DaskStreamer accordingly.
It also initializes the Kafka producer and Dask cluster for the streamer.
"""
def __init__(self, cls_name : str = __name__) -> None:
def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1
) -> None:
LOGGER.info('Init AnalyticsBackendService')
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
super().__init__(port, cls_name=cls_name)
self.running_threads = {} # To keep track of all running analyzers
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
self.active_streamers = {}
self.central_producer = AnalyzerHelper.initialize_kafka_producer() # Multi-threaded producer
self.cluster = AnalyzerHelper.initialize_dask_cluster(
n_workers, threads_per_worker) # Local cluster
self.request_consumer = Consumer({
'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-backend',
'auto.offset.reset' : 'latest',
})
def install_servicers(self):
threading.Thread(target=self.RequestListener, args=()).start()
threading.Thread(
target=self.RequestListener,
args=()
).start()
def RequestListener(self):
"""
@@ -49,7 +63,7 @@ class AnalyticsBackendService(GenericGrpcService):
"""
LOGGER.info("Request Listener is initiated ...")
# print ("Request Listener is initiated ...")
consumer = self.kafka_consumer
consumer = self.request_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while True:
receive_msg = consumer.poll(2.0)
@@ -60,65 +74,96 @@ class AnalyticsBackendService(GenericGrpcService):
continue
else:
LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
# print ("Consumer error: {:}".format(receive_msg.error()))
break
try:
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
# print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.StopDaskListener(analyzer_uuid)
if self.StopStreamer(analyzer_uuid):
LOGGER.info("Dask Streamer stopped.")
else:
LOGGER.error("Failed to stop Dask Streamer.")
else:
self.StartDaskListener(analyzer_uuid, analyzer)
if self.StartStreamer(analyzer_uuid, analyzer):
LOGGER.info("Dask Streamer started.")
else:
LOGGER.error("Failed to start Dask Streamer.")
except Exception as e:
LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
# print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
def StartDaskListener(self, analyzer_uuid, analyzer):
kpi_list = analyzer[ 'input_kpis' ]
thresholds = analyzer[ 'thresholds' ]
window_size = analyzer[ 'window_size' ]
window_slider = analyzer[ 'window_slider']
LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
kpi_list, thresholds, window_size, window_slider))
# print ("Received parameters: {:} - {:} - {:} - {:}".format(
# kpi_list, thresholds, window_size, window_slider))
def StartStreamer(self, analyzer_uuid : str, analyzer : dict):
"""
Start the DaskStreamer with the given parameters.
"""
if analyzer_uuid in self.active_streamers:
LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid))
return False
try:
stop_event = Event()
thread = Thread(
target=DaskStreamer,
# args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event),
kwargs={
"window_size" : window_size,
}
streamer = DaskStreamer(
key = analyzer_uuid,
input_kpis = analyzer['input_kpis' ],
output_kpis = analyzer['output_kpis'],
thresholds = analyzer['thresholds' ],
batch_size = analyzer['batch_size' ],
window_size = analyzer['window_size'],
cluster_instance = self.cluster,
producer_instance = self.central_producer,
)
thread.start()
self.running_threads[analyzer_uuid] = (thread, stop_event)
# print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
streamer.start()
LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
# Stop the streamer after the given duration
if analyzer['duration'] > 0:
def stop_after_duration():
time.sleep(analyzer['duration'])
LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}")
if not self.StopStreamer(analyzer_uuid):
LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")
duration_thread = threading.Thread(target=stop_after_duration, daemon=True)
duration_thread.start()
self.active_streamers[analyzer_uuid] = streamer
return True
except Exception as e:
LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e))
return False
def StopStreamer(self, analyzer_uuid : str):
"""
Stop the DaskStreamer with the given analyzer_uuid.
"""
try:
if analyzer_uuid not in self.active_streamers:
LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
return False
LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}")
streamer = self.active_streamers[analyzer_uuid]
streamer.stop()
streamer.join()
del self.active_streamers[analyzer_uuid]
LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.")
return True
except Exception as e:
# print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
return False
def StopDaskListener(self, analyzer_uuid):
if analyzer_uuid in self.running_threads:
def close(self): # TODO: Is this function needed?
"""
Close the producer and cluster cleanly.
"""
if self.central_producer:
try:
thread, stop_event = self.running_threads[analyzer_uuid]
stop_event.set()
thread.join()
del self.running_threads[analyzer_uuid]
# print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
return True
self.central_producer.flush()
LOGGER.info("Kafka producer flushed and closed.")
except Exception as e:
LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
return False
else:
# print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.error(f"Error closing Kafka producer: {e}")
if self.cluster:
try:
self.cluster.close()
LOGGER.info("Dask cluster closed.")
except Exception as e:
LOGGER.error(f"Error closing Dask cluster: {e}")
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from enum import Enum
import pandas as pd
logger = logging.getLogger(__name__)
class Handlers(Enum):
AGGREGATION_HANDLER = "AggregationHandler"
UNSUPPORTED_HANDLER = "UnsupportedHandler"
@classmethod
def is_valid_handler(cls, handler_name):
return handler_name in cls._value2member_map_
# This method is top-level and should not be part of the class due to serialization issues.
def threshold_handler(key, aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
on the aggregated DataFrame.
Args:
key (str): Key for the aggregated DataFrame.
aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
thresholds (dict): Thresholds dictionary with keys in the format '<metricName>' and values as (fail_th, raise_th).
Returns:
pd.DataFrame: DataFrame with additional threshold columns.
"""
for metric_name, threshold_values in thresholds.items():
# Ensure the metric column exists in the DataFrame
if metric_name not in aggregated_df.columns:
logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.")
continue
# Ensure the threshold values are a two-element list (fail_th, raise_th)
if isinstance(threshold_values, list) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
# Add threshold columns with updated naming
aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th
else:
logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.")
return aggregated_df
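# A minimal usage sketch (hypothetical values, mirroring the threshold format used
# in the tests): thresholds map a metric name to [fail_th, raise_th].
#     df = pd.DataFrame([{"kpi_id": "kpi_x", "last": 95.0}])
#     threshold_handler("kpi_x", df, {"last": [40, 80]})
#     # adds boolean columns: last_TH_RAISE (95.0 > 80 -> True) and last_TH_FALL (95.0 < 40 -> False)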
def aggregation_handler(
batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
):
"""
Process a batch of data, calculate aggregated values for each input KPI,
and map them to the corresponding output KPIs. """
logger.info(f"({batch_type_name}) Processing batch for key: {key}")
if not batch:
logger.info("Empty batch received. Skipping processing.")
return []
else:
logger.info(f" >>>>> Processing {len(batch)} records for key: {key}")
# Convert data into a DataFrame
df = pd.DataFrame(batch)
# Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only)
df = df[df['kpi_id'].isin(input_kpi_list)].copy()
if df.empty:
logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.")
return []
# Define all possible aggregation methods
aggregation_methods = {
"min" : ('kpi_value', 'min'),
"max" : ('kpi_value', 'max'),
"avg" : ('kpi_value', 'mean'),
"first" : ('kpi_value', lambda x: x.iloc[0]),
"last" : ('kpi_value', lambda x: x.iloc[-1]),
"variance": ('kpi_value', 'var'),
"count" : ('kpi_value', 'count'),
"range" : ('kpi_value', lambda x: x.max() - x.min()),
"sum" : ('kpi_value', 'sum'),
}
# Process each KPI-specific task parameter, accumulating results across all input KPIs
results = []
for kpi_index, kpi_id in enumerate(input_kpi_list):
# logger.info(f"1.Processing KPI: {kpi_id}")
kpi_task_parameters = thresholds["task_parameter"][kpi_index]
# Get valid task parameters for this KPI
valid_task_parameters = [
method for method in kpi_task_parameters.keys()
if method in aggregation_methods
]
# Select the aggregation methods based on valid task parameters
selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
# logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}")
kpi_df = df[df['kpi_id'] == kpi_id]
# Check if kpi_df is not empty before applying the aggregation methods
if not kpi_df.empty:
agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index()
# logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}")
agg_df['kpi_id'] = output_kpi_list[kpi_index]
# logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
result = threshold_handler(key, agg_df, kpi_task_parameters)
results.extend(result.to_dict(orient='records')) # collect this KPI's aggregated rows
else:
logger.warning(f"No data available for KPI: {kpi_id}. Skipping aggregation.")
continue
return results
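# A usage sketch (hypothetical analyzer key; batch and parameter shapes as provided
# by tests/messages_analyzer.py):
#     records = aggregation_handler(
#         "batch size", "analyzer-uuid", get_batch(),
#         get_input_kpi_list(), get_output_kpi_list(), get_thresholds())
#     # -> a list of dicts carrying the selected aggregations plus the
#     #    *_TH_RAISE / *_TH_FALL columns added by threshold_handler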
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer, Producer
import logging
logger = logging.getLogger(__name__)
class AnalyzerHelper:
def __init__(self):
pass
@staticmethod
def initialize_dask_client(cluster_instance):
"""Initialize a local Dask client."""
if cluster_instance is None:
logger.error("Dask Cluster is not initialized. Exiting.")
return None
client = Client(cluster_instance)
logger.info(f"Dask Client Initialized: {client}")
return client
@staticmethod
def initialize_dask_cluster(n_workers=1, threads_per_worker=2):
"""Initialize a local Dask cluster"""
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
logger.info(f"Dask Cluster Initialized: {cluster}")
return cluster
@staticmethod
def initialize_kafka_consumer(): # TODO: update to receive topic and group_id as parameters
"""Initialize the Kafka consumer."""
consumer_conf = {
'bootstrap.servers': KafkaConfig.get_kafka_address(),
'group.id': 'analytics-backend',
'auto.offset.reset': 'latest'
}
consumer = Consumer(consumer_conf)
consumer.subscribe([KafkaTopic.VALUE.value])
return consumer
@staticmethod
def initialize_kafka_producer():
"""Initialize the Kafka producer."""
return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
@staticmethod
def delivery_report(err, msg):
if err is not None:
logger.error(f"Message delivery failed: {err}")
else:
logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
import pandas as pd
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
def SettingKafkaConsumerParams():
return {'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-backend',
'auto.offset.reset' : 'latest'}
def GetAggregationMappings(thresholds):
agg_dict = {}
for threshold_key in thresholds.keys():
parts = threshold_key.split('_', 1)
if len(parts) != 2:
LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
continue
aggregation, metric_name = parts
# Ensure that the aggregation function is valid in pandas
if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']:
LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
continue
agg_dict[threshold_key] = ('kpi_value', aggregation)
return agg_dict
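# Example (hypothetical threshold, following the '<aggregation>_<metricName>' convention checked above):
#     GetAggregationMappings({'min_latency_E2E': (10, 90)})
#     # -> {'min_latency_E2E': ('kpi_value', 'min')}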
def ApplyThresholds(aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
on the aggregated DataFrame.
Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
Returns: pd.DataFrame: DataFrame with additional threshold columns.
"""
for threshold_key, threshold_values in thresholds.items():
if threshold_key not in aggregated_df.columns:
LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
continue
if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
aggregated_df["value"] = aggregated_df[threshold_key]
else:
LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
return aggregated_df
def initialize_dask_client():
"""
Initialize a local Dask cluster and client.
"""
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
client = Client(cluster)
LOGGER.info(f"Dask Client Initialized: {client}")
return client, cluster
def initialize_kafka_producer():
return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
def delivery_report(err, msg):
if err is not None:
LOGGER.error(f"Message delivery failed: {err}")
else:
LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
def process_batch(batch, agg_mappings, thresholds, key):
"""
Process a batch of data and apply thresholds.
Args: batch (list of dict): List of messages from Kafka.
agg_mappings (dict): Mapping from threshold key to aggregation function.
thresholds (dict): Thresholds dictionary.
Returns: list of dict: Processed records ready to be sent to Kafka.
"""
if not batch:
LOGGER.info("Empty batch received. Skipping processing.")
return []
df = pd.DataFrame(batch)
LOGGER.info(f"df {df} ")
df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
df.dropna(subset=['time_stamp'], inplace=True)
LOGGER.info(f"df {df} ")
required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
if not required_columns.issubset(df.columns):
LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
return []
if df.empty:
LOGGER.info("No data after filtering by KPI IDs. Skipping processing.")
return []
# Perform aggregations using named aggregation
try:
agg_dict = {key: value for key, value in agg_mappings.items()}
df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()
#example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min')
#given that threshold has 1 value
second_value_tuple = next(iter(agg_dict.values()))[1]
#in case we have multiple thresholds!
#second_values_tuples = [value[1] for value in agg_dict.values()]
if second_value_tuple=="min":
df_agg = df_agg_.min(numeric_only=True).to_frame().T
elif second_value_tuple == "max":
df_agg = df_agg_.max(numeric_only=True).to_frame().T
elif second_value_tuple == "std":
df_agg = df_agg_.std(numeric_only=True).to_frame().T
else:
df_agg = df_agg_.mean(numeric_only=True).to_frame().T
# Assign the first value of window_start from the original aggregated data
df_agg['window_start'] = df_agg_['window_start'].iloc[0]
# Reorder columns to place 'window_start' first if needed
cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start']
df_agg = df_agg[cols]
except Exception as e:
LOGGER.error(f"Aggregation error: {e}")
return []
# Apply thresholds
df_thresholded = ApplyThresholds(df_agg, thresholds)
df_thresholded['kpi_id'] = key
df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
# Convert aggregated DataFrame to list of dicts
result = df_thresholded.to_dict(orient='records')
LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
return result
def produce_result(result, producer, destination_topic):
for record in result:
try:
producer.produce(
destination_topic,
key=str(record.get('kpi_id', '')),
value=json.dumps(record),
callback=delivery_report
)
except KafkaException as e:
LOGGER.error(f"Failed to produce message: {e}")
producer.flush()
LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def DaskStreamer(key, kpi_list, thresholds, stop_event,
window_size="30s", time_stamp_col="time_stamp"):
client, cluster = initialize_dask_client()
consumer_conf = SettingKafkaConsumerParams()
consumer = Consumer(consumer_conf)
consumer.subscribe([KafkaTopic.VALUE.value])
producer = initialize_kafka_producer()
# Parse window_size to seconds
try:
window_size_td = pd.to_timedelta(window_size)
window_size_seconds = window_size_td.total_seconds()
except Exception as e:
LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
window_size_seconds = 30
LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")
# Extract aggregation mappings from thresholds
agg_mappings = GetAggregationMappings(thresholds)
if not agg_mappings:
LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
consumer.close()
producer.flush()
client.close()
cluster.close()
return
try:
batch = []
last_batch_time = time.time()
LOGGER.info("Starting to consume messages...")
while not stop_event.is_set():
msg = consumer.poll(1.0)
if msg is None:
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
LOGGER.error(f"Kafka error: {msg.error()}")
continue
try:
message_value = json.loads(msg.value().decode('utf-8'))
except json.JSONDecodeError as e:
LOGGER.error(f"JSON decode error: {e}")
continue
try:
message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.")
if pd.isna(message_timestamp):
LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
continue
window_start = message_timestamp.floor(window_size)
LOGGER.warning(f"window_start: {window_start}. Skipping message.")
message_value['window_start'] = window_start
except Exception as e:
LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
continue
if message_value['kpi_id'] not in kpi_list:
LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.")
continue
batch.append(message_value)
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
except Exception as e:
LOGGER.exception(f"Error in Dask streaming process: {e}")
finally:
# Process any remaining messages in the batch
if batch:
LOGGER.info("Processing remaining messages in the batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
consumer.close()
producer.flush()
LOGGER.info("Kafka consumer and producer closed.")
client.close()
cluster.close()
LOGGER.info("Dask client and cluster closed.")
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import json
import threading
import logging
from confluent_kafka import KafkaException, KafkaError
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler
from analytics.backend.service.AnalyzerHelper import AnalyzerHelper
logger = logging.getLogger(__name__)
class DaskStreamer(threading.Thread):
def __init__(self, key, input_kpis, output_kpis, thresholds,
batch_size = 5,
window_size = None,
cluster_instance = None,
producer_instance = None # created lazily in __init__ to avoid building a Kafka producer at import time via a default argument
):
super().__init__()
self.key = key
self.input_kpis = input_kpis
self.output_kpis = output_kpis
self.thresholds = thresholds
self.window_size = window_size
self.batch_size = batch_size
self.running = True
self.batch = []
# Initialize Kafka and Dask components
self.client = AnalyzerHelper.initialize_dask_client(cluster_instance)
self.consumer = AnalyzerHelper.initialize_kafka_consumer() # Single-threaded consumer
self.producer = producer_instance if producer_instance is not None else AnalyzerHelper.initialize_kafka_producer()
logger.info("Dask Streamer initialized.")
def run(self):
"""Main method to start the DaskStreamer."""
try:
logger.info("Starting Dask Streamer")
last_batch_time = time.time()
while True:
if not self.consumer:
logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.")
break
if not self.running:
logger.warning("Dask Streamer instance has been terminated. Exiting loop.")
break
if not self.client:
logger.warning("Dask client is not running. Exiting loop.")
break
message = self.consumer.poll(timeout=2.0)
if message is None:
# logger.info("No new messages received.")
continue
if message.error():
if message.error().code() == KafkaError._PARTITION_EOF:
logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
elif message.error():
raise KafkaException(message.error())
else:
try:
value = json.loads(message.value())
except json.JSONDecodeError:
logger.error(f"Failed to decode message: {message.value()}")
continue
self.batch.append(value)
# Window size takes precedence over batch size
if self.window_size is None:
if len(self.batch) >= self.batch_size: # If no window size is provided, process once the batch reaches batch_size
logger.info(f"Processing based on batch size {self.batch_size}.")
self.task_handler_selector()
self.batch = []
else:
# Process based on window size
current_time = time.time()
if (current_time - last_batch_time) >= self.window_size and self.batch:
logger.info(f"Processing based on window size {self.window_size}.")
self.task_handler_selector()
self.batch = []
last_batch_time = current_time
except Exception as e:
logger.exception(f"Error in Dask streaming process: {e}")
finally:
self.stop()
logger.info(">>> Exiting Dask Streamer...")
def task_handler_selector(self):
"""Select the task handler based on the task type."""
logger.info(f"Batch to be processed: {self.batch}")
if Handlers.is_valid_handler(self.thresholds["task_type"]):
if self.client is not None and self.client.status == 'running':
try:
future = self.client.submit(aggregation_handler, "batch size", self.key,
self.batch, self.input_kpis, self.output_kpis, self.thresholds)
future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
except Exception as e:
logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
else:
logger.warning("Dask client is not running. Skipping processing.")
else:
logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.")
def produce_result(self, result, destination_topic):
"""Produce results to the Kafka topic."""
if not result:
logger.warning("Nothing to produce. Skipping.")
return
for record in result:
try:
self.producer.produce(
destination_topic,
key=str(record.get('kpi_id', '')),
value=json.dumps(record),
callback=AnalyzerHelper.delivery_report
)
except KafkaException as e:
logger.error(f"Failed to produce message: {e}")
self.producer.flush()
logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def stop(self):
"""Clean up Kafka and Dask thread resources."""
if not self.running:
logger.info("Dask Streamer is already stopped.")
return
self.running = False
logger.info("Streamer running status is set to False. Waiting 5 seconds before stopping...")
time.sleep(5) # Waiting time for running tasks to complete
if self.consumer:
try:
self.consumer.close()
logger.info("Kafka consumer closed.")
except Exception as e:
logger.error(f"Error closing Kafka consumer: {e}")
if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running':
try:
self.client.close()
logger.info("Dask client closed.")
except Exception as e:
logger.error(f"Error closing Dask client: {e}")
# TODO: May be Single streamer for all analyzers ... ?
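# A minimal construction sketch (assumed values; mirrors how
# AnalyticsBackendService.StartStreamer wires a streamer to the shared cluster
# and producer):
#     streamer = DaskStreamer(
#         key               = "analyzer-uuid",
#         input_kpis        = ["<input KPI UUID>"],
#         output_kpis       = ["<output KPI UUID>"],
#         thresholds        = {"task_type": Handlers.AGGREGATION_HANDLER.value,
#                              "task_parameter": [{"max": [70, 100]}]},
#         batch_size        = 10,
#         window_size       = None,
#         cluster_instance  = cluster,     # shared LocalCluster
#         producer_instance = producer,    # shared Kafka producer
#     )
#     streamer.start()                     # threading.Thread.start() -> run()
#     ...
#     streamer.stop()
#     streamer.join()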
@@ -53,8 +53,8 @@ def create_analyzer():
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7"
_create_analyzer.input_kpi_ids.append(_kpi_id)
@@ -63,11 +63,14 @@ def create_analyzer():
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
from analytics.backend.service.AnalyzerHandlers import Handlers
def get_input_kpi_list():
return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
def get_output_kpi_list():
return ["1e22f180-ba28-4641-b190-2287bf441616", "6e22f180-ba28-4641-b190-2287bf181818", 'kpi_4']
def get_thresholds():
return {
"task_type": Handlers.AGGREGATION_HANDLER.value,
"task_parameter": [
{"last": [40, 80], "variance": [300, 500]},
{"count": [2, 4], "max": [70, 100]},
{"min": [10, 20], "avg": [50, 70]},
],
}
def get_duration():
return 40
def get_windows_size():
return None
def get_batch_size():
return 10
def get_interval():
return 5
def get_batch():
return [
{"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 46.72},
{"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 65.22},
{"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 54.24},
{"time_stamp": "2025-01-13T08:44:16Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 57.67},
{"time_stamp": "2025-01-13T08:44:18Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 38.6},
{"time_stamp": "2025-01-13T08:44:20Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 38.9},
{"time_stamp": "2025-01-13T08:44:22Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 52.44},
{"time_stamp": "2025-01-13T08:44:24Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 47.76},
{"time_stamp": "2025-01-13T08:44:26Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 33.71},
{"time_stamp": "2025-01-13T08:44:28Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 64.44},
]
def get_agg_df():
data = [
{"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616", "last": 47.76, "variance": 970.41},
]
return pd.DataFrame(data)
@@ -12,148 +12,290 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time, json
from typing import Dict
import time
import json
import pytest
import logging
from threading import Event, Thread
from common.tools.kafka.Variables import KafkaTopic
import pandas as pd
from unittest.mock import MagicMock, patch
from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
get_windows_size, get_batch_size, get_agg_df, get_duration
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.Streamer import DaskStreamer
from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
from .messages import create_analyzer, create_analyzer_dask
from threading import Thread, Event
from ..service.DaskStreaming import DaskStreamer
LOGGER = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
# ----
# Test fixtures and helper functions
# ----
@pytest.fixture(autouse=True)
def log_all_methods(request):
'''
This fixture logs messages before and after each test function runs, indicating the start and end of the test.
The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
'''
logger.info(f" >>>>> Starting test: {request.node.name} ")
yield
logger.info(f" <<<<< Finished test: {request.node.name} ")
@pytest.fixture
def mock_kafka_producer():
mock_producer = MagicMock()
mock_producer.produce = MagicMock()
mock_producer.flush = MagicMock()
return mock_producer
@pytest.fixture
def mock_dask_cluster():
mock_cluster = MagicMock()
mock_cluster.close = MagicMock()
return mock_cluster
@pytest.fixture
def mock_dask_client():
mock_client = MagicMock()
mock_client.status = 'running'
mock_client.submit = MagicMock()
return mock_client
@pytest.fixture()
def mock_kafka_consumer():
mock_consumer = MagicMock()
mock_consumer.subscribe = MagicMock()
return mock_consumer
@pytest.fixture()
def mock_streamer_start():
mock_streamer = MagicMock()
mock_streamer.start = MagicMock()
return mock_streamer
###########################
# Tests Implementation of Telemetry Backend
# functionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods
###########################
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# --- To test Dask Streamer functionality ---
# def test_StartDaskStreamer(): # Directly from the Streamer class
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# stop_event = Event()
# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
# oper_list = ['avg', 'min', 'max',]
# thresholds = {
# 'avg_value': (10.0, 90.0),
# 'min_value': (5.0, 95.0),
# 'max_value': (15.0, 85.0),
# 'latency' : (2.0, 10.0)
# }
# # Start the DaskStreamer in a separate thread
# streamer_thread = Thread(
# target=DaskStreamer,
# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
# kwargs={
# "window_size": "60s",
# "win_slide_duration": "30s",
# "time_stamp_col": "time_stamp"
# }
# )
# streamer_thread.start()
# try:
# while True:
# time.sleep(10)
# except KeyboardInterrupt:
# LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
# stop_event.set()
# streamer_thread.join()
# LOGGER.info("Streamer stopped gracefully.")
# --- To test Start Streamer functionality ---
# def test_StartDaskStreamer():
# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ")
# analyzer_obj = create_analyzer_dask()
# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]),
# # "oper_list" : analyzer_obj.parameters["oper_list"],
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# time.sleep(100)
# LOGGER.info('Initiating StopRequestListener...')
# # AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
@pytest.fixture
def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start):
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer), \
patch('analytics.backend.service.Streamer.DaskStreamer.run', return_value = mock_streamer_start):
service = AnalyticsBackendService()
yield service
service.close()
# --- To test Start Streamer functionality ---
# def test_StartSparkStreamer():
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# analyzer_obj = create_analyzer()
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
@pytest.fixture
def analyzer_data():
return {
'algo_name' : 'test_algorithm',
'oper_mode' : 'test_mode',
'input_kpis' : get_input_kpi_list(),
'output_kpis': get_output_kpi_list(),
'thresholds' : get_thresholds(),
'batch_size' : get_batch_size(),
'window_size': get_windows_size(),
'duration' : get_duration(),
}
# --- To TEST StartRequestListenerFunctionality
# def test_StartRequestListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# AnalyticsBackendServiceObj.stop_event = Event()
# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
# listener_thread.start()
# time.sleep(100)
# AnalyticsBackendServiceObj.stop_event.set()
# LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
# listener_thread.join(timeout=10)
# assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
# LOGGER.info('Completed test_RunRequestListener')
# To test START and STOP communication together
# def test_StopRequestListener():
# LOGGER.info('test_RunRequestListener')
# LOGGER.info('Initiating StartRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
# # LOGGER.debug(str(response_thread))
# time.sleep(10)
# LOGGER.info('Initiating StopRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
def test_start_streamer(analytics_service, analyzer_data):
analyzer_uuid = "test-analyzer-uuid"
# Start streamer
result = analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
assert result is True
assert analyzer_uuid in analytics_service.active_streamers
def test_stop_streamer(analytics_service, analyzer_data):
analyzer_uuid = "test-analyzer-uuid"
# Start streamer for stopping it later
analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
assert analyzer_uuid in analytics_service.active_streamers
# Stop streamer
result = analytics_service.StopStreamer(analyzer_uuid)
assert result is True
assert analyzer_uuid not in analytics_service.active_streamers
# Verify that the streamer was stopped
assert analyzer_uuid not in analytics_service.active_streamers
def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster):
analytics_service.close()
mock_kafka_producer.flush.assert_called_once()
mock_dask_cluster.close.assert_called_once()
###########################
# functionality pytest cases with specific fixtures for Streamer class sub-methods
###########################
@pytest.fixture
def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer):
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer):
return DaskStreamer(
key = "test_key",
input_kpis = get_input_kpi_list(),
output_kpis = get_output_kpi_list(),
thresholds = get_thresholds(),
batch_size = get_batch_size(),
window_size = get_windows_size(),
cluster_instance = mock_dask_cluster(),
producer_instance = mock_kafka_producer(),
)
# To independently tests the SparkListener functionality
# def test_SparkListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.RunSparkStreamer(
# get_kpi_id_list(), get_operation_list(), get_threshold_dict()
# )
# LOGGER.debug(str(response))
def test_dask_streamer_initialization(dask_streamer):
"""Test if the DaskStreamer initializes correctly."""
assert dask_streamer.key == "test_key"
assert dask_streamer.batch_size == get_batch_size()
assert dask_streamer.window_size is None
assert dask_streamer.consumer is not None
assert dask_streamer.producer is not None
assert dask_streamer.client is not None
def test_run_stops_on_no_consumer(dask_streamer):
"""Test if the run method exits when the consumer is not initialized."""
dask_streamer.consumer = None
with patch('time.sleep', return_value=None):
dask_streamer.run()
assert not dask_streamer.running
def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client):
"""Test task handler selection with a valid handler."""
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client):
dask_streamer.task_handler_selector()
assert dask_streamer.client.status == 'running'
def test_task_handler_selector_invalid_handler(dask_streamer):
"""Test task handler selection with an invalid handler."""
with patch('analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler', return_value=False):
dask_streamer.task_handler_selector()
assert dask_streamer.batch == []
def test_produce_result(dask_streamer):
"""Test if produce_result sends records to Kafka."""
result = [{"kpi_id": "kpi1", "value": 100}]
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \
patch.object(dask_streamer.producer, 'produce') as mock_produce:
dask_streamer.produce_result(result, "test_topic")
mock_produce.assert_called_once_with(
"test_topic",
key="kpi1",
value=json.dumps({"kpi_id": "kpi1", "value": 100}),
callback=mock_delivery_report
)
def test_stop(dask_streamer):
"""Test the cleanup method."""
with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \
patch.object(dask_streamer.client, 'close') as mock_client_close, \
patch('time.sleep', return_value=0):
# Mock the conditions required for the close calls
dask_streamer.client.status = 'running'
dask_streamer.stop()
mock_consumer_close.assert_called_once()
mock_client_close.assert_called_once()
def test_run_with_valid_consumer(dask_streamer):
"""Test the run method with a valid Kafka consumer."""
with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \
patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
# Simulate valid messages without errors
mock_message_1 = MagicMock()
mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}'
mock_message_1.error.return_value = None # No error
mock_message_2 = MagicMock()
mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}'
mock_message_2.error.return_value = None # No error
# Mock `poll` to return valid messages
mock_poll.side_effect = [mock_message_1, mock_message_2]
# Run the `run` method in a limited loop
with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays
dask_streamer.running = True
dask_streamer.batch_size = 2
# Limit the loop by breaking it after one full processing cycle
def stop_running_after_task_handler():
logger.info("Stopping the streamer after processing the first batch.")
dask_streamer.running = False
mock_task_handler_selector.side_effect = stop_running_after_task_handler
dask_streamer.run()
assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing
mock_task_handler_selector.assert_called_once() # Task handler should be called once
mock_poll.assert_any_call(timeout=2.0) # Poll should have been called at least once
# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
def test_aggregation_handler():
# Create a sample batch
batch = get_batch()
input_kpi_list = get_input_kpi_list()
output_kpi_list = get_output_kpi_list()
thresholds = get_thresholds()
# Test aggregation_handler
aggregated_df = aggregation_handler(
"test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
)
assert isinstance(aggregated_df, list)
assert all(isinstance(item, dict) for item in aggregated_df)
# # Test threshold_handler
def test_threshold_handler():
# Create a sample aggregated DataFrame
agg_df = get_agg_df()
thresholds = get_thresholds()
# Test threshold_handler
result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
assert isinstance(result, pd.DataFrame)
assert result.shape == (1, 7)
###########################
# integration test of Streamer with backend service (Shouldn't be run in the CI/CD pipeline)
###########################
# This is a local machine test to check the integration of the backend service with the Streamer
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
# def test_validate_kafka_topics():
# logger.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
# def test_backend_integration_with_analyzer():
# backendServiceObject = AnalyticsBackendService()
# backendServiceObject.install_servicers()
# logger.info(" waiting for 2 minutes for the backend service before termination ... ")
# time.sleep(150)
# logger.info(" Initiating stop collector ... ")
# status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
# backendServiceObject.close()
# assert isinstance(status, bool)
# assert status == True
# logger.info(" Backend service terminated successfully ... ")