diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 757fee8ae8557907316eda75661c236458babd59..927eedf5175594577d72b34820dacbe6647fd596 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -28,7 +28,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmLis KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse from common.rpc_method_wrapper.ServiceExceptions import ServiceException -from common.tools.timestamp.Converters import timestamp_string_to_float +from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient @@ -277,14 +277,17 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): start_timestamp, end_timestamp) # parse queue to append kpis into the list - while not subs_queue.empty(): - list = subs_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - subs_response.kpi_list.kpi.append(kpi) + while True: + while not subs_queue.empty(): + list = subs_queue.get_nowait() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + subs_response.kpi_list.kpi.append(kpi) + if timestamp_utcnow_to_float() > end_timestamp: + break subs_response.subs_id.subs_id.uuid = str(subs_id) @@ -433,6 +436,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmDescriptor') try: alarm_id = request.alarm_id.uuid + LOGGER.debug(alarm_id) alarm = self.management_db.get_alarm(alarm_id) response = AlarmDescriptor() @@ -463,15 +467,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmResponseStream') try: alarm_id = request.alarm_id.alarm_id.uuid - alarm = self.management_db.get_alarm(alarm_id) + alarm_data = self.management_db.get_alarm(alarm_id) alarm_response = AlarmResponse() - if alarm: - + if alarm_data: + LOGGER.debug(f"{alarm_data}") alarm_queue = Queue() - alarm_data = self.management_db.get_alarm(alarm) - alarm_id = request.alarm_id.alarm_id.uuid kpi_id = alarm_data[3] kpiMinValue = alarm_data[4] diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 92e388ae8fe9a7d80a9df80de7d9601b7732ca0c..23e0867c1533b747f978ba087172fccffddc5300 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -83,12 +83,18 @@ def kpi_query(): def subs_descriptor(kpi_id): _subs_descriptor = monitoring_pb2.SubsDescriptor() + sampling_duration_s = 20 + sampling_interval_s = 3 + real_start_time = timestamp_utcnow_to_float() + start_timestamp = real_start_time + end_timestamp = start_timestamp + sampling_duration_s + _subs_descriptor.subs_id.subs_id.uuid = "" _subs_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid - _subs_descriptor.sampling_duration_s = 10 - _subs_descriptor.sampling_interval_s = 2 - _subs_descriptor.start_timestamp.timestamp = timestamp_utcnow_to_float() - _subs_descriptor.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10 + _subs_descriptor.sampling_duration_s = sampling_duration_s + _subs_descriptor.sampling_interval_s = sampling_interval_s + _subs_descriptor.start_timestamp.timestamp = start_timestamp + _subs_descriptor.end_timestamp.timestamp = end_timestamp return _subs_descriptor @@ -97,14 +103,14 @@ def subs_id(): return _subs_id -def alarm_descriptor(): +def alarm_descriptor(kpi_id): _alarm_descriptor = monitoring_pb2.AlarmDescriptor() _alarm_descriptor.alarm_description = "Alarm Description" _alarm_descriptor.name = "Alarm Name" - _alarm_descriptor.kpi_id.kpi_id.uuid = "1" + _alarm_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid _alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = 0.0 - _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 50.0 + _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 250.0 _alarm_descriptor.kpi_value_range.inRange = True _alarm_descriptor.kpi_value_range.includeMinValue = False _alarm_descriptor.kpi_value_range.includeMaxValue = True @@ -121,7 +127,7 @@ def alarm_descriptor_b(): def alarm_subscription(alarm_id): _alarm_descriptor = monitoring_pb2.AlarmSubscription() - _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id) + _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) return _alarm_descriptor diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ba41511d2660f28dc927c37f12bc399e60f747fd..33e9d3224baa80263af88f9434eac6de0955b5da 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -15,6 +15,8 @@ import copy, os, pytest import threading import time +from queue import Queue +from random import random from time import sleep from typing import Tuple @@ -34,6 +36,7 @@ from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \ AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse +from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService @@ -43,6 +46,9 @@ from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from monitoring.service.AlarmManager import AlarmManager +from monitoring.service.MetricsDBTools import MetricsDB +from monitoring.service.SubscriptionManager import SubscriptionManager os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position @@ -175,14 +181,23 @@ def subs_scheduler(): return _scheduler -def ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) +def ingestion_data(): + metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") for i in range(200): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) + kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + kpiId = "3" + deviceId = 'DEV3' + endpointId = 'END3' + serviceId = 'SERV3' + sliceId = 'SLC3' + connectionId = 'CON3' + time_stamp = timestamp_utcnow_to_float() + kpi_value = 500*random() + + metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, + kpi_value) + sleep(0.05) ########################### # Tests Implementation @@ -274,28 +289,13 @@ def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-na LOGGER.debug(str(response)) assert isinstance(response, KpiList) -def test_ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) - - for i in range(100): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) - -# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler): -# subs_scheduler.add_job(ingestion_data(monitoring_client),id="1") - # Test case that makes use of client fixture to test server's SetKpiSubscription method -def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name +def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db)) - # thread.start() - monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + subs_scheduler.add_job(ingestion_data) response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) assert isinstance(response, _MultiThreadedRendezvous) - LOGGER.debug(response) for item in response: LOGGER.debug(item) assert isinstance(item, SubsResponse) @@ -331,7 +331,8 @@ def test_delete_subscription(monitoring_client): # Test case that makes use of client fixture to test server's SetKpiAlarm method def test_set_kpi_alarm(monitoring_client): LOGGER.warning('test_set_kpi_alarm') - response = monitoring_client.SetKpiAlarm(alarm_descriptor()) + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, AlarmID) @@ -345,28 +346,31 @@ def test_get_alarms(monitoring_client): # Test case that makes use of client fixture to test server's GetAlarmDescriptor method def test_get_alarm_descriptor(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmDescriptor(alarm_id) - LOGGER.debug(response) - assert isinstance(response, AlarmDescriptor) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.GetAlarmDescriptor(_alarm_id) + LOGGER.debug(_response) + assert isinstance(_response, AlarmDescriptor) # Test case that makes use of client fixture to test server's GetAlarmResponseStream method def test_get_alarm_response_stream(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id)) - assert isinstance(response, _MultiThreadedRendezvous) - for item in response: - LOGGER.debug(response) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id)) + assert isinstance(_response, _MultiThreadedRendezvous) + for item in _response: + LOGGER.debug(_response) assert isinstance(item,AlarmResponse) # Test case that makes use of client fixture to test server's DeleteAlarm method def test_delete_alarm(monitoring_client): LOGGER.warning('test_delete_alarm') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.DeleteAlarm(alarm_id) - LOGGER.debug(type(response)) - assert isinstance(response, Empty) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.DeleteAlarm(_alarm_id) + LOGGER.debug(type(_response)) + assert isinstance(_response, Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name @@ -440,14 +444,116 @@ def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefin assert response +def test_managementdb_tools_insert_alarm(management_db): + LOGGER.warning('test_managementdb_tools_insert_alarm begin') + + _alarm_description = "Alarm Description" + _alarm_name = "Alarm Name" + _kpi_id = "3" + _kpi_min_value = 0.0 + _kpi_max_value = 250.0 + _in_range = True + _include_min_value = False + _include_max_value = True + + _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value, + _kpi_max_value, + _in_range, _include_min_value, _include_max_value) + LOGGER.debug(_alarm_id) + assert isinstance(_alarm_id,int) + def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metric_sdb_tools_write_kpi begin') + for i in range(200): + kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + kpiId = "3" + deviceId = 'DEV3' + endpointId = 'END3' + serviceId = 'SERV3' + sliceId = 'SLC3' + connectionId = 'CON3' + time_stamp = timestamp_utcnow_to_float() + kpi_value = 500*random() + + metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, + kpi_value) + sleep(0.05) def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') +def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): + LOGGER.warning('test_subscription_manager_create_subscription begin') + subs_queue = Queue() + + subs_manager = SubscriptionManager(metrics_db) + + subs_scheduler.add_job(ingestion_data) + + kpi_id = "3" + sampling_duration_s = 20 + sampling_interval_s = 3 + real_start_time = timestamp_utcnow_to_float() + start_timestamp = real_start_time + end_timestamp = start_timestamp + sampling_duration_s + + subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, + sampling_interval_s,start_timestamp,end_timestamp) + subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, + sampling_duration_s,start_timestamp,end_timestamp) + + # This is here to simulate application activity (which keeps the main thread alive). + total_points = 0 + while True: + while not subs_queue.empty(): + list = subs_queue.get_nowait() + kpi_list = KpiList() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = item[0] + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] + kpi_list.kpi.append(kpi) + total_points += 1 + LOGGER.debug(kpi_list) + if timestamp_utcnow_to_float() > end_timestamp: + break + + assert total_points != 0 + +def test_alarm_manager_create_alarm(management_db,metrics_db): + LOGGER.warning('test_alarm_manager_create_alarm begin') + + _alarm_description = "Alarm Description" + _alarm_name = "Alarm Name" + _kpi_id = "3" + _kpi_min_value = 0.0 + _kpi_max_value = 250.0 + _in_range = True + _include_min_value = False + _include_max_value = True + _subscription_frequency_ms = 10 + + _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value, + _kpi_max_value, + _in_range, _include_min_value, _include_max_value) + + LOGGER.debug(f"alarm_id: {_alarm_id}") + _alarm_queue = Queue() + _alarm_manager = AlarmManager(metrics_db) + + _alarm_manager.create_alarm(_alarm_queue,str(_alarm_id),_kpi_id,_kpi_min_value,_kpi_max_value,_in_range,_include_min_value,_include_max_value,_subscription_frequency_ms) + + LOGGER.debug(_alarm_queue) + + while not _alarm_queue.empty(): + list = _alarm_queue.get_nowait() + LOGGER.debug(list) + for item in list: + LOGGER.debug(item) + def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name