Compare revisions

Changes are shown as if the source revision was being merged into the target revision (target project: tfs/controller).

Showing with 804 additions and 473 deletions
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
import pandas as pd
from dask.distributed import Client, LocalCluster
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
def SettingKafkaConsumerParams():
    return {'bootstrap.servers' : KafkaConfig.get_kafka_address(),
            'group.id'          : 'analytics-backend',
            'auto.offset.reset' : 'latest'}
def GetAggregationMappings(thresholds):
    agg_dict = {}
    for threshold_key in thresholds.keys():
        parts = threshold_key.split('_', 1)
        if len(parts) != 2:
            LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
            continue
        aggregation, metric_name = parts
        # Ensure that the aggregation function is valid in pandas
        if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']:
            LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
            continue
        agg_dict[threshold_key] = ('kpi_value', aggregation)
    return agg_dict
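
# Illustrative behaviour of GetAggregationMappings (a sketch added for clarity,
# not part of the original module): keys of the form '<aggregation>_<metricName>'
# become pandas named-aggregation tuples over the 'kpi_value' column, and keys
# with unsupported aggregations are dropped with a warning.
#
#   >>> GetAggregationMappings({'max_latency': (10, 90), 'p99_latency': (1, 2)})
#   {'max_latency': ('kpi_value', 'max')}    # 'p99' is unsupported and skipped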
def ApplyThresholds(aggregated_df, thresholds):
    """
    Apply thresholds (TH-Fall and TH-Raise) from the thresholds dictionary
    to the aggregated DataFrame.
    Args:
        aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
        thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
    Returns:
        pd.DataFrame: DataFrame with additional threshold columns.
    """
    for threshold_key, threshold_values in thresholds.items():
        if threshold_key not in aggregated_df.columns:
            LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
            continue
        if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
            fail_th, raise_th = threshold_values
            aggregated_df["THRESHOLD_FALL"]  = aggregated_df[threshold_key] < fail_th
            aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
            aggregated_df["value"]           = aggregated_df[threshold_key]
        else:
            LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
    return aggregated_df
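
# Illustrative sketch (assumed values, not part of the original module): with a
# threshold pair (fall, raise) = (10, 90), an aggregated value of 95 sets
# THRESHOLD_RAISE and clears THRESHOLD_FALL, and the value itself is copied into
# the 'value' column. Output shown approximately.
#
#   >>> df = pd.DataFrame([{'window_start': '2025-01-13T08:44:00Z', 'max_kpi_value': 95.0}])
#   >>> ApplyThresholds(df, {'max_kpi_value': (10, 90)})[['THRESHOLD_FALL', 'THRESHOLD_RAISE', 'value']]
#      THRESHOLD_FALL  THRESHOLD_RAISE  value
#   0           False             True   95.0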
def initialize_dask_client():
    """
    Initialize a local Dask cluster and client.
    """
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    LOGGER.info(f"Dask Client Initialized: {client}")
    return client, cluster

def initialize_kafka_producer():
    return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})

def delivery_report(err, msg):
    if err is not None:
        LOGGER.error(f"Message delivery failed: {err}")
    else:
        LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
def process_batch(batch, agg_mappings, thresholds, key):
    """
    Process a batch of data and apply thresholds.
    Args:
        batch (list of dict): List of messages from Kafka.
        agg_mappings (dict): Mapping from threshold key to aggregation function.
        thresholds (dict): Thresholds dictionary.
        key (str): KPI ID assigned to the produced records.
    Returns:
        list of dict: Processed records ready to be sent to Kafka.
    """
    if not batch:
        LOGGER.info("Empty batch received. Skipping processing.")
        return []
    df = pd.DataFrame(batch)
    LOGGER.info(f"Batch DataFrame:\n{df}")
    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
    df.dropna(subset=['time_stamp'], inplace=True)
    LOGGER.info(f"Batch DataFrame after timestamp cleaning:\n{df}")
    required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
    if not required_columns.issubset(df.columns):
        LOGGER.warning(f"Batch is missing required columns. Required columns: {required_columns}. Skipping batch.")
        return []
    if df.empty:
        LOGGER.info("No data after filtering by KPI IDs. Skipping processing.")
        return []
    # Perform aggregations using named aggregation
    try:
        agg_dict = {k: v for k, v in agg_mappings.items()}
        df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()
        # Example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min')}
        # Given that the threshold has a single value, pick the aggregation name from it.
        second_value_tuple = next(iter(agg_dict.values()))[1]
        # In case we have multiple thresholds:
        # second_values_tuples = [value[1] for value in agg_dict.values()]
        if second_value_tuple == "min":
            df_agg = df_agg_.min(numeric_only=True).to_frame().T
        elif second_value_tuple == "max":
            df_agg = df_agg_.max(numeric_only=True).to_frame().T
        elif second_value_tuple == "std":
            df_agg = df_agg_.std(numeric_only=True).to_frame().T
        else:
            df_agg = df_agg_.mean(numeric_only=True).to_frame().T
        # Assign the first value of window_start from the original aggregated data
        df_agg['window_start'] = df_agg_['window_start'].iloc[0]
        # Reorder columns to place 'window_start' first
        cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start']
        df_agg = df_agg[cols]
    except Exception as e:
        LOGGER.error(f"Aggregation error: {e}")
        return []
    # Apply thresholds
    df_thresholded = ApplyThresholds(df_agg, thresholds)
    df_thresholded['kpi_id'] = key
    df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
    # Convert the aggregated DataFrame to a list of dicts
    result = df_thresholded.to_dict(orient='records')
    LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
    return result
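
# Minimal sketch of calling process_batch directly (assumed inputs, added for
# illustration): each message already carries the 'window_start' that
# DaskStreamer assigns before batching; output shown approximately.
#
#   >>> batch = [
#   ...     {'time_stamp': '2025-01-13T08:44:10Z', 'kpi_id': 'kpi-1', 'kpi_value': 12.0,
#   ...      'window_start': pd.Timestamp('2025-01-13T08:44:00Z')},
#   ...     {'time_stamp': '2025-01-13T08:44:20Z', 'kpi_id': 'kpi-1', 'kpi_value': 18.0,
#   ...      'window_start': pd.Timestamp('2025-01-13T08:44:00Z')},
#   ... ]
#   >>> process_batch(batch, {'max_kpi_value': ('kpi_value', 'max')},
#   ...               {'max_kpi_value': (10, 90)}, key='kpi-1')
#   [{'window_start': '2025-01-13T08:44:00Z', 'max_kpi_value': 18.0, 'THRESHOLD_FALL': False,
#     'THRESHOLD_RAISE': False, 'value': 18.0, 'kpi_id': 'kpi-1'}]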
def produce_result(result, producer, destination_topic):
    for record in result:
        try:
            producer.produce(
                destination_topic,
                key=str(record.get('kpi_id', '')),
                value=json.dumps(record),
                callback=delivery_report
            )
        except KafkaException as e:
            LOGGER.error(f"Failed to produce message: {e}")
    producer.flush()
    LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
def DaskStreamer(key, kpi_list, thresholds, stop_event,
                 window_size="30s", time_stamp_col="time_stamp"):
    client, cluster = initialize_dask_client()
    consumer_conf = SettingKafkaConsumerParams()
    consumer = Consumer(consumer_conf)
    consumer.subscribe([KafkaTopic.VALUE.value])
    producer = initialize_kafka_producer()
    # Parse window_size to seconds
    try:
        window_size_td = pd.to_timedelta(window_size)
        window_size_seconds = window_size_td.total_seconds()
    except Exception as e:
        LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
        window_size_seconds = 30
    LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")
    # Extract aggregation mappings from thresholds
    agg_mappings = GetAggregationMappings(thresholds)
    if not agg_mappings:
        LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
        consumer.close()
        producer.flush()
        client.close()
        cluster.close()
        return
    try:
        batch = []
        last_batch_time = time.time()
        LOGGER.info("Starting to consume messages...")
        while not stop_event.is_set():
            msg = consumer.poll(1.0)
            if msg is None:
                current_time = time.time()
                if (current_time - last_batch_time) >= window_size_seconds and batch:
                    LOGGER.info("Time-based batch threshold reached. Processing batch.")
                    future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
                    future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
                    batch = []
                    last_batch_time = current_time
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
                else:
                    LOGGER.error(f"Kafka error: {msg.error()}")
                continue
            try:
                message_value = json.loads(msg.value().decode('utf-8'))
            except json.JSONDecodeError as e:
                LOGGER.error(f"JSON decode error: {e}")
                continue
            try:
                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
                LOGGER.warning(f"message_timestamp: {message_timestamp}")
                if pd.isna(message_timestamp):
                    LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
                    continue
                window_start = message_timestamp.floor(window_size)
                LOGGER.warning(f"window_start: {window_start}")
                message_value['window_start'] = window_start
            except Exception as e:
                LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
                continue
            if message_value['kpi_id'] not in kpi_list:
                LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.")
                continue
            batch.append(message_value)
            current_time = time.time()
            if (current_time - last_batch_time) >= window_size_seconds and batch:
                LOGGER.info("Time-based batch threshold reached. Processing batch.")
                future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
                future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
                batch = []
                last_batch_time = current_time
    except Exception as e:
        LOGGER.exception(f"Error in Dask streaming process: {e}")
    finally:
        # Process any remaining messages in the batch
        if batch:
            LOGGER.info("Processing remaining messages in the batch.")
            future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
            future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
        consumer.close()
        producer.flush()
        LOGGER.info("Kafka consumer and producer closed.")
        client.close()
        cluster.close()
        LOGGER.info("Dask client and cluster closed.")
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import json
import threading
import logging
from confluent_kafka import KafkaException, KafkaError
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler
from analytics.backend.service.AnalyzerHelper import AnalyzerHelper
logger = logging.getLogger(__name__)
class DaskStreamer(threading.Thread):
    def __init__(self, key, input_kpis, output_kpis, thresholds,
                 batch_size        = 5,
                 batch_duration    = None,
                 window_size       = None,
                 cluster_instance  = None,
                 producer_instance = AnalyzerHelper.initialize_kafka_producer()
                ):
        super().__init__()
        self.key            = key
        self.input_kpis     = input_kpis
        self.output_kpis    = output_kpis
        self.thresholds     = thresholds
        self.window_size    = window_size       # TODO: Not implemented
        self.batch_size     = batch_size
        self.batch_duration = batch_duration
        self.running        = True
        self.batch          = []

        # Initialize Kafka and Dask components
        self.client   = AnalyzerHelper.initialize_dask_client(cluster_instance)
        self.consumer = AnalyzerHelper.initialize_kafka_consumer()   # Single-threaded consumer
        self.producer = producer_instance

        logger.info("Dask Streamer initialized.")
    def run(self):
        """Main method to start the DaskStreamer."""
        try:
            logger.info("Starting Dask Streamer")
            last_batch_time = time.time()
            while True:
                if not self.consumer:
                    logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.")
                    break
                if not self.running:
                    logger.warning("Dask Streamer instance has been terminated. Exiting loop.")
                    break
                if not self.client:
                    logger.warning("Dask client is not running. Exiting loop.")
                    break

                message = self.consumer.poll(timeout=1.0)
                if message is None:
                    # logger.info("No new messages received.")
                    continue
                if message.error():
                    if message.error().code() == KafkaError._PARTITION_EOF:
                        logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
                    elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                        logger.error(f"Subscribed topic {message.topic()} does not exist; it may not have any messages yet.")
                        continue
                    elif message.error():
                        raise KafkaException(message.error())
                else:
                    try:
                        value = json.loads(message.value())
                    except json.JSONDecodeError:
                        logger.error(f"Failed to decode message: {message.value()}")
                        continue
                    self.batch.append(value)

                # Batch duration, when provided, takes precedence over batch size
                if self.batch_duration is None:
                    if len(self.batch) >= self.batch_size:  # If no batch duration is provided, fall back to the batch size
                        logger.info(f"Processing based on batch size {self.batch_size}.")
                        self.task_handler_selector()
                        self.batch = []
                else:
                    # Process based on batch duration
                    current_time = time.time()
                    if (current_time - last_batch_time) >= self.batch_duration and self.batch:
                        logger.info(f"Processing based on batch duration {self.batch_duration}.")
                        self.task_handler_selector()
                        self.batch = []
                        last_batch_time = current_time
        except Exception as e:
            logger.exception(f"Error in Dask streaming process: {e}")
        finally:
            self.stop()
            logger.info(">>> Exiting Dask Streamer...")
    def task_handler_selector(self):
        """Select the task handler based on the task type."""
        logger.info(f"Batch to be processed: {self.batch}")
        if Handlers.is_valid_handler(self.thresholds["task_type"]):
            if self.client is not None and self.client.status == 'running':
                try:
                    future = self.client.submit(aggregation_handler, "batch size", self.key,
                                                self.batch, self.input_kpis, self.output_kpis, self.thresholds)
                    future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
                except Exception as e:
                    logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
            else:
                logger.warning("Dask client is not running. Skipping processing.")
        else:
            logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.")
    def produce_result(self, result, destination_topic):
        """Produce results to the Kafka topic."""
        if not result:
            logger.warning("Nothing to produce. Skipping.")
            return
        for record in result:
            try:
                self.producer.produce(
                    destination_topic,
                    key=str(record.get('kpi_id', '')),
                    value=json.dumps(record),
                    callback=AnalyzerHelper.delivery_report
                )
            except KafkaException as e:
                logger.error(f"Failed to produce message: {e}")
        self.producer.flush()
        logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
    def stop(self):
        """Clean up Kafka and Dask thread resources."""
        if not self.running:
            logger.info("Dask Streamer is already stopped.")
            return
        self.running = False
        logger.info("Streamer running status is set to False. Waiting 5 seconds before stopping...")
        time.sleep(5)       # Waiting time for running tasks to complete
        if self.consumer:
            try:
                self.consumer.close()
                logger.info("Kafka consumer closed.")
            except Exception as e:
                logger.error(f"Error closing Kafka consumer: {e}")

        if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running':
            try:
                self.client.close()
                logger.info("Dask client closed.")
            except Exception as e:
                logger.error(f"Error closing Dask client: {e}")

# TODO: Maybe a single streamer for all analyzers...?
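
# A minimal usage sketch for the thread-based DaskStreamer above (assumed
# parameter values, not part of the original file): thresholds follow the
# {'task_type': ..., 'task_parameter': [...]} layout used by the test fixtures.
#
#   streamer = DaskStreamer(
#       key         = "analyzer-uuid",
#       input_kpis  = ["kpi-in-1", "kpi-in-2"],
#       output_kpis = ["kpi-out-1", "kpi-out-2"],
#       thresholds  = {"task_type": Handlers.AGGREGATION_HANDLER.value,
#                      "task_parameter": [{"max": [70, 100]}, {"avg": [50, 70]}]},
#       batch_size  = 5,
#   )
#   streamer.start()      # threading.Thread entry point; executes DaskStreamer.run()
#   ...
#   streamer.stop()       # sets running = False and releases Kafka/Dask resources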
...@@ -53,8 +53,8 @@ def create_analyzer(): ...@@ -53,8 +53,8 @@ def create_analyzer():
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze # input IDs to analyze
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7" _kpi_id.kpi_id.uuid = "5716c369-932b-4a02-b4c7-6a2e808b92d7"
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
...@@ -63,11 +63,14 @@ def create_analyzer(): ...@@ -63,11 +63,14 @@ def create_analyzer():
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis # output IDs after analysis
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id) _create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id) _create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter # parameter
_threshold_dict = { _threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
from analytics.backend.service.AnalyzerHandlers import Handlers
def get_input_kpi_list():
    return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']

def get_output_kpi_list():
    return ["1e22f180-ba28-4641-b190-2287bf441616", "6e22f180-ba28-4641-b190-2287bf181818", 'kpi_4']

def get_thresholds():
    return {
        "task_type": Handlers.AGGREGATION_HANDLER.value,
        "task_parameter": [
            {"last":  [40, 80], "variance": [300, 500]},
            {"count": [2, 4],   "max":      [70, 100]},
            {"min":   [10, 20], "avg":      [50, 70]},
        ],
    }

def get_duration():
    return 90

def get_batch_duration():
    return 30

def get_windows_size():
    return None

def get_batch_size():
    return 5

def get_interval():
    return 5

def get_batch():
    return [
        {"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 46.72},
        {"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 65.22},
        {"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 54.24},
        {"time_stamp": "2025-01-13T08:44:16Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 57.67},
        {"time_stamp": "2025-01-13T08:44:18Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 38.6},
        {"time_stamp": "2025-01-13T08:44:20Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 38.9},
        {"time_stamp": "2025-01-13T08:44:22Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 52.44},
        {"time_stamp": "2025-01-13T08:44:24Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 47.76},
        {"time_stamp": "2025-01-13T08:44:26Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 33.71},
        {"time_stamp": "2025-01-13T08:44:28Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 64.44},
    ]

def get_agg_df():
    data = [
        {"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616", "last": 47.76, "variance": 970.41},
    ]
    return pd.DataFrame(data)
...@@ -12,148 +12,307 @@ ...@@ -12,148 +12,307 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import time, json import time
from typing import Dict import json
import pytest
import logging import logging
from threading import Event, Thread import pandas as pd
from common.tools.kafka.Variables import KafkaTopic
from unittest.mock import MagicMock, patch
from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
get_windows_size, get_batch_size, get_agg_df, get_duration
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.Streamer import DaskStreamer
from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
from .messages import create_analyzer, create_analyzer_dask
from threading import Thread, Event
from ..service.DaskStreaming import DaskStreamer
LOGGER = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
# ----
# Test fixtures and helper functions
# ----
@pytest.fixture(autouse=True)
def log_all_methods(request):
'''
This fixture logs messages before and after each test function runs, indicating the start and end of the test.
The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
'''
logger.info(f" >>>>> Starting test: {request.node.name} ")
yield
logger.info(f" <<<<< Finished test: {request.node.name} ")
@pytest.fixture
def mock_kafka_producer():
mock_producer = MagicMock()
mock_producer.produce = MagicMock()
mock_producer.flush = MagicMock()
return mock_producer
@pytest.fixture
def mock_dask_cluster():
mock_cluster = MagicMock()
mock_cluster.close = MagicMock()
return mock_cluster
@pytest.fixture
def mock_dask_client():
mock_client = MagicMock()
mock_client.status = 'running'
mock_client.submit = MagicMock()
return mock_client
@pytest.fixture()
def mock_kafka_consumer():
mock_consumer = MagicMock()
mock_consumer.subscribe = MagicMock()
return mock_consumer
@pytest.fixture()
def mock_streamer_start():
mock_streamer = MagicMock()
mock_streamer.start = MagicMock()
return mock_streamer
########################### ###########################
# Tests Implementation of Telemetry Backend # functionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods
########################### ###########################
# --- "test_validate_kafka_topics" should be run before the functionality tests --- @pytest.fixture
def test_validate_kafka_topics(): def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start):
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
response = KafkaTopic.create_all_topics() patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \
assert isinstance(response, bool) patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer), \
patch('analytics.backend.service.Streamer.DaskStreamer.run', return_value = mock_streamer_start):
# --- To test Dask Streamer functionality ---
# def test_StartDaskStreamer(): # Directly from the Streamer class service = AnalyticsBackendService()
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") yield service
# stop_event = Event() service.close()
# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
# oper_list = ['avg', 'min', 'max',]
# thresholds = {
# 'avg_value': (10.0, 90.0),
# 'min_value': (5.0, 95.0),
# 'max_value': (15.0, 85.0),
# 'latency' : (2.0, 10.0)
# }
# # Start the DaskStreamer in a separate thread
# streamer_thread = Thread(
# target=DaskStreamer,
# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
# kwargs={
# "window_size": "60s",
# "win_slide_duration": "30s",
# "time_stamp_col": "time_stamp"
# }
# )
# streamer_thread.start()
# try:
# while True:
# time.sleep(10)
# except KeyboardInterrupt:
# LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
# stop_event.set()
# streamer_thread.join()
# LOGGER.info("Streamer stopped gracefully.")
# --- To test Start Streamer functionality ---
# def test_StartDaskStreamer():
# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ")
# analyzer_obj = create_analyzer_dask()
# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
# analyzer_to_generate : Dict = {
# "algo_name" : analyzer_obj.algorithm_name,
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
# "oper_mode" : analyzer_obj.operation_mode,
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]),
# # "oper_list" : analyzer_obj.parameters["oper_list"],
# "window_size" : analyzer_obj.parameters["window_size"],
# "window_slider" : analyzer_obj.parameters["window_slider"],
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# time.sleep(100)
# LOGGER.info('Initiating StopRequestListener...')
# # AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# --- To test Start Streamer functionality --- @pytest.fixture
# def test_StartSparkStreamer(): def analyzer_data():
# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") return {
# analyzer_obj = create_analyzer() 'algo_name' : 'test_algorithm',
# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid 'oper_mode' : 'test_mode',
# analyzer_to_generate : Dict = { 'input_kpis' : get_input_kpi_list(),
# "algo_name" : analyzer_obj.algorithm_name, 'output_kpis' : get_output_kpi_list(),
# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], 'thresholds' : get_thresholds(),
# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], 'duration' : get_duration(),
# "oper_mode" : analyzer_obj.operation_mode, 'batch_size_min' : get_batch_size(),
# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), 'window_size' : get_windows_size(),
# "window_size" : analyzer_obj.parameters["window_size"], 'batch_duration_min' : get_duration(),
# "window_slider" : analyzer_obj.parameters["window_slider"], }
# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
# }
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
# assert isinstance(response, bool)
# --- To TEST StartRequestListenerFunctionality def test_start_streamer(analytics_service, analyzer_data):
# def test_StartRequestListener(): analyzer_uuid = "test-analyzer-uuid"
# LOGGER.info('test_RunRequestListener') # Start streamer
# AnalyticsBackendServiceObj = AnalyticsBackendService() result = analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
# AnalyticsBackendServiceObj.stop_event = Event() assert result is True
# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) assert analyzer_uuid in analytics_service.active_streamers
# listener_thread.start()
def test_stop_streamer(analytics_service, analyzer_data):
# time.sleep(100) analyzer_uuid = "test-analyzer-uuid"
# AnalyticsBackendServiceObj.stop_event.set() # Start streamer for stopping it later
# LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
# listener_thread.join(timeout=10) assert analyzer_uuid in analytics_service.active_streamers
# assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
# LOGGER.info('Completed test_RunRequestListener') # Stop streamer
with patch('time.sleep', return_value=None):
# To test START and STOP communication together result = analytics_service.StopStreamer(analyzer_uuid)
# def test_StopRequestListener(): assert result is True
# LOGGER.info('test_RunRequestListener') assert analyzer_uuid not in analytics_service.active_streamers
# LOGGER.info('Initiating StartRequestListener...')
# AnalyticsBackendServiceObj = AnalyticsBackendService() # Verify that the streamer was stopped
# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) assert analyzer_uuid not in analytics_service.active_streamers
# # LOGGER.debug(str(response_thread))
# time.sleep(10) def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster):
# LOGGER.info('Initiating StopRequestListener...') analytics_service.close()
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) mock_kafka_producer.flush.assert_called_once()
# LOGGER.debug(str(response)) mock_dask_cluster.close.assert_called_once()
# assert isinstance(response, bool)
###########################
# functionality pytest with specific fixtures for streamer class sub-methods
###########################
@pytest.fixture
def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer):
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value = mock_dask_cluster ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client ), \
patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer):
return DaskStreamer(
key = "test_key",
input_kpis = get_input_kpi_list(),
output_kpis = get_output_kpi_list(),
thresholds = get_thresholds(),
batch_size = get_batch_size(),
window_size = get_windows_size(),
cluster_instance = mock_dask_cluster(),
producer_instance = mock_kafka_producer(),
)
def test_dask_streamer_initialization(dask_streamer):
"""Test if the DaskStreamer initializes correctly."""
assert dask_streamer.key == "test_key"
assert dask_streamer.batch_size == get_batch_size()
assert dask_streamer.window_size is None
assert dask_streamer.consumer is not None
assert dask_streamer.producer is not None
assert dask_streamer.client is not None
def test_run_stops_on_no_consumer(dask_streamer):
"""Test if the run method exits when the consumer is not initialized."""
dask_streamer.consumer = None
with patch('time.sleep', return_value=None):
dask_streamer.run()
# To independently tests the SparkListener functionality assert not dask_streamer.running
# def test_SparkListener():
# LOGGER.info('test_RunRequestListener') def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client):
# AnalyticsBackendServiceObj = AnalyticsBackendService() """Test task handler selection with a valid handler."""
# response = AnalyticsBackendServiceObj.RunSparkStreamer( with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client):
# get_kpi_id_list(), get_operation_list(), get_threshold_dict()
# ) dask_streamer.task_handler_selector()
# LOGGER.debug(str(response)) assert dask_streamer.client.status == 'running'
def test_task_handler_selector_invalid_handler(dask_streamer):
"""Test task handler selection with an invalid handler."""
with patch('analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler', return_value=False):
dask_streamer.task_handler_selector()
assert dask_streamer.batch == []
def test_produce_result(dask_streamer):
"""Test if produce_result sends records to Kafka."""
result = [{"kpi_id": "kpi1", "value": 100}]
with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \
patch.object(dask_streamer.producer, 'produce') as mock_produce:
dask_streamer.produce_result(result, "test_topic")
mock_produce.assert_called_once_with(
"test_topic",
key="kpi1",
value=json.dumps({"kpi_id": "kpi1", "value": 100}),
callback=mock_delivery_report
)
def test_stop(dask_streamer):
"""Test the cleanup method."""
with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \
patch.object(dask_streamer.client, 'close') as mock_client_close, \
patch('time.sleep', return_value=0):
# Mock the conditions required for the close calls
dask_streamer.client.status = 'running'
dask_streamer.stop()
mock_consumer_close.assert_called_once()
mock_client_close.assert_called_once()
def test_run_with_valid_consumer(dask_streamer):
"""Test the run method with a valid Kafka consumer."""
with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \
patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
# Simulate valid messages without errors
mock_message_1 = MagicMock()
mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}'
mock_message_1.error.return_value = None # No error
mock_message_2 = MagicMock()
mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}'
mock_message_2.error.return_value = None # No error
# Mock `poll` to return valid messages
mock_poll.side_effect = [mock_message_1, mock_message_2]
# Run the `run` method in a limited loop
with patch('time.sleep', return_value=None): # Mock `sleep` to avoid delays
dask_streamer.running = True
dask_streamer.batch_size = 2
# Limit the loop by breaking it after one full processing cycle
def stop_running_after_task_handler():
logger.info("Stopping the streamer after processing the first batch.")
dask_streamer.running = False
mock_task_handler_selector.side_effect = stop_running_after_task_handler
dask_streamer.run()
assert len(dask_streamer.batch) == 0 # Batch should be cleared after processing
mock_task_handler_selector.assert_called_once() # Task handler should be called once
mock_poll.assert_any_call(timeout=1.0) # Poll should have been called at least once
# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
def test_aggregation_handler():
# Create a sample batch
batch = get_batch()
input_kpi_list = get_input_kpi_list()
output_kpi_list = get_output_kpi_list()
thresholds = get_thresholds()
# Test aggregation_handler
aggregated_df = aggregation_handler(
"test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
)
assert isinstance(aggregated_df, list)
assert all(isinstance(item, dict) for item in aggregated_df)
# # Test threshold_handler
def test_threshold_handler():
# Create a sample aggregated DataFrame
agg_df = get_agg_df()
thresholds = get_thresholds()
# Test threshold_handler
result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
assert isinstance(result, pd.DataFrame)
assert result.shape == (1, 7)
###########################
# integration test of Streamer with backend service (Shouldn't be run in the CI/CD pipeline)
###########################
# This is a local machine test to check the integration of the backend service with the Streamer
# @pytest.fixture(scope='session')
# def analyticBackend_service():
# logger.info('Initializing AnalyticsBackendService...')
# _service = AnalyticsBackendService()
# _service.start()
# logger.info('Yielding AnalyticsBackendService...')
# yield _service
# logger.info('Terminating AnalyticsBackendService...')
# _service.stop()
# logger.info('Terminated AnalyticsBackendService...')
# # --- "test_validate_kafka_topics" should be run before the functionality tests ---
# def test_validate_kafka_topics():
# logger.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool) # assert isinstance(response, bool)
# def test_backend_integration_with_frontend(analyticBackend_service: AnalyticsBackendService):
# # backendServiceObject = AnalyticsBackendService()
# # backendServiceObject.install_servicers()
# logger.info(" waiting for 2 minutes for the backend service before termination ... ")
# time.sleep(300)
# logger.info(" Initiating stop collector ... ")
# status = analyticBackend_service.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
# analyticBackend_service.close()
# assert isinstance(status, bool)
# assert status == True
# logger.info(" Backend service terminated successfully ... ")
...@@ -46,7 +46,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -46,7 +46,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartAnalyzer(self, def StartAnalyzer(self,
request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore request : Analyzer, context: grpc.ServicerContext # type: ignore
) -> AnalyzerId: # type: ignore ) -> AnalyzerId: # type: ignore
LOGGER.info ("At Service gRPC message: {:}".format(request)) LOGGER.info ("At Service gRPC message: {:}".format(request))
response = AnalyzerId() response = AnalyzerId()
...@@ -65,14 +65,18 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -65,14 +65,18 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
""" """
analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
analyzer_to_generate : Dict = { analyzer_to_generate : Dict = {
"algo_name" : analyzer_obj.algorithm_name, "algo_name" : analyzer_obj.algorithm_name,
"input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
"output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
"oper_mode" : analyzer_obj.operation_mode, "oper_mode" : analyzer_obj.operation_mode,
"thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), "duration" : analyzer_obj.duration_s,
"window_size" : analyzer_obj.parameters["window_size"], "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
"window_slider" : analyzer_obj.parameters["window_slider"], "window_size" : analyzer_obj.parameters["window_size"], # slider window size in seconds (single batch execution time)
# "store_aggregate" : analyzer_obj.parameters["store_aggregate"] "window_slider" : analyzer_obj.parameters["window_slider"], # slider shift in seconds
"batch_size_min" : analyzer_obj.batch_min_size, # currently implemented
"batch_size_max" : analyzer_obj.batch_max_size,
"batch_duration_min" : analyzer_obj.batch_min_duration_s, # currently implemented
"batch_interval_max" : analyzer_obj.batch_max_duration_s
} }
self.kafka_producer.produce( self.kafka_producer.produce(
KafkaTopic.ANALYTICS_REQUEST.value, KafkaTopic.ANALYTICS_REQUEST.value,
...@@ -137,7 +141,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -137,7 +141,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopAnalyzer(self, def StopAnalyzer(self,
request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore request : AnalyzerId, context: grpc.ServicerContext # type: ignore
) -> Empty: # type: ignore ) -> Empty: # type: ignore
LOGGER.info ("At Service gRPC message: {:}".format(request)) LOGGER.info ("At Service gRPC message: {:}".format(request))
try: try:
...@@ -181,7 +185,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -181,7 +185,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectAnalyzers(self, def SelectAnalyzers(self,
filter : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore filter : AnalyzerFilter, context: grpc.ServicerContext # type: ignore
) -> AnalyzerList: # type: ignore ) -> AnalyzerList: # type: ignore
LOGGER.info("At Service gRPC message: {:}".format(filter)) LOGGER.info("At Service gRPC message: {:}".format(filter))
response = AnalyzerList() response = AnalyzerList()
...@@ -202,7 +206,5 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -202,7 +206,5 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def delivery_callback(self, err, msg): def delivery_callback(self, err, msg):
if err: if err:
LOGGER.debug('Message delivery failed: {:}'.format(err)) LOGGER.debug('Message delivery failed: {:}'.format(err))
# print ('Message delivery failed: {:}'.format(err))
else: else:
LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
...@@ -20,43 +20,77 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze ...@@ -20,43 +20,77 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
def create_analyzer_id(): def create_analyzer_id():
_create_analyzer_id = AnalyzerId() _create_analyzer_id = AnalyzerId()
# _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" # _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
return _create_analyzer_id return _create_analyzer_id
def create_analyzer(): def create_analyzer():
_create_analyzer = Analyzer() _create_analyzer = Analyzer()
# _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.algorithm_name = "Test_new_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze # input IDs to analyze
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id = KpiId()
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis # output IDs after analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id = KpiId()
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf181818"
_create_analyzer.output_kpi_ids.append(_kpi_id)
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf441616"
_create_analyzer.output_kpi_ids.append(_kpi_id) _create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id) _create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter # parameter
# _threshold_dict = {
# 'mean_value' :[20, 30], 'min_value' :[00, 10], 'max_value' :[45, 50],
# 'first_value' :[00, 10], 'last_value' :[40, 50], 'std_value' :[00, 10]
# }
_threshold_dict = { _threshold_dict = {
'mean_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), "task_type": Handlers.AGGREGATION_HANDLER.value,
'first_value' :(00, 10), 'last_value' :(40, 50), 'std_value':(00, 10) "task_parameter": [
} {"last": [40, 80], "variance": [300, 500]},
{"count": [2, 4], "max": [70, 100]},
{"min": [10, 20], "avg": [50, 70]},
],
}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" _create_analyzer.parameters['window_size'] = "0" # slider window size in seconds (Total time for aggregation processing)
_create_analyzer.parameters['window_slider'] = "5s" # should be less than window size _create_analyzer.parameters['window_slider'] = "0" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet
# duration of the analyzer
_create_analyzer.duration_s = 90
# batch window size
_create_analyzer.batch_min_duration_s = 20
_create_analyzer.batch_max_duration_s = 50
# batch size
_create_analyzer.batch_min_size = 5
_create_analyzer.batch_max_size = 10
return _create_analyzer return _create_analyzer
def create_analyzer_filter(): def create_analyzer_filter():
...@@ -84,3 +118,10 @@ def create_analyzer_filter(): ...@@ -84,3 +118,10 @@ def create_analyzer_filter():
# _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj) # _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj)
return _create_analyzer_filter return _create_analyzer_filter
# Added for testing to remove the dependency on the backend service
from enum import Enum
class Handlers(Enum):
AGGREGATION_HANDLER = "AggregationHandler"
UNSUPPORTED_HANDLER = "UnsupportedHandler"
...@@ -78,6 +78,15 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ...@@ -78,6 +78,15 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
LOGGER.info('Closed AnalyticsFrontendClient...') LOGGER.info('Closed AnalyticsFrontendClient...')
@pytest.fixture(autouse=True)
def log_all_methods(request):
'''
This fixture logs messages before and after each test function runs, indicating the start and end of the test.
The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
'''
LOGGER.info(f" >>>>> Starting test: {request.node.name} ")
yield
LOGGER.info(f" <<<<< Finished test: {request.node.name} ")
########################### ###########################
# Tests Implementation of Analytics Frontend # Tests Implementation of Analytics Frontend
...@@ -89,24 +98,17 @@ def test_validate_kafka_topics(): ...@@ -89,24 +98,17 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics() response = KafkaTopic.create_all_topics()
assert isinstance(response, bool) assert isinstance(response, bool)
# ----- core funtionality test -----
# def test_StartAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_StartAnalytic START: <<< ')
# response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerId)
# To test start and stop listener together # To test start and stop listener together
def test_StartAnalyzers(analyticsFrontend_client): def test_StartAnalyzers(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalyzers START: <<< ') LOGGER.info(' >>> test_StartAnalyzers START: <<< ')
added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(added_analyzer_id)) LOGGER.debug(str(added_analyzer_id))
LOGGER.info(' --> Calling StartResponseListener... ') # LOGGER.info(' --> Calling StartResponseListener... ')
class_obj = AnalyticsFrontendServiceServicerImpl() # class_obj = AnalyticsFrontendServiceServicerImpl()
response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid) # response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid)
LOGGER.debug(response) # LOGGER.debug(response)
LOGGER.info("waiting for timer to comlete ...") LOGGER.info("waiting for timer to complete ...")
time.sleep(3) time.sleep(15)
LOGGER.info('--> StopAnalyzer') LOGGER.info('--> StopAnalyzer')
response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id) response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
LOGGER.debug(str(response)) LOGGER.debug(str(response))
......
...@@ -38,6 +38,7 @@ class DeviceTypeEnum(Enum): ...@@ -38,6 +38,7 @@ class DeviceTypeEnum(Enum):
CLIENT = 'client' CLIENT = 'client'
DATACENTER = 'datacenter' DATACENTER = 'datacenter'
IP_SDN_CONTROLLER = 'ip-sdn-controller' IP_SDN_CONTROLLER = 'ip-sdn-controller'
NCE = 'nce'
MICROWAVE_RADIO_SYSTEM = 'microwave-radio-system' MICROWAVE_RADIO_SYSTEM = 'microwave-radio-system'
OPEN_LINE_SYSTEM = 'open-line-system' OPEN_LINE_SYSTEM = 'open-line-system'
OPTICAL_ROADM = 'optical-roadm' OPTICAL_ROADM = 'optical-roadm'
...@@ -52,3 +53,4 @@ class DeviceTypeEnum(Enum): ...@@ -52,3 +53,4 @@ class DeviceTypeEnum(Enum):
# ETSI TeraFlowSDN controller # ETSI TeraFlowSDN controller
TERAFLOWSDN_CONTROLLER = 'teraflowsdn' TERAFLOWSDN_CONTROLLER = 'teraflowsdn'
IETF_SLICE = 'ietf-slice'
...@@ -51,24 +51,32 @@ LOGGER = logging.getLogger(__name__) ...@@ -51,24 +51,32 @@ LOGGER = logging.getLogger(__name__)
def delay_linear(initial=0, increment=0, maximum=None): def delay_linear(initial=0, increment=0, maximum=None):
def compute(num_try): def compute(num_try):
delay = initial + (num_try - 1) * increment delay = initial + (num_try - 1) * increment
if maximum is not None: delay = max(delay, maximum) if maximum is not None:
delay = max(delay, maximum)
return delay return delay
return compute return compute
def delay_exponential(initial=1, increment=1, maximum=None): def delay_exponential(initial=1, increment=1, maximum=None):
def compute(num_try): def compute(num_try):
delay = initial * pow(increment, (num_try - 1)) delay = initial * pow(increment, (num_try - 1))
if maximum is not None: delay = max(delay, maximum) if maximum is not None:
delay = max(delay, maximum)
return delay return delay
return compute return compute
def retry(max_retries=0, delay_function=delay_linear(initial=0, increment=0), # pylint: disable=dangerous-default-value
prepare_method_name=None, prepare_method_args=[], prepare_method_kwargs={}): def retry(
max_retries=0, delay_function=delay_linear(initial=0, increment=0),
prepare_method_name=None, prepare_method_args=list(), prepare_method_kwargs=dict()
):
def _reconnect(func): def _reconnect(func):
def wrapper(self, *args, **kwargs): def wrapper(self, *args, **kwargs):
if prepare_method_name is not None: if prepare_method_name is not None:
prepare_method = getattr(self, prepare_method_name, None) prepare_method = getattr(self, prepare_method_name, None)
if prepare_method is None: raise Exception('Prepare Method ({}) not found'.format(prepare_method_name)) if prepare_method is None:
MSG = 'Prepare Method ({:s}) not found'
# pylint: disable=broad-exception-raised
raise Exception(MSG.format(prepare_method_name))
num_try, given_up = 0, False num_try, given_up = 0, False
while not given_up: while not given_up:
try: try:
...@@ -78,14 +86,29 @@ def retry(max_retries=0, delay_function=delay_linear(initial=0, increment=0), ...@@ -78,14 +86,29 @@ def retry(max_retries=0, delay_function=delay_linear(initial=0, increment=0),
num_try += 1 num_try += 1
given_up = num_try > max_retries given_up = num_try > max_retries
if given_up: raise Exception('Giving up... {:d} tries failed'.format(max_retries)) from e if given_up:
MSG = '[{:s}:{:s}] Giving up... {:d} tries failed'
msg = MSG.format(func.__module__, func.__name__, max_retries)
# pylint: disable=broad-exception-raised
raise Exception(msg) from e
if delay_function is not None: if delay_function is not None:
delay = delay_function(num_try) delay = delay_function(num_try)
time.sleep(delay) time.sleep(delay)
LOGGER.info('Retry {:d}/{:d} after {:f} seconds...'.format(num_try, max_retries, delay)) MSG = '[{:s}:{:s}] Retry {:d}/{:d} after {:f} seconds...'
LOGGER.info(MSG.format(
func.__module__, func.__name__, num_try, max_retries, delay
))
else: else:
LOGGER.info('Retry {:d}/{:d} immediate...'.format(num_try, max_retries)) MSG = '[{:s}:{:s}] Retry {:d}/{:d} immediate...'
LOGGER.info(MSG.format(
func.__module__, func.__name__, num_try, max_retries
))
if prepare_method_name is not None: prepare_method(*prepare_method_args, **prepare_method_kwargs) if prepare_method_name is not None:
MSG = '[{:s}:{:s}] Running prepare method...'
LOGGER.debug(MSG.format(
prepare_method.__module__, prepare_method.__name__
))
prepare_method(*prepare_method_args, **prepare_method_kwargs)
return wrapper return wrapper
return _reconnect return _reconnect
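
# Illustrative sketch of how this retry decorator is typically applied (assumed
# class and method names, for illustration only): retry a flaky call up to three
# times with exponential back-off, re-running a connect() step between attempts.
#
#   class SomeClient:
#       def connect(self):
#           ...   # (re)establish the underlying session
#
#       @retry(max_retries=3, delay_function=delay_exponential(initial=1, increment=2),
#              prepare_method_name='connect')
#       def get_config(self):
#           ...   # call that may raise and be retried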
...@@ -12,12 +12,20 @@ ...@@ -12,12 +12,20 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import grpc, logging import logging
from typing import Optional import grpc
from typing import Optional, Tuple, Union
from uuid import UUID, uuid5
from common.Constants import DEFAULT_CONTEXT_NAME from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import Slice, SliceFilter, SliceId from common.method_wrappers.ServiceExceptions import InvalidArgumentsException
from common.proto.context_pb2 import ContextId, Slice, SliceFilter, SliceId
from common.method_wrappers.ServiceExceptions import InvalidArgumentsException
from common.proto.context_pb2 import ContextId, Slice, SliceFilter, SliceId
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
NAMESPACE_TFS = UUID("200e3a1f-2223-534f-a100-758e29c37f40")
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
def get_slice_by_id( def get_slice_by_id(
...@@ -59,3 +67,96 @@ def get_slice_by_uuid( ...@@ -59,3 +67,96 @@ def get_slice_by_uuid(
context_client, slice_id, rw_copy=rw_copy, include_endpoint_ids=include_endpoint_ids, context_client, slice_id, rw_copy=rw_copy, include_endpoint_ids=include_endpoint_ids,
include_constraints=include_constraints, include_service_ids=include_service_ids, include_constraints=include_constraints, include_service_ids=include_service_ids,
include_subslice_ids=include_subslice_ids, include_config_rules=include_config_rules) include_subslice_ids=include_subslice_ids, include_config_rules=include_config_rules)
def get_uuid_from_string(
str_uuid_or_name: Union[str, UUID], prefix_for_name: Optional[str] = None
) -> str:
# if UUID given, assume it is already a valid UUID
if isinstance(str_uuid_or_name, UUID):
return str_uuid_or_name
if not isinstance(str_uuid_or_name, str):
MSG = "Parameter({:s}) cannot be used to produce a UUID"
raise Exception(MSG.format(str(repr(str_uuid_or_name))))
try:
# try to parse as UUID
return str(UUID(str_uuid_or_name))
except: # pylint: disable=bare-except
# produce a UUID within TFS namespace from parameter
if prefix_for_name is not None:
str_uuid_or_name = "{:s}/{:s}".format(prefix_for_name, str_uuid_or_name)
return str(uuid5(NAMESPACE_TFS, str_uuid_or_name))
def context_get_uuid(
context_id: ContextId,
context_name: str = "",
allow_random: bool = False,
allow_default: bool = False,
) -> str:
context_uuid = context_id.context_uuid.uuid
if len(context_uuid) > 0:
return get_uuid_from_string(context_uuid)
if len(context_name) > 0:
return get_uuid_from_string(context_name)
if allow_default:
return get_uuid_from_string(DEFAULT_CONTEXT_NAME)
raise InvalidArgumentsException(
[
("context_id.context_uuid.uuid", context_uuid),
("name", context_name),
],
extra_details=["At least one is required to produce a Context UUID"],
)
def slice_get_uuid(slice_id: SliceId) -> Tuple[str, str]:
context_uuid = context_get_uuid(slice_id.context_id, allow_random=False)
raw_slice_uuid = slice_id.slice_uuid.uuid
if len(raw_slice_uuid) > 0:
return context_uuid, get_uuid_from_string(
raw_slice_uuid, prefix_for_name=context_uuid
)
raise InvalidArgumentsException(
[
("slice_id.slice_uuid.uuid", raw_slice_uuid),
],
extra_details=["At least one is required to produce a Slice UUID"],
)
def get_slice_by_defualt_id(
context_client : ContextClient, default_slice_id : SliceId, context_uuid : str = DEFAULT_CONTEXT_NAME,
rw_copy : bool = False, include_endpoint_ids : bool = True, include_constraints : bool = True,
include_service_ids : bool = True, include_subslice_ids : bool = True, include_config_rules : bool = True
) -> Optional[Slice]:
context_uuid, slice_uuid = slice_get_uuid(default_slice_id)
LOGGER.debug(f'Resolved default slice ID: context_uuid={context_uuid} slice_uuid={slice_uuid}')
slice_id = SliceId()
slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member
slice_id.slice_uuid.uuid = slice_uuid # pylint: disable=no-member
return get_slice_by_id(
context_client, slice_id, rw_copy=rw_copy, include_endpoint_ids=include_endpoint_ids,
include_constraints=include_constraints, include_service_ids=include_service_ids,
include_subslice_ids=include_subslice_ids, include_config_rules=include_config_rules)
def get_slice_by_defualt_name(
context_client : ContextClient, slice_name : str, context_uuid : str = DEFAULT_CONTEXT_NAME,
rw_copy : bool = False, include_endpoint_ids : bool = True, include_constraints : bool = True,
include_service_ids : bool = True, include_subslice_ids : bool = True, include_config_rules : bool = True
) -> Optional[Slice]:
default_slice_id = SliceId()
default_slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member
default_slice_id.slice_uuid.uuid = slice_name # pylint: disable=no-member
context_uuid, slice_uuid = slice_get_uuid(default_slice_id)
slice_id = SliceId()
slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member
slice_id.slice_uuid.uuid = slice_uuid # pylint: disable=no-member
return get_slice_by_id(
context_client, slice_id, rw_copy=rw_copy, include_endpoint_ids=include_endpoint_ids,
include_constraints=include_constraints, include_service_ids=include_service_ids,
include_subslice_ids=include_subslice_ids, include_config_rules=include_config_rules)
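As a quick illustration of the helpers above (the names used are hypothetical): the same context/slice names always resolve to the same UUID pair, since uuid5 is deterministic within NAMESPACE_TFS.

# Hypothetical names; illustrates the deterministic uuid5-based resolution implemented above.
slice_id = SliceId()
slice_id.context_id.context_uuid.uuid = 'admin'    # not a UUID -> hashed as uuid5(NAMESPACE_TFS, 'admin')
slice_id.slice_uuid.uuid = 'transport-slice-1'     # hashed as uuid5(NAMESPACE_TFS, '<context_uuid>/transport-slice-1')
context_uuid, slice_uuid = slice_get_uuid(slice_id)
assert (context_uuid, slice_uuid) == slice_get_uuid(slice_id)   # stable across calls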
...@@ -115,6 +115,8 @@ CONTROLLER_DEVICE_TYPES = {
    DeviceTypeEnum.MICROWAVE_RADIO_SYSTEM.value,
    DeviceTypeEnum.OPEN_LINE_SYSTEM.value,
    DeviceTypeEnum.TERAFLOWSDN_CONTROLLER.value,
    DeviceTypeEnum.IETF_SLICE.value,
    DeviceTypeEnum.NCE.value,
}

def split_controllers_and_network_devices(devices : List[Dict]) -> Tuple[List[Dict], List[Dict]]:
...
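A hedged sketch of how such a split can work given the extended CONTROLLER_DEVICE_TYPES set; this is not the actual body of split_controllers_and_network_devices, and the 'device_type' key is an assumption about the device dictionaries.

# Sketch only: classify device dicts by an assumed 'device_type' field against CONTROLLER_DEVICE_TYPES.
def split_by_type_sketch(devices):
    controllers, network_devices = [], []
    for device in devices:
        target = controllers if device.get('device_type') in CONTROLLER_DEVICE_TYPES else network_devices
        target.append(device)
    return controllers, network_devices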
...@@ -54,14 +54,13 @@ def update_config_rule_custom(
    config_rule.custom.resource_value = json.dumps(json_resource_value, sort_keys=True)

def copy_config_rules(source_config_rules, target_config_rules, raise_if_differs = True):
    for source_config_rule in source_config_rules:
        config_rule_kind = source_config_rule.WhichOneof('config_rule')
        if config_rule_kind == 'custom':
            custom = source_config_rule.custom
            resource_key = custom.resource_key
            resource_value = json.loads(custom.resource_value)
            fields = {name:(value, raise_if_differs) for name,value in resource_value.items()}
            update_config_rule_custom(target_config_rules, resource_key, fields)
...
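A hedged usage sketch of the new parameter: passing raise_if_differs=False lets the copy overwrite fields whose values already differ in the target, instead of raising. The service objects here are hypothetical; config_rules follows the standard ServiceConfig proto field.

# Hypothetical source/target Service messages; shows the new keyword flowing into update_config_rule_custom.
copy_config_rules(
    source_service.service_config.config_rules,
    target_service.service_config.config_rules,
    raise_if_differs=False)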
...@@ -41,14 +41,14 @@ class KafkaConfig(Enum):

class KafkaTopic(Enum):
    # TODO: Later to be populated from ENV variable.
    TELEMETRY_REQUEST  = 'topic_telemetry_request'
    TELEMETRY_RESPONSE = 'topic_telemetry_response'
    RAW                = 'topic_raw'
    LABELED            = 'topic_labeled'
    VALUE              = 'topic_value'
    ALARMS             = 'topic_alarms'
    ANALYTICS_REQUEST  = 'topic_analytics_request'
    ANALYTICS_RESPONSE = 'topic_analytics_response'

    @staticmethod
    def create_all_topics() -> bool:
...
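Any producer or consumer referencing the old members must switch to the renamed ones. A minimal sketch with confluent_kafka, following the client-construction pattern used elsewhere in the controller; the key and payload values are hypothetical.

# Sketch: assumes the same confluent_kafka Producer setup used elsewhere in the repo; key/value are placeholders.
from confluent_kafka import Producer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

producer = Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
producer.produce(KafkaTopic.ANALYTICS_REQUEST.value, key='analyzer-uuid', value='{"algorithm": "threshold"}')
producer.flush()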
...@@ -12,18 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from concurrent import futures
from typing import Any, List, Optional, Union
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from grpc_reflection.v1alpha import reflection
from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers

class GenericGrpcService:
    def __init__(
        self, bind_port : Union[str, int], bind_address : Optional[str] = None,
        max_workers : Optional[int] = None, grace_period : Optional[int] = None,
        enable_health_servicer : bool = True, enable_reflection : bool = True,
        cls_name : str = __name__
    ) -> None:
        self.logger = logging.getLogger(cls_name)
        self.bind_port = bind_port
...@@ -31,6 +34,8 @@ class GenericGrpcService:
        self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers
        self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period
        self.enable_health_servicer = enable_health_servicer
        self.enable_reflection = enable_reflection
        self.reflection_service_names : List[str] = [reflection.SERVICE_NAME]
        self.endpoint = None
        self.health_servicer = None
        self.pool = None
...@@ -39,6 +44,11 @@ class GenericGrpcService:
    def install_servicers(self):
        pass

    def add_reflection_service_name(self, service_descriptor : Any, service_name : str):
        self.reflection_service_names.append(
            service_descriptor.services_by_name[service_name].full_name
        )

    def start(self):
        self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
        self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
...@@ -54,6 +64,9 @@ class GenericGrpcService:
                experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
            add_HealthServicer_to_server(self.health_servicer, self.server)

        if self.enable_reflection:
            reflection.enable_server_reflection(self.reflection_service_names, self.server)

        self.bind_port = self.server.add_insecure_port(self.endpoint)
        self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
        self.logger.info('Listening on {:s}...'.format(str(self.endpoint)))
...
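With enable_reflection active, clients can discover the registered services at runtime. A minimal sketch using the standard gRPC reflection stubs; the endpoint is hypothetical and the printed names depend on which servicers the concrete service installs.

# Sketch: lists services exposed by a reflection-enabled TFS gRPC server (endpoint is a placeholder).
import grpc
from grpc_reflection.v1alpha import reflection_pb2, reflection_pb2_grpc

channel = grpc.insecure_channel('localhost:1010')
stub = reflection_pb2_grpc.ServerReflectionStub(channel)
request = reflection_pb2.ServerReflectionRequest(list_services='')
for response in stub.ServerReflectionInfo(iter([request])):
    for service in response.list_services_response.service:
        print(service.name)   # e.g. context.ContextService, grpc.reflection.v1alpha.ServerReflection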
...@@ -12,19 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from concurrent import futures
from typing import Any, List, Optional, Union
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from grpc_reflection.v1alpha import reflection
from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers

class GenericGrpcServiceAsync:
    def __init__(
        self, bind_port : Union[str, int], bind_address : Optional[str] = None,
        max_workers : Optional[int] = None, grace_period : Optional[int] = None,
        enable_health_servicer : bool = True, enable_reflection : bool = True,
        cls_name : str = __name__
    ) -> None:
        self.logger = logging.getLogger(cls_name)
        self.bind_port = bind_port
...@@ -32,6 +34,8 @@ class GenericGrpcServiceAsync:
        self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers
        self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period
        self.enable_health_servicer = enable_health_servicer
        self.enable_reflection = enable_reflection
        self.reflection_service_names : List[str] = [reflection.SERVICE_NAME]
        self.endpoint = None
        self.health_servicer = None
        self.pool = None
...@@ -40,7 +44,12 @@ class GenericGrpcServiceAsync:
    async def install_servicers(self):
        pass

    def add_reflection_service_name(self, service_descriptor : Any, service_name : str):
        self.reflection_service_names.append(
            service_descriptor.services_by_name[service_name].full_name
        )

    def start(self):
        self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
        self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
            str(self.endpoint), str(self.max_workers)))
...@@ -55,6 +64,9 @@ class GenericGrpcServiceAsync:
                experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
            add_HealthServicer_to_server(self.health_servicer, self.server)

        if self.enable_reflection:
            reflection.enable_server_reflection(self.reflection_service_names, self.server)

        self.bind_port = self.server.add_insecure_port(self.endpoint)
        self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
        self.logger.info('Listening on {:s}...'.format(str(self.endpoint)))
...
...@@ -16,7 +16,9 @@ import logging, sqlalchemy
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import DESCRIPTOR as CONTEXT_DESCRIPTOR
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.context_policy_pb2 import DESCRIPTOR as CONTEXT_POLICY_DESCRIPTOR
from common.proto.context_policy_pb2_grpc import add_ContextPolicyServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from .ContextServiceServicerImpl import ContextServiceServicerImpl
...@@ -36,3 +38,6 @@ class ContextService(GenericGrpcService):
    def install_servicers(self):
        add_ContextServiceServicer_to_server(self.context_servicer, self.server)
        add_ContextPolicyServiceServicer_to_server(self.context_servicer, self.server)

        self.add_reflection_service_name(CONTEXT_DESCRIPTOR, 'ContextService')
        self.add_reflection_service_name(CONTEXT_POLICY_DESCRIPTOR, 'ContextPolicyService')
...@@ -74,7 +74,7 @@ def connection_set(db_engine : Engine, messagebroker : MessageBroker, request :
    _,service_uuid = service_get_uuid(request.service_id, allow_random=False)
    settings = grpc_message_to_json_string(request.settings),

    now = datetime.datetime.now(datetime.timezone.utc)

    connection_data = [{
        'connection_uuid': connection_uuid,
...
...@@ -82,7 +82,8 @@ def context_set(db_engine : Engine, messagebroker : MessageBroker, request : Con
    if len(request.slice_ids) > 0:   # pragma: no cover
        LOGGER.warning('Items in field "slice_ids" ignored. This field is used for retrieval purposes only.')

    now = datetime.datetime.now(datetime.timezone.utc)

    context_data = [{
        'context_uuid': context_uuid,
        'context_name': context_name,
...
...@@ -92,7 +92,7 @@ def device_set(db_engine : Engine, messagebroker : MessageBroker, request : Devi
    oper_status = grpc_to_enum__device_operational_status(request.device_operational_status)
    device_drivers = [grpc_to_enum__device_driver(d) for d in request.device_drivers]

    now = datetime.datetime.now(datetime.timezone.utc)

    topology_uuids : Set[str] = set()
    related_topologies : List[Dict] = list()
...
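The motivation for the utcnow() replacements above, in a nutshell: utcnow() returns a timezone-naive timestamp and is deprecated since Python 3.12, while now(timezone.utc) yields an equivalent timezone-aware value.

# Minimal illustration (not part of the change set): naive vs. timezone-aware UTC timestamps.
import datetime
naive = datetime.datetime.utcnow()                    # tzinfo is None; deprecated in Python 3.12+
aware = datetime.datetime.now(datetime.timezone.utc)  # tzinfo is timezone.utc
assert naive.tzinfo is None and aware.tzinfo is datetime.timezone.utc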