Commit 38a64dc7 authored by Konstantinos Poulakakis

Merge branch 'feat/196-cttc-new-monitoring-module-testing-and-debugging' into feat/159-automation-component-skeleton
parents 1ea05ae7 5f72599e
2 merge requests: !294 Release TeraFlowSDN 4.0, !238 Automation component skeleton
......@@ -18,6 +18,7 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
......
......@@ -18,6 +18,7 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
......
......@@ -19,8 +19,9 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
# KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
# export KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
export KFK_SERVER_ADDRESS='127.0.0.1:9092'
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \
kpi_value_api/tests/test_kpi_value_api.py
......@@ -53,15 +53,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Install Java (required for PySpark)
RUN apt-get update && \
apt-get install -y default-jdk && \
apt-get clean
# Set JAVA_HOME environment variable
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
ENV PATH=$JAVA_HOME/bin:$PATH
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/analytics/backend
WORKDIR /var/teraflow/analytics/backend
......
......@@ -12,5 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
pyspark==3.5.2
dask==2024.1.0
distributed==2024.1.0
pandas==2.2.3
confluent-kafka==2.3.*
......@@ -12,18 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import json
import logging
import threading
from common.tools.service.GenericGrpcService import GenericGrpcService
from analytics.backend.service.SparkStreaming import SparkStreamer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from threading import Thread, Event
from .DaskStreaming import DaskStreamer
LOGGER = logging.getLogger(__name__)
......@@ -40,44 +40,6 @@ class AnalyticsBackendService(GenericGrpcService):
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
def StartSparkStreamer(self, analyzer_uuid, analyzer):
kpi_list = analyzer['input_kpis']
oper_list = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())] # TODO: update this line...
thresholds = analyzer['thresholds']
window_size = analyzer['window_size']
window_slider = analyzer['window_slider']
# print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
# kpi_list, oper_list, thresholds, window_size, window_slider))
# LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
# kpi_list, oper_list, thresholds, window_size, window_slider))
try:
stop_event = threading.Event()
thread = threading.Thread(target=SparkStreamer,
args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
window_size, window_slider, None ))
self.running_threads[analyzer_uuid] = (thread, stop_event)
thread.start()
print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False
def StopRequestListener(self, threadInfo: tuple):
try:
thread, stop_event = threadInfo
stop_event.set()
thread.join()
print ("Terminating Analytics backend RequestListener")
LOGGER.info("Terminating Analytics backend RequestListener")
return True
except Exception as e:
print ("Failed to terminate analytics backend {:}".format(e))
LOGGER.error("Failed to terminate analytics backend {:}".format(e))
return False
def install_servicers(self):
threading.Thread(target=self.RequestListener, args=()).start()
......@@ -97,7 +59,8 @@ class AnalyticsBackendService(GenericGrpcService):
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
print ("Consumer error: {:}".format(receive_msg.error()))
break
try:
analyzer = json.loads(receive_msg.value().decode('utf-8'))
......@@ -106,14 +69,44 @@ class AnalyticsBackendService(GenericGrpcService):
print ('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.TerminateAnalyzerBackend(analyzer_uuid)
self.StopDaskListener(analyzer_uuid)
else:
self.StartSparkStreamer(analyzer_uuid, analyzer)
self.StartDaskListener(analyzer_uuid, analyzer)
except Exception as e:
LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
def TerminateAnalyzerBackend(self, analyzer_uuid):
def StartDaskListener(self, analyzer_uuid, analyzer):
kpi_list = analyzer[ 'input_kpis' ]
thresholds = analyzer[ 'thresholds' ]
window_size = analyzer[ 'window_size' ]
window_slider = analyzer[ 'window_slider']
LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
kpi_list, thresholds, window_size, window_slider))
print ("Received parameters: {:} - {:} - {:} - {:}".format(
kpi_list, thresholds, window_size, window_slider))
try:
stop_event = Event()
thread = Thread(
target=DaskStreamer,
# args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
args=(analyzer_uuid, kpi_list, thresholds, stop_event),
kwargs={
"window_size" : window_size,
}
)
thread.start()
self.running_threads[analyzer_uuid] = (thread, stop_event)
print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False
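# Illustrative payload only (not part of the patch; values mirror the frontend
# test messages). StartDaskListener expects an analyzer dict such as:
# {
#     "input_kpis"    : ["6e22f180-ba28-4641-b190-2287bf448888"],
#     "thresholds"    : {"mean_latency": (20, 30), "max_latency": (45, 50)},
#     "window_size"   : "30s",
#     "window_slider" : "5s",   # read and logged here, but not forwarded to DaskStreamer
# }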
def StopDaskListener(self, analyzer_uuid):
if analyzer_uuid in self.running_threads:
try:
thread, stop_event = self.running_threads[analyzer_uuid]
......@@ -128,5 +121,4 @@ class AnalyticsBackendService(GenericGrpcService):
return False
else:
print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
# generate confirmation towards frontend
LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
import pandas as pd
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
def SettingKafkaConsumerParams():
return {'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-backend',
'auto.offset.reset' : 'latest'}
def GetAggregationMappings(thresholds):
agg_dict = {}
for threshold_key in thresholds.keys():
parts = threshold_key.split('_', 1)
if len(parts) != 2:
LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
continue
aggregation, metric_name = parts
# Ensure that the aggregation function is valid in pandas
if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']:
LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
continue
agg_dict[threshold_key] = ('kpi_value', aggregation)
return agg_dict
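# Illustrative example (not part of the patch): for a hypothetical thresholds
# dict {'mean_latency': (20, 30), 'max_latency': (45, 50)} this helper returns
# {'mean_latency': ('kpi_value', 'mean'), 'max_latency': ('kpi_value', 'max')},
# i.e. named-aggregation tuples applied to the 'kpi_value' column.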
def ApplyThresholds(aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
on the aggregated DataFrame.
Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
Returns: pd.DataFrame: DataFrame with additional threshold columns.
"""
for threshold_key, threshold_values in thresholds.items():
if threshold_key not in aggregated_df.columns:
LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
continue
if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
else:
LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
return aggregated_df
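# Illustrative example (not part of the patch): with thresholds
# {'mean_latency': (20, 30)} and an aggregated row where mean_latency == 35,
# the added columns evaluate to
#   mean_latency_THRESHOLD_FALL  = (35 < 20) -> False
#   mean_latency_THRESHOLD_RAISE = (35 > 30) -> True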
def initialize_dask_client():
"""
Initialize a local Dask cluster and client.
"""
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
client = Client(cluster)
LOGGER.info(f"Dask Client Initialized: {client}")
return client, cluster
def initialize_kafka_producer():
return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
def delivery_report(err, msg):
if err is not None:
LOGGER.error(f"Message delivery failed: {err}")
else:
LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
def process_batch(batch, agg_mappings, thresholds):
"""
Process a batch of data and apply thresholds.
Args: batch (list of dict): List of messages from Kafka.
agg_mappings (dict): Mapping from threshold key to aggregation function.
thresholds (dict): Thresholds dictionary.
Returns: list of dict: Processed records ready to be sent to Kafka.
"""
if not batch:
LOGGER.info("Empty batch received. Skipping processing.")
return []
df = pd.DataFrame(batch)
df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
df.dropna(subset=['time_stamp'], inplace=True)
required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
if not required_columns.issubset(df.columns):
LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
return []
if df.empty:
LOGGER.info("No data after filtering by KPI IDs. Skipping processing.")
return []
# Perform aggregations using named aggregation
try:
agg_dict = {key: value for key, value in agg_mappings.items()}
df_agg = df.groupby(['window_start', 'kpi_id']).agg(**agg_dict).reset_index()
except Exception as e:
LOGGER.error(f"Aggregation error: {e}")
return []
# Apply thresholds
df_thresholded = ApplyThresholds(df_agg, thresholds)
df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
# Convert aggregated DataFrame to list of dicts
result = df_thresholded.to_dict(orient='records')
LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
return result
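# Illustrative input (not part of the patch): each batch element is a decoded
# Kafka message dict such as
#   {'time_stamp'  : '2024-01-01T00:00:05Z',
#    'kpi_id'      : '6e22f180-ba28-4641-b190-2287bf448888',
#    'kpi_value'   : 12.3,
#    'window_start': <pandas.Timestamp floored to the window>}
# where 'window_start' is attached by DaskStreamer before the batch is submitted.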
def produce_result(result, producer, destination_topic):
for record in result:
try:
producer.produce(
destination_topic,
key=str(record.get('kpi_id', '')),
value=json.dumps(record),
callback=delivery_report
)
except KafkaException as e:
LOGGER.error(f"Failed to produce message: {e}")
producer.flush()
LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def DaskStreamer(key, kpi_list, thresholds, stop_event,
window_size="30s", time_stamp_col="time_stamp"):
client, cluster = initialize_dask_client()
consumer_conf = SettingKafkaConsumerParams()
consumer = Consumer(consumer_conf)
consumer.subscribe([KafkaTopic.VALUE.value])
producer = initialize_kafka_producer()
# Parse window_size to seconds
try:
window_size_td = pd.to_timedelta(window_size)
window_size_seconds = window_size_td.total_seconds()
except Exception as e:
LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
window_size_seconds = 30
LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")
# Extract aggregation mappings from thresholds
agg_mappings = GetAggregationMappings(thresholds)
if not agg_mappings:
LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
consumer.close()
producer.flush()
client.close()
cluster.close()
return
try:
batch = []
last_batch_time = time.time()
LOGGER.info("Starting to consume messages...")
while not stop_event.is_set():
msg = consumer.poll(1.0)
if msg is None:
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
LOGGER.error(f"Kafka error: {msg.error()}")
continue
try:
message_value = json.loads(msg.value().decode('utf-8'))
except json.JSONDecodeError as e:
LOGGER.error(f"JSON decode error: {e}")
continue
try:
message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
if pd.isna(message_timestamp):
LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
continue
window_start = message_timestamp.floor(window_size)
message_value['window_start'] = window_start
except Exception as e:
LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
continue
if message_value['kpi_id'] not in kpi_list:
LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.")
continue
batch.append(message_value)
current_time = time.time()
if (current_time - last_batch_time) >= window_size_seconds and batch:
LOGGER.info("Time-based batch threshold reached. Processing batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
batch = []
last_batch_time = current_time
except Exception as e:
LOGGER.exception(f"Error in Dask streaming process: {e}")
finally:
# Process any remaining messages in the batch
if batch:
LOGGER.info("Processing remaining messages in the batch.")
future = client.submit(process_batch, batch, agg_mappings, thresholds)
future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
consumer.close()
producer.flush()
LOGGER.info("Kafka consumer and producer closed.")
client.close()
cluster.close()
LOGGER.info("Dask client and cluster closed.")
......@@ -16,7 +16,7 @@ import uuid
import json
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
Analyzer )
Analyzer, AnalyzerId )
def get_kpi_id_list():
return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
......@@ -38,6 +38,13 @@ def get_threshold_dict():
op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
}
def create_analyzer_id():
_create_analyzer_id = AnalyzerId()
# _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
return _create_analyzer_id
def create_analyzer():
_create_analyzer = Analyzer()
......@@ -70,4 +77,38 @@ def create_analyzer():
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. Not implemented yet
return _create_analyzer
\ No newline at end of file
return _create_analyzer
def create_analyzer_dask():
_create_analyzer = Analyzer()
_create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.algorithm_name = "Test_Aggregate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id)
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),
'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()])
_create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "5s" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. Not implemented yet
return _create_analyzer
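# Note (illustrative, not part of the patch): DaskStreamer parses 'window_size'
# with pandas.to_timedelta, so the short "10s" form above resolves to 10.0
# seconds; longer spellings such as "10 seconds" are parsed the same way.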
......@@ -19,7 +19,9 @@ import threading
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
from .messages import create_analyzer
from .messages import create_analyzer, create_analyzer_dask
from threading import Thread, Event
from ..service.DaskStreaming import DaskStreamer
LOGGER = logging.getLogger(__name__)
......@@ -29,35 +31,108 @@ LOGGER = logging.getLogger(__name__)
###########################
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# def test_validate_kafka_topics():
# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
# --- To test Dask Streamer functionality ---
# def test_StartDaskStreamer(): # Directly from the Streamer class
# LOGGER.debug(" >>> test_StartDaskStreamer: START <<< ")
# stop_event = Event()
# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
# oper_list = ['avg', 'min', 'max',]
# thresholds = {
# 'avg_value': (10.0, 90.0),
# 'min_value': (5.0, 95.0),
# 'max_value': (15.0, 85.0),
# 'latency' : (2.0, 10.0)
# }
# # Start the DaskStreamer in a separate thread
# streamer_thread = Thread(
# target=DaskStreamer,
# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
# kwargs={
# "window_size": "60s",
# "win_slide_duration": "30s",
# "time_stamp_col": "time_stamp"
# }
# )
# streamer_thread.start()
# try:
# while True:
# time.sleep(10)
# except KeyboardInterrupt:
# LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
# stop_event.set()
# streamer_thread.join()
# LOGGER.info("Streamer stopped gracefully.")
# --- To test Start Streamer functionality ---
def test_StartSparkStreamer():
LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
analyzer_obj = create_analyzer()
analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
analyzer_to_generate : Dict = {
"algo_name" : analyzer_obj.algorithm_name,
"input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
"output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
"oper_mode" : analyzer_obj.operation_mode,
"thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
"window_size" : analyzer_obj.parameters["window_size"],
"window_slider" : analyzer_obj.parameters["window_slider"],
# "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
}
AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
assert isinstance(response, bool)
# def test_StartDaskStreamer():
# LOGGER.debug(" >>> test_StartDaskStreamer: START <<< ")
# analyzer_obj = create_analyzer_dask()
# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]),
# # "oper_list" : analyzer_obj.parameters["oper_list"],
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# time.sleep(100)
# LOGGER.info('Initiating StopRequestListener...')
# # AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# --- To TEST StartRequestListenerFunctionality
# def test_StartRequestListener():
# LOGGER.info('test_RunRequestListener')
# --- To test Start Streamer functionality ---
# def test_StartSparkStreamer():
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
# analyzer_obj = create_analyzer()
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# threading.Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()).start()
# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# --- To TEST StartRequestListenerFunctionality
def test_StartRequestListener():
LOGGER.info('test_RunRequestListener')
AnalyticsBackendServiceObj = AnalyticsBackendService()
AnalyticsBackendServiceObj.stop_event = Event()
listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
listener_thread.start()
time.sleep(100)
# AnalyticsBackendServiceObj.stop_event.set()
# LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
# listener_thread.join(timeout=10)
# assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
LOGGER.info('Completed test_RunRequestListener')
# To test START and STOP communication together
# def test_StopRequestListener():
......
......@@ -49,11 +49,12 @@ def create_analyzer():
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)}
'mean_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'std_value':(00, 10)
}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "5s" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. Not implemented yet
return _create_analyzer
......
......@@ -46,7 +46,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'group.id' : 'kpi-value-api-frontend',
'auto.offset.reset' : 'latest'})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
......@@ -152,9 +152,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid):
response.start_timestamp.timestamp = datetime.strptime(
value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
response.end_timestamp.timestamp = datetime.strptime(
value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
value["window_start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
response.kpi_id.kpi_id.uuid = value['kpi_id']
for key, threshold in value.items():
if key not in ['kpi_id', 'window']:
......@@ -183,10 +181,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
except Exception as e:
LOGGER.warning("Listener stopped. Error: {:}".format(e))
finally:
self.StopListener()
self.scheduler.shutdown()
def response_listener(self, filter_key=None):
"""
......@@ -196,23 +194,24 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
consumer = self.kafka_consumer
consumer.subscribe([self.listener_topic])
msg = consumer.poll(2.0)
if msg is None:
return
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
return
try:
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
break
try:
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
def delivery_callback(self, err, msg):
if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
......