Commit 8c1d7155 authored by Waleed Akbar
Changes in Analytics backend

- Added AnalyzerHelper class
- Added AnalyzerHandler class
- Added updated Streamer class
- Updated test scripts for analytics backend
- Updated run script format
parent 7dbbfc03
Merge requests: !359 Release TeraFlowSDN 5.0, !317 Resolve "(CTTC) Analytics Module Enhancements"
@@ -18,8 +18,11 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
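# Note: the exports above assume a local Kafka broker at 127.0.0.1:9092 and a
# CockroachDB "cockroachdb-public" service reachable via kubectl in the "crdb"
# namespace; adjust KFK_SERVER_ADDRESS and CRDB_URI if your setup differs.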
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
analytics/backend/tests/test_backend.py
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from enum import Enum
import pandas as pd
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
class AnalyzerHandlers(Enum):
AGGREGATION_HANDLER = "AggregationHandler"
UNSUPPORTED_HANDLER = "UnsupportedHandler"
@classmethod
def is_valid_handler(cls, handler_name):
return handler_name in cls._value2member_map_
# This function is kept at module level (not inside a class) to avoid serialization issues when it is submitted to Dask workers.
def threshold_handler(key, aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
on the aggregated DataFrame.
Args:
key (str): Key for the aggregated DataFrame.
aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
thresholds (dict): Thresholds dictionary with keys in the format '<metricName>' and values as (fail_th, raise_th).
Returns:
pd.DataFrame: DataFrame with additional threshold columns.
"""
for metric_name, threshold_values in thresholds.items():
# Ensure the metric column exists in the DataFrame
if metric_name not in aggregated_df.columns:
logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.")
continue
# Ensure the threshold values are valid (check for tuple specifically)
if isinstance(threshold_values, tuple) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
# Add threshold columns with updated naming
aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
aggregated_df[f"{metric_name}_TH_FALL"] = aggregated_df[metric_name] < fail_th
else:
logger.warning(f"Threshold values for '{metric_name}' are not a tuple of length 2. Skipping threshold application.")
return aggregated_df
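# Illustrative usage (a sketch with hypothetical values, not executed by the module):
#   sample_df = pd.DataFrame({'kpi_id': ['kpi_1'], 'avg': [55.0]})
#   threshold_handler('kpi_1', sample_df, {'avg': (50, 70)})
#   # adds boolean columns 'avg_TH_RAISE' (55 > 70 -> False) and 'avg_TH_FALL' (55 < 50 -> False)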
def aggregation_handler(
batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
):
"""
Process a batch of data and calculate aggregated values for each input KPI
and maps them to the output KPIs. """
logger.info(f"({batch_type_name}) Processing batch for key: {key}")
if not batch:
logger.info("Empty batch received. Skipping processing.")
return []
else:
logger.info(f"Processing {len(batch)} records for key: {key}")
# Convert data into a DataFrame
df = pd.DataFrame(batch)
# Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only)
df = df[df['kpi_id'].isin(input_kpi_list)].copy()
# Define all possible aggregation methods
aggregation_methods = {
"min" : ('kpi_value', 'min'),
"max" : ('kpi_value', 'max'),
"avg" : ('kpi_value', 'mean'),
"first" : ('kpi_value', lambda x: x.iloc[0]),
"last" : ('kpi_value', lambda x: x.iloc[-1]),
"variance": ('kpi_value', 'var'),
"count" : ('kpi_value', 'count'),
"range" : ('kpi_value', lambda x: x.max() - x.min()),
"sum" : ('kpi_value', 'sum'),
}
# Process each KPI-specific task parameter
for kpi_index, kpi_id in enumerate(input_kpi_list):
# logger.info(f"1.Processing KPI: {kpi_id}")
kpi_task_parameters = thresholds["task_parameter"][kpi_index]
# Get valid task parameters for this KPI
valid_task_parameters = [
method for method in kpi_task_parameters.keys()
if method in aggregation_methods
]
# Select the aggregation methods based on valid task parameters
selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
# logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}")
kpi_df = df[df['kpi_id'] == kpi_id]
# Check if kpi_df is not empty before applying the aggregation methods
if not kpi_df.empty:
agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index()
# logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}")
agg_df['kpi_id'] = output_kpi_list[kpi_index]
result = threshold_handler(key, agg_df, kpi_task_parameters)
return result.to_dict(orient='records')
else:
logger.debug(f"No data available for KPI: {kpi_id}. Skipping aggregation.")
continue
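# Illustrative usage (a sketch with hypothetical KPI ids; thresholds follow the
# {"task_type": ..., "task_parameter": [...]} structure expected above):
#   batch = [{'time_stamp': '2025-01-13T08:44:10Z', 'kpi_id': 'in_kpi', 'kpi_value': 46.72}]
#   thresholds = {'task_type': AnalyzerHandlers.AGGREGATION_HANDLER.value,
#                 'task_parameter': [{'avg': (40, 80)}]}
#   aggregation_handler('batch size', 'key', batch, ['in_kpi'], ['out_kpi'], thresholds)
#   # -> [{'kpi_id': 'out_kpi', 'avg': 46.72, 'avg_TH_RAISE': False, 'avg_TH_FALL': False}]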
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
class AnalyzerHelper:
def __init__(self):
pass
@staticmethod
def initialize_dask_client(n_workers=1, threads_per_worker=1):
"""Initialize a local Dask cluster and client."""
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
client = Client(cluster)
logger.info(f"Dask Client Initialized: {client}")
return client, cluster
@staticmethod
def initialize_kafka_consumer():
"""Initialize the Kafka consumer."""
consumer_conf = {
'bootstrap.servers': KafkaConfig.get_kafka_address(),
'group.id': 'analytics-backend',
'auto.offset.reset': 'latest'
}
consumer = Consumer(consumer_conf)
consumer.subscribe([KafkaTopic.VALUE.value])
return consumer
@staticmethod
def initialize_kafka_producer():
"""Initialize the Kafka producer."""
return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
@staticmethod
def delivery_report(err, msg):
if err is not None:
logger.error(f"Message delivery failed: {err}")
else:
logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
import pandas as pd
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
def SettingKafkaConsumerParams():
return {'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-backend',
'auto.offset.reset' : 'latest'}
def GetAggregationMappings(thresholds):
agg_dict = {}
for threshold_key in thresholds.keys():
parts = threshold_key.split('_', 1)
if len(parts) != 2:
LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
continue
aggregation, metric_name = parts
# Ensure that the aggregation function is valid in pandas
if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']:
LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
continue
agg_dict[threshold_key] = ('kpi_value', aggregation)
return agg_dict
def ApplyThresholds(aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
on the aggregated DataFrame.
Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
Returns: pd.DataFrame: DataFrame with additional threshold columns.
"""
for threshold_key, threshold_values in thresholds.items():
if threshold_key not in aggregated_df.columns:
LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
continue
if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
aggregated_df["value"] = aggregated_df[threshold_key]
else:
LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
return aggregated_df
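# Illustrative example (hypothetical '<aggregation>_<metricName>' threshold key):
#   thresholds = {'mean_latency': (10.0, 90.0)}
#   GetAggregationMappings(thresholds)   # -> {'mean_latency': ('kpi_value', 'mean')}
#   sample_df = pd.DataFrame({'mean_latency': [42.0]})
#   ApplyThresholds(sample_df, thresholds)
#   # adds THRESHOLD_FALL=False, THRESHOLD_RAISE=False and value=42.0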
def initialize_dask_client():
"""
Initialize a local Dask cluster and client.
"""
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
client = Client(cluster)
LOGGER.info(f"Dask Client Initialized: {client}")
return client, cluster
def initialize_kafka_producer():
return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
def delivery_report(err, msg):
if err is not None:
LOGGER.error(f"Message delivery failed: {err}")
else:
LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
def process_batch(batch, agg_mappings, thresholds, key):
"""
Process a batch of data and apply thresholds.
Args: batch (list of dict): List of messages from Kafka.
agg_mappings (dict): Mapping from threshold key to aggregation function.
thresholds (dict): Thresholds dictionary.
key (str): Identifier (KPI id) attached to the processed records.
Returns: list of dict: Processed records ready to be sent to Kafka.
"""
if not batch:
LOGGER.info("Empty batch received. Skipping processing.")
return []
df = pd.DataFrame(batch)
LOGGER.info(f"df {df} ")
df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
df.dropna(subset=['time_stamp'], inplace=True)
LOGGER.info(f"df {df} ")
required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
if not required_columns.issubset(df.columns):
LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
return []
if df.empty:
LOGGER.info("No data after filtering by KPI IDs. Skipping processing.")
return []
# Perform aggregations using named aggregation
try:
agg_dict = {key: value for key, value in agg_mappings.items()}
df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()
# Example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min')}
# Assumes the thresholds dictionary carries a single aggregation; take its function name.
second_value_tuple = next(iter(agg_dict.values()))[1]
# For multiple thresholds: second_values_tuples = [value[1] for value in agg_dict.values()]
if second_value_tuple=="min":
df_agg = df_agg_.min(numeric_only=True).to_frame().T
elif second_value_tuple == "max":
df_agg = df_agg_.max(numeric_only=True).to_frame().T
elif second_value_tuple == "std":
df_agg = df_agg_.std(numeric_only=True).to_frame().T
else:
df_agg = df_agg_.mean(numeric_only=True).to_frame().T
# Assign the first value of window_start from the original aggregated data
df_agg['window_start'] = df_agg_['window_start'].iloc[0]
# Reorder columns to place 'window_start' first if needed
cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start']
df_agg = df_agg[cols]
except Exception as e:
LOGGER.error(f"Aggregation error: {e}")
return []
# Apply thresholds
df_thresholded = ApplyThresholds(df_agg, thresholds)
df_thresholded['kpi_id'] = key
df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
# Convert aggregated DataFrame to list of dicts
result = df_thresholded.to_dict(orient='records')
LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
return result
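# Illustrative input (a sketch; each message already carries the 'window_start'
# assigned by the streamer before batching):
#   batch = [{'time_stamp': '2025-01-13T08:44:10Z', 'kpi_id': 'kpi_1', 'kpi_value': 42.0,
#             'window_start': pd.Timestamp('2025-01-13T08:44:00Z')}]
#   process_batch(batch, {'mean_latency': ('kpi_value', 'mean')}, {'mean_latency': (10.0, 90.0)}, 'kpi_1')
#   # -> one record with window_start, mean_latency, THRESHOLD_FALL/RAISE, value and kpi_id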
def produce_result(result, producer, destination_topic):
for record in result:
try:
producer.produce(
destination_topic,
key=str(record.get('kpi_id', '')),
value=json.dumps(record),
callback=delivery_report
)
except KafkaException as e:
LOGGER.error(f"Failed to produce message: {e}")
producer.flush()
LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def DaskStreamer(key, kpi_list, thresholds, stop_event,
window_size="30s", time_stamp_col="time_stamp"):
client, cluster = initialize_dask_client()
consumer_conf = SettingKafkaConsumerParams()
consumer = Consumer(consumer_conf)
consumer.subscribe([KafkaTopic.VALUE.value])
producer = initialize_kafka_producer()
# Parse window_size to seconds
try:
window_size_td = pd.to_timedelta(window_size)
window_size_seconds = window_size_td.total_seconds()
except Exception as e:
LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
window_size_seconds = 30
LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")
# Extract aggregation mappings from thresholds
agg_mappings = GetAggregationMappings(thresholds)
if not agg_mappings:
LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
consumer.close()
producer.flush()
client.close()
cluster.close()
return
try:
batch = []
last_batch_time = time.time()
LOGGER.info("Starting to consume messages...")
while not stop_event.is_set():
msg = consumer.poll(1.0)
if msg is None:
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
LOGGER.error(f"Kafka error: {msg.error()}")
continue
try:
message_value = json.loads(msg.value().decode('utf-8'))
except json.JSONDecodeError as e:
LOGGER.error(f"JSON decode error: {e}")
continue
try:
message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.")
if pd.isna(message_timestamp):
LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
continue
window_start = message_timestamp.floor(window_size)
LOGGER.warning(f"window_start: {window_start}. Skipping message.")
message_value['window_start'] = window_start
except Exception as e:
LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
continue
if message_value['kpi_id'] not in kpi_list:
LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.")
continue
batch.append(message_value)
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
except Exception as e:
LOGGER.exception(f"Error in Dask streaming process: {e}")
finally:
# Process any remaining messages in the batch
if batch:
LOGGER.info("Processing remaining messages in the batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
consumer.close()
producer.flush()
LOGGER.info("Kafka consumer and producer closed.")
client.close()
cluster.close()
LOGGER.info("Dask client and cluster closed.")
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
from confluent_kafka import KafkaException, KafkaError
# import pandas as pd
from common.tools.kafka.Variables import KafkaTopic
from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler
from .AnalyzerHelper import AnalyzerHelper
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
class DaskStreamer:
def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5,
window_size=None, n_workers=5, threads_per_worker=2):
self.key = key
self.input_kpis = input_kpis
self.output_kpis = output_kpis
self.thresholds = thresholds
self.window_size = window_size
self.batch_size = batch_size
self.n_workers = n_workers
self.threads_per_worker = threads_per_worker
self.running = True
self.batch = []
# Initialize Kafka and Dask components
self.client, self.cluster = AnalyzerHelper.initialize_dask_client(n_workers, threads_per_worker)
self.consumer = AnalyzerHelper.initialize_kafka_consumer()
self.producer = AnalyzerHelper.initialize_kafka_producer()
logger.info("Dask Streamer initialized.")
def run(self):
"""Main method to start the DaskStreamer."""
try:
logger.info("Starting Dask Streamer")
last_batch_time = time.time()
while True:
if not self.consumer:
logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.")
break
if not self.running:
logger.warning("Dask Streamer is not running. Exiting loop.")
break
message = self.consumer.poll(timeout=2.0)
if message is None:
# logger.info("No new messages received.")
continue
if message.error():
if message.error().code() == KafkaError._PARTITION_EOF:
logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
elif message.error():
raise KafkaException(message.error())
else:
try:
value = json.loads(message.value())
except json.JSONDecodeError:
logger.error(f"Failed to decode message: {message.value()}")
continue
self.batch.append(value)
# logger.info(f"Received message: {value}")
# Window size takes priority over batch size
if self.window_size is None:
if len(self.batch) >= self.batch_size: # No window size provided; process once the batch size is reached
logger.info(f"Processing based on batch size {self.batch_size}.")
self.task_handler_selector()
self.batch = []
else:
# Process based on window size
current_time = time.time()
if (current_time - last_batch_time) >= self.window_size and self.batch:
logger.info(f"Processing based on window size {self.window_size}.")
self.task_handler_selector()
self.batch = []
last_batch_time = current_time
except Exception as e:
logger.exception(f"Error in Dask streaming process: {e}")
finally:
logger.info(">>> Exiting Dask Streamer...")
self.cleanup()
logger.info(">>> Dask Streamer Cleanup Completed.")
def task_handler_selector(self):
"""Select the task handler based on the task type."""
if AnalyzerHandlers.is_valid_handler(self.thresholds["task_type"]):
if self.client.status == 'running':
future = self.client.submit(aggregation_handler, "batch size", self.key,
self.batch, self.input_kpis, self.output_kpis, self.thresholds)
future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
else:
logger.warning("Dask client is not running. Skipping processing.")
else:
logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.")
def produce_result(self, result, destination_topic):
"""Produce results to the Kafka topic."""
for record in result:
try:
self.producer.produce(
destination_topic,
key=str(record.get('kpi_id', '')),
value=json.dumps(record),
callback=AnalyzerHelper.delivery_report
)
except KafkaException as e:
logger.error(f"Failed to produce message: {e}")
self.producer.flush()
logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def cleanup(self):
"""Clean up Kafka and Dask resources."""
logger.info("Shutting down resources...")
self.running = False
if self.consumer:
try:
self.consumer.close()
logger.info("Kafka consumer closed.")
except Exception as e:
logger.error(f"Error closing Kafka consumer: {e}")
if self.producer:
try:
self.producer.flush()
logger.info("Kafka producer flushed and closed.")
except Exception as e:
logger.error(f"Error closing Kafka producer: {e}")
if self.client and hasattr(self.client, 'status') and self.client.status == 'running':
try:
self.client.close()
logger.info("Dask client closed.")
except Exception as e:
logger.error(f"Error closing Dask client: {e}")
if self.cluster and hasattr(self.cluster, 'close'):
try:
self.cluster.close(timeout=5)
logger.info("Dask cluster closed.")
except Exception as e:
logger.error(f"May be timeout. Error closing Dask cluster: {e}")
@@ -53,8 +53,8 @@ def create_analyzer():
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7"
_create_analyzer.input_kpi_ids.append(_kpi_id)
@@ -63,11 +63,14 @@ def create_analyzer():
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
from analytics.backend.service.AnalyzerHandlers import AnalyzerHandlers
def get_input_kpi_list():
return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
def get_output_kpi_list():
return ["1e22f180-ba28-4641-b190-2287bf441616", "6e22f180-ba28-4641-b190-2287bf181818", 'kpi_4']
def get_thresholds():
return {
"task_type": AnalyzerHandlers.AGGREGATION_HANDLER.value,
"task_parameter": [
{"last": (40, 80), "variance": (300, 500)},
{"count": (2, 4), "max": (70, 100)},
{"min": (10, 20), "avg": (50, 70)},
],
}
def get_duration():
return 30
def get_windows_size():
return None
def get_batch_size():
return 10
def get_interval():
return 5
def get_batch():
return [
{"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 46.72},
{"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 65.22},
{"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 54.24},
{"time_stamp": "2025-01-13T08:44:16Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 57.67},
{"time_stamp": "2025-01-13T08:44:18Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 38.6},
{"time_stamp": "2025-01-13T08:44:20Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 38.9},
{"time_stamp": "2025-01-13T08:44:22Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 52.44},
{"time_stamp": "2025-01-13T08:44:24Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 47.76},
{"time_stamp": "2025-01-13T08:44:26Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 33.71},
{"time_stamp": "2025-01-13T08:44:28Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 64.44},
]
def get_agg_df():
data = [
{"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616", "last": 47.76, "variance": 970.41},
]
return pd.DataFrame(data)
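# Note: get_agg_df() mirrors the aggregated output for the first input KPI, whose
# task parameters in get_thresholds() are {"last": (40, 80), "variance": (300, 500)};
# threshold_handler() is expected to append the four *_TH_RAISE / *_TH_FALL columns to it.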
@@ -12,148 +12,186 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time, json
from typing import Dict
import pytest
import logging
from threading import Event, Thread
import pandas as pd
from unittest.mock import MagicMock, patch
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
from .messages import create_analyzer, create_analyzer_dask
from analytics.backend.service.Streamer import DaskStreamer
from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
get_windows_size, get_batch_size, get_agg_df
from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
logger.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
###########################
# Tests Implementation of Analytics Backend
###########################
@pytest.fixture(autouse=True)
def log_all_methods(request):
'''
This fixture logs messages before and after each test function runs, indicating the start and end of the test.
The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
'''
logger.info(f" >>> Starting test: {request.node.name} >>> ")
yield
logger.info(f" <<< Finished test: {request.node.name} <<< ")
@pytest.fixture
def dask_streamer():
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client') as mock_dask_client, \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer') as mock_kafka_consumer, \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer') as mock_kafka_producer:
mock_dask_client.return_value = (MagicMock(), MagicMock())
mock_kafka_consumer.return_value = MagicMock()
mock_kafka_producer.return_value = MagicMock()
return DaskStreamer(
key="test_key",
input_kpis=get_input_kpi_list(),
output_kpis=get_output_kpi_list(),
thresholds=get_thresholds(),
batch_size=get_batch_size(),
window_size=get_windows_size(),
n_workers=3,
threads_per_worker=1
)
def test_initialization(dask_streamer):
"""Test if the DaskStreamer initializes correctly."""
assert dask_streamer.key == "test_key"
assert dask_streamer.batch_size == get_batch_size()
assert dask_streamer.window_size is None
assert dask_streamer.n_workers == 3
assert dask_streamer.consumer is not None
assert dask_streamer.producer is not None
assert dask_streamer.client is not None
assert dask_streamer.cluster is not None
def test_run_stops_on_no_consumer(dask_streamer):
"""Test if the run method exits when the consumer is not initialized."""
dask_streamer.consumer = None
with patch('time.sleep', return_value=None):
dask_streamer.run()
assert not dask_streamer.running
def test_task_handler_selector_valid_handler(dask_streamer):
"""Test task handler selection with a valid handler."""
with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=True), \
patch.object(dask_streamer.client, 'submit', return_value=MagicMock()) as mock_submit, \
patch.object(dask_streamer.client, 'status', 'running'):
dask_streamer.task_handler_selector()
mock_submit.assert_called_once()
def test_task_handler_selector_invalid_handler(dask_streamer):
"""Test task handler selection with an invalid handler."""
with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=False):
dask_streamer.task_handler_selector()
assert dask_streamer.batch == []
def test_produce_result(dask_streamer):
"""Test if produce_result sends records to Kafka."""
result = [{"kpi_id": "kpi1", "value": 100}]
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \
patch.object(dask_streamer.producer, 'produce') as mock_produce:
dask_streamer.produce_result(result, "test_topic")
mock_produce.assert_called_once_with(
"test_topic",
key="kpi1",
value='{"kpi_id": "kpi1", "value": 100}',
callback=mock_delivery_report
)
def test_cleanup(dask_streamer):
"""Test the cleanup method."""
with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \
patch.object(dask_streamer.producer, 'flush') as mock_producer_flush, \
patch.object(dask_streamer.client, 'close') as mock_client_close, \
patch.object(dask_streamer.cluster, 'close', MagicMock()) as mock_cluster_close:
# Mock the conditions required for the close calls
dask_streamer.client.status = 'running'
dask_streamer.cluster.close = MagicMock()
dask_streamer.cleanup()
mock_consumer_close.assert_called_once()
mock_producer_flush.assert_called_once()
mock_client_close.assert_called_once()
dask_streamer.cluster.close.assert_called_once()
def test_run_with_valid_consumer(dask_streamer):
"""Test the run method with a valid Kafka consumer."""
with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \
patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
# Simulate valid messages without errors
mock_message_1 = MagicMock()
mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}'
mock_message_1.error.return_value = None # No error
mock_message_2 = MagicMock()
mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}'
mock_message_2.error.return_value = None # No error
# Mock `poll` to return valid messages
mock_poll.side_effect = [mock_message_1, mock_message_2]
# Run the `run` method in a limited loop
with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays
dask_streamer.running = True # Ensure the streamer runs
dask_streamer.batch_size = 2 # Set a small batch size for the test
# Limit the loop by breaking it after one full processing cycle
def stop_running_after_task_handler(*args, **kwargs):
logger.info("Stopping the streamer after processing the first batch.")
dask_streamer.running = False
mock_task_handler_selector.side_effect = stop_running_after_task_handler
# Execute the method
dask_streamer.run()
# Assertions
assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing
mock_task_handler_selector.assert_called_once() # Task handler should be called once
mock_poll.assert_any_call(timeout=2.0) # Poll should have been called
# Tests for the aggregation_handler and threshold_handler functions from AnalyzerHandlers.py
def test_aggregation_handler():
# Create a sample batch
batch = get_batch()
input_kpi_list = get_input_kpi_list()
output_kpi_list = get_output_kpi_list()
thresholds = get_thresholds()
# Test aggregation_handler
aggregated_df = aggregation_handler(
"test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
)
assert isinstance(aggregated_df, list)
assert all(isinstance(item, dict) for item in aggregated_df)
# Test threshold_handler
def test_threshold_handler():
# Create a sample aggregated DataFrame
agg_df = get_agg_df()
thresholds = get_thresholds()
# Test threshold_handler
result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
assert isinstance(result, pd.DataFrame)
assert result.shape == (1, 7)
# --- To test Dask Streamer functionality ---
# def test_StartDaskStreamer(): # Directly from the Streamer class
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# stop_event = Event()
# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
# oper_list = ['avg', 'min', 'max',]
# thresholds = {
# 'avg_value': (10.0, 90.0),
# 'min_value': (5.0, 95.0),
# 'max_value': (15.0, 85.0),
# 'latency' : (2.0, 10.0)
# }
# # Start the DaskStreamer in a separate thread
# streamer_thread = Thread(
# target=DaskStreamer,
# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
# kwargs={
# "window_size": "60s",
# "win_slide_duration": "30s",
# "time_stamp_col": "time_stamp"
# }
# )
# streamer_thread.start()
# try:
# while True:
# time.sleep(10)
# except KeyboardInterrupt:
# LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
# stop_event.set()
# streamer_thread.join()
# LOGGER.info("Streamer stopped gracefully.")
# --- To test Start Streamer functionality ---
# def test_StartDaskStreamer():
# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ")
# analyzer_obj = create_analyzer_dask()
# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]),
# # "oper_list" : analyzer_obj.parameters["oper_list"],
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# time.sleep(100)
# LOGGER.info('Initiating StopRequestListener...')
# # AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# --- To test Start Streamer functionality ---
# def test_StartSparkStreamer():
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# analyzer_obj = create_analyzer()
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# --- To TEST StartRequestListenerFunctionality
# def test_StartRequestListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# AnalyticsBackendServiceObj.stop_event = Event()
# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
# listener_thread.start()
# time.sleep(100)
# AnalyticsBackendServiceObj.stop_event.set()
# LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
# listener_thread.join(timeout=10)
# assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
# LOGGER.info('Completed test_RunRequestListener')
# To test START and STOP communication together
# def test_StopRequestListener():
# LOGGER.info('test_RunRequestListener')
# LOGGER.info('Initiating StartRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
# # LOGGER.debug(str(response_thread))
# time.sleep(10)
# LOGGER.info('Initiating StopRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# To independently tests the SparkListener functionality
# def test_SparkListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.RunSparkStreamer(
# get_kpi_id_list(), get_operation_list(), get_threshold_dict()
# )
# LOGGER.debug(str(response))
# assert isinstance(response, bool)