diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 9be39db909d915b2a9b5d99b01841db028959543..c0e2dd877ba9f7a898367119386e49b442614a36 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -48,6 +48,7 @@ message KpiDescriptor { context.EndPointId endpoint_id = 6; context.ServiceId service_id = 7; context.SliceId slice_id = 8; + context.ConnectionId connection_id = 9; } message MonitorKpiRequest { diff --git a/src/monitoring/service/AlarmManager.py b/src/monitoring/service/AlarmManager.py index e5ac8915c3728c7894dc70ab901215dd5a7feb41..d80d815fe50395e360d0fc9e932168fa6e1a0174 100644 --- a/src/monitoring/service/AlarmManager.py +++ b/src/monitoring/service/AlarmManager.py @@ -1,3 +1,4 @@ +import pytz from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.jobstores.base import JobLookupError @@ -19,10 +20,16 @@ class AlarmManager(): end_date=None if subscription_timeout_s: start_timestamp=time.time() - start_date=datetime.fromtimestamp(start_timestamp) - end_date=datetime.fromtimestamp(start_timestamp+subscription_timeout_s) - self.scheduler.add_job(self.metrics_db.get_alarm_data, args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms),trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, end_date=end_date, id=alarm_id) + end_timestamp = start_timestamp + subscription_timeout_s + start_date = datetime.utcfromtimestamp(start_timestamp).isoformat() + end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() + + job = self.scheduler.add_job(self.metrics_db.get_alarm_data, + args=(alarm_queue,kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms), + trigger='interval', seconds=(subscription_frequency_ms/1000), start_date=start_date, + end_date=end_date,timezone=pytz.utc, id=str(alarm_id)) LOGGER.debug(f"Alarm job {alarm_id} succesfully created") + job.remove() def delete_alarm(self, alarm_id): try: diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index ece9bedb252ad589c27e2832ba1b2fbe4035e9a3..f9f2f4fda875d757f348e5102a3b767b43594795 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -40,7 +40,9 @@ class ManagementDB: kpi_sample_type INTEGER, device_id INTEGER, endpoint_id INTEGER, - service_id INTEGER + service_id INTEGER, + slice_id INTEGER, + connection_id INTEGER ); """ ) @@ -91,18 +93,20 @@ class ManagementDB: LOGGER.debug(f"Alarm table cannot be created in the ManagementDB. {e}") raise Exception - def insert_KPI(self, kpi_description, kpi_sample_type, device_id, endpoint_id, service_id): + def insert_KPI( + self, kpi_description, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id + ): try: c = self.client.cursor() c.execute( - "SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?", - (device_id, kpi_sample_type, endpoint_id, service_id), + "SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ? AND slice_id is ? AND connection_id is ?", + (device_id, kpi_sample_type, endpoint_id, service_id, slice_id, connection_id), ) data = c.fetchone() if data is None: c.execute( - "INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", - (kpi_description, kpi_sample_type, device_id, endpoint_id, service_id), + "INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id,slice_id,connection_id) VALUES (?,?,?,?,?,?,?)", + (kpi_description, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id), ) self.client.commit() kpi_id = c.lastrowid diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index 16e6373f542656b4c172c8d619bf3f17ca5df404..0f41cfee1a5461a2d70a61ec23093496bd43e1c4 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -87,6 +87,8 @@ class MetricsDB(): 'device_id SYMBOL,' \ 'endpoint_id SYMBOL,' \ 'service_id SYMBOL,' \ + 'slice_id SYMBOL,' \ + 'connection_id SYMBOL,' \ 'timestamp TIMESTAMP,' \ 'kpi_value DOUBLE)' \ 'TIMESTAMP(timestamp);' @@ -97,7 +99,7 @@ class MetricsDB(): LOGGER.debug(f"Table {self.table} cannot be created. {e}") raise Exception - def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value): + def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id, connection_id, kpi_value): counter = 0 while (counter < self.retries): try: @@ -109,7 +111,9 @@ class MetricsDB(): 'kpi_sample_type': kpi_sample_type, 'device_id': device_id, 'endpoint_id': endpoint_id, - 'service_id': service_id}, + 'service_id': service_id, + 'slice_id': slice_id, + 'connection_id': connection_id,}, columns={ 'kpi_value': kpi_value}, at=datetime.datetime.fromtimestamp(time)) @@ -201,6 +205,8 @@ class MetricsDB(): kpi_list = self.run_query(query) if kpi_list: LOGGER.debug(f"New data received for alarm of KPI {kpi_id}") + LOGGER.info(kpi_list) + valid_kpi_list = [] for kpi in kpi_list: alarm = False kpi_value = kpi[2] @@ -263,10 +269,10 @@ class MetricsDB(): if (kpi_value >= kpiMaxValue): alarm = True if alarm: - # queue.append[kpi] - alarm_queue.put_nowait(kpi) - LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") - else: - LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") + valid_kpi_list.append(kpi) + alarm_queue.put_nowait(valid_kpi_list) + LOGGER.debug(f"Alarm of KPI {kpi_id} triggered -> kpi_value:{kpi[2]}, timestamp:{kpi[1]}") + else: + LOGGER.debug(f"No new data for the alarm of KPI {kpi_id}") except (Exception) as e: LOGGER.debug(f"Alarm data cannot be retrieved. {e}") \ No newline at end of file diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 7cd47f187986a0c32eea2ac8405183ac4418d100..9c88ed31124f97151a010af8185afa23bf548c47 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -28,7 +28,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmLis KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse from common.rpc_method_wrapper.ServiceExceptions import ServiceException -from common.tools.timestamp.Converters import timestamp_string_to_float +from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient @@ -85,13 +85,16 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_device_id = request.device_id.device_uuid.uuid kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid kpi_service_id = request.service_id.service_uuid.uuid + kpi_slice_id = request.slice_id.slice_uuid.uuid + kpi_connection_id = request.connection_id.connection_uuid.uuid + if request.kpi_id.kpi_id.uuid is not "": response.kpi_id.uuid = request.kpi_id.kpi_id.uuid # Here the code to modify an existing kpi else: data = self.management_db.insert_KPI( - kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id, kpi_connection_id) response.kpi_id.uuid = str(data) return response @@ -131,11 +134,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): if kpi_db is None: LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id))) else: - kpiDescriptor.kpi_description = kpi_db[1] - kpiDescriptor.kpi_sample_type = kpi_db[2] - kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) - kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) - kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.kpi_description = kpi_db[1] + kpiDescriptor.kpi_sample_type = kpi_db[2] + kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) + kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) + kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.slice_id.slice_uuid.uuid = str(kpi_db[6]) + kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[7]) return kpiDescriptor except ServiceException as e: LOGGER.exception('GetKpiDescriptor exception') @@ -154,12 +159,14 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): for item in data: kpi_descriptor = KpiDescriptor() - kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) - kpi_descriptor.kpi_description = item[1] - kpi_descriptor.kpi_sample_type = item[2] - kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) - kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) - kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) + kpi_descriptor.kpi_description = item[1] + kpi_descriptor.kpi_sample_type = item[2] + kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) + kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) + kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.slice_id.slice_uuid.uuid = str(item[6]) + kpi_descriptor.connection_id.connection_uuid.uuid = str(item[7]) kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) @@ -186,11 +193,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): deviceId = kpiDescriptor.device_id.device_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid + sliceId = kpiDescriptor.slice_id.slice_uuid.uuid + connectionId = kpiDescriptor.connection_id.connection_uuid.uuid time_stamp = request.timestamp.timestamp kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) # Build the structure to be included as point in the MetricsDB - self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, kpi_value) + self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) return Empty() except ServiceException as e: @@ -250,9 +259,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('SubscribeKpi') try: - subs_queue = Queue() - subs_response = SubsResponse() kpi_id = request.kpi_id.kpi_id.uuid sampling_duration_s = request.sampling_duration_s @@ -268,18 +275,21 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): start_timestamp, end_timestamp) # parse queue to append kpis into the list - while not subs_queue.empty(): - list = subs_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - subs_response.kpi_list.kpi.append(kpi) - - subs_response.subs_id.subs_id.uuid = str(subs_id) - - yield subs_response + while True: + while not subs_queue.empty(): + subs_response = SubsResponse() + list = subs_queue.get_nowait() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + subs_response.kpi_list.kpi.append(kpi) + subs_response.subs_id.subs_id.uuid = str(subs_id) + yield subs_response + if timestamp_utcnow_to_float() > end_timestamp: + break + # yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception') grpc_context.abort(e.code, e.details) @@ -424,6 +434,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmDescriptor') try: alarm_id = request.alarm_id.uuid + LOGGER.debug(alarm_id) alarm = self.management_db.get_alarm(alarm_id) response = AlarmDescriptor() @@ -454,15 +465,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('GetAlarmResponseStream') try: alarm_id = request.alarm_id.alarm_id.uuid - alarm = self.management_db.get_alarm(alarm_id) - alarm_response = AlarmResponse() - - if alarm: + alarm_data = self.management_db.get_alarm(alarm_id) + real_start_time = timestamp_utcnow_to_float() + if alarm_data: + LOGGER.debug(f"{alarm_data}") alarm_queue = Queue() - alarm_data = self.management_db.get_alarm(alarm) - alarm_id = request.alarm_id.alarm_id.uuid kpi_id = alarm_data[3] kpiMinValue = alarm_data[4] @@ -473,24 +482,30 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): subscription_frequency_ms = request.subscription_frequency_ms subscription_timeout_s = request.subscription_timeout_s + end_timestamp = real_start_time + subscription_timeout_s + self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue, includeMaxValue, subscription_frequency_ms, subscription_timeout_s) - while not alarm_queue.empty(): - list = alarm_queue.get_nowait() - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - alarm_response.kpi_list.kpi.append(kpi) - - alarm_response.alarm_id.alarm_id.uuid = alarm_id - - yield alarm_response + while True: + while not alarm_queue.empty(): + alarm_response = AlarmResponse() + list = alarm_queue.get_nowait() + size = len(list) + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + alarm_response.kpi_list.kpi.append(kpi) + alarm_response.alarm_id.alarm_id.uuid = alarm_id + yield alarm_response + if timestamp_utcnow_to_float() > end_timestamp: + break else: LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + alarm_response = AlarmResponse() alarm_response.alarm_id.alarm_id.uuid = "NoID" return alarm_response except ServiceException as e: @@ -527,7 +542,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): kpi_db = self.management_db.get_KPI(int(kpi_id)) response = Kpi() if kpi_db is None: - LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + LOGGER.info('GetStreamKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) response.kpi_id.kpi_id.uuid = "NoID" return response else: diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index fe27d6ee365676b05175b762a106621121e3b897..6ff922c52dea10b0301ff5f765b045e125e42c05 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -42,14 +42,12 @@ class SubscriptionManager(): if end_timestamp: end_date = datetime.utcfromtimestamp(end_timestamp).isoformat() - LOGGER.debug(f"kpi_id: {kpi_id}") - LOGGER.debug(f"sampling_interval_s: {sampling_interval_s}") - LOGGER.debug(f"subscription_id: {subscription_id}") - LOGGER.debug(f"start_date: {start_date}") - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), + job = self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue,kpi_id, sampling_interval_s), trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, timezone=pytz.utc, id=str(subscription_id)) LOGGER.debug(f"Subscrition job {subscription_id} succesfully created") + job.remove() def delete_subscription(self, subscription_id): - self.scheduler.remove_job(subscription_id) \ No newline at end of file + self.scheduler.remove_job(subscription_id) + LOGGER.debug(f"Subscription job {subscription_id} succesfully deleted") \ No newline at end of file diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 845153856c44cec0576bd6f11b045e3310558a97..228b1ce420de8defd3a00a03176b0bd3cbfb924c 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -29,7 +29,9 @@ def create_kpi_request(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' # pylint: disable=maybe-no-member return _create_kpi_request def create_kpi_request_b(): @@ -38,7 +40,9 @@ def create_kpi_request_b(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member return _create_kpi_request def create_kpi_request_c(): @@ -47,7 +51,9 @@ def create_kpi_request_c(): _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member return _create_kpi_request def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): @@ -77,12 +83,18 @@ def kpi_query(): def subs_descriptor(kpi_id): _subs_descriptor = monitoring_pb2.SubsDescriptor() + sampling_duration_s = 20 + sampling_interval_s = 3 + real_start_time = timestamp_utcnow_to_float() + start_timestamp = real_start_time + end_timestamp = start_timestamp + sampling_duration_s + _subs_descriptor.subs_id.subs_id.uuid = "" _subs_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid - _subs_descriptor.sampling_duration_s = 10 - _subs_descriptor.sampling_interval_s = 2 - _subs_descriptor.start_timestamp.timestamp = timestamp_utcnow_to_float() - _subs_descriptor.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10 + _subs_descriptor.sampling_duration_s = sampling_duration_s + _subs_descriptor.sampling_interval_s = sampling_interval_s + _subs_descriptor.start_timestamp.timestamp = start_timestamp + _subs_descriptor.end_timestamp.timestamp = end_timestamp return _subs_descriptor @@ -91,14 +103,14 @@ def subs_id(): return _subs_id -def alarm_descriptor(): +def alarm_descriptor(kpi_id): _alarm_descriptor = monitoring_pb2.AlarmDescriptor() _alarm_descriptor.alarm_description = "Alarm Description" _alarm_descriptor.name = "Alarm Name" - _alarm_descriptor.kpi_id.kpi_id.uuid = "1" + _alarm_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid _alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = 0.0 - _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 50.0 + _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 250.0 _alarm_descriptor.kpi_value_range.inRange = True _alarm_descriptor.kpi_value_range.includeMinValue = False _alarm_descriptor.kpi_value_range.includeMaxValue = True @@ -113,11 +125,16 @@ def alarm_descriptor_b(): return _alarm_descriptor def alarm_subscription(alarm_id): - _alarm_descriptor = monitoring_pb2.AlarmSubscription() + _alarm_subscription = monitoring_pb2.AlarmSubscription() - _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id) + subscription_timeout_s = 10 + subscription_frequency_ms = 1000 - return _alarm_descriptor + _alarm_subscription.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid) + _alarm_subscription.subscription_timeout_s = subscription_timeout_s + _alarm_subscription.subscription_frequency_ms = subscription_frequency_ms + + return _alarm_subscription def alarm_id(): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ee6a29e8a483fe53c58a6e6d2e3aa240f2456b81..55ac9a18b2cd907781c9bbe12fa15ba622e95258 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -15,11 +15,14 @@ import copy, os, pytest import threading import time +from queue import Queue +from random import random from time import sleep from typing import Tuple from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.base import STATE_STOPPED from grpc._channel import _MultiThreadedRendezvous from common.Constants import ServiceNameEnum @@ -34,6 +37,7 @@ from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \ AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse +from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService @@ -43,6 +47,9 @@ from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from monitoring.service.AlarmManager import AlarmManager +from monitoring.service.MetricsDBTools import MetricsDB +from monitoring.service.SubscriptionManager import SubscriptionManager os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position @@ -175,14 +182,23 @@ def subs_scheduler(): return _scheduler -def ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) +def ingestion_data(): + metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring") for i in range(200): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) + kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') + kpiId = "3" + deviceId = 'DEV3' + endpointId = 'END3' + serviceId = 'SERV3' + sliceId = 'SLC3' + connectionId = 'CON3' + time_stamp = timestamp_utcnow_to_float() + kpi_value = 500*random() + + metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, + kpi_value) + sleep(0.1) ########################### # Tests Implementation @@ -274,31 +290,18 @@ def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-na LOGGER.debug(str(response)) assert isinstance(response, KpiList) -def test_ingestion_data(monitoring_client): - _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - _include_kpi_request = include_kpi_request(_kpi_id) - - for i in range(100): - _include_kpi_request = include_kpi_request(_kpi_id) - monitoring_client.IncludeKpi(_include_kpi_request) - time.sleep(0.01) - -# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler): -# subs_scheduler.add_job(ingestion_data(monitoring_client),id="1") - # Test case that makes use of client fixture to test server's SetKpiSubscription method -def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name +def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) - # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db)) - # thread.start() - monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + subs_scheduler.add_job(ingestion_data) response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) assert isinstance(response, _MultiThreadedRendezvous) - LOGGER.debug(response) for item in response: LOGGER.debug(item) assert isinstance(item, SubsResponse) + if (subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() # Test case that makes use of client fixture to test server's GetSubsDescriptor method def test_get_subs_descriptor(monitoring_client): @@ -331,7 +334,8 @@ def test_delete_subscription(monitoring_client): # Test case that makes use of client fixture to test server's SetKpiAlarm method def test_set_kpi_alarm(monitoring_client): LOGGER.warning('test_set_kpi_alarm') - response = monitoring_client.SetKpiAlarm(alarm_descriptor()) + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, AlarmID) @@ -345,28 +349,35 @@ def test_get_alarms(monitoring_client): # Test case that makes use of client fixture to test server's GetAlarmDescriptor method def test_get_alarm_descriptor(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmDescriptor(alarm_id) - LOGGER.debug(response) - assert isinstance(response, AlarmDescriptor) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.GetAlarmDescriptor(_alarm_id) + LOGGER.debug(_response) + assert isinstance(_response, AlarmDescriptor) # Test case that makes use of client fixture to test server's GetAlarmResponseStream method -def test_get_alarm_response_stream(monitoring_client): +def test_get_alarm_response_stream(monitoring_client,subs_scheduler): LOGGER.warning('test_get_alarm_descriptor') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id)) - assert isinstance(response, _MultiThreadedRendezvous) - for item in response: - LOGGER.debug(response) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + subs_scheduler.add_job(ingestion_data) + _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id)) + assert isinstance(_response, _MultiThreadedRendezvous) + for item in _response: + LOGGER.debug(item) assert isinstance(item,AlarmResponse) + if(subs_scheduler.state != STATE_STOPPED): + subs_scheduler.shutdown() + # Test case that makes use of client fixture to test server's DeleteAlarm method def test_delete_alarm(monitoring_client): LOGGER.warning('test_delete_alarm') - alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) - response = monitoring_client.DeleteAlarm(alarm_id) - LOGGER.debug(type(response)) - assert isinstance(response, Empty) + _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id)) + _response = monitoring_client.DeleteAlarm(_alarm_id) + LOGGER.debug(type(_response)) + assert isinstance(_response, Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name @@ -384,9 +395,7 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na # response = monitoring_client.GetInstantKpi(kpi_id) # LOGGER.debug(response) # assert isinstance(response, Kpi) - # response = monitoring_client.GetInstantKpi(KpiId()) - # LOGGER.debug(type(response)) - # assert response.kpi_id.kpi_id.uuid == "NoID" + def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_insert_kpi begin') _create_kpi_request = create_kpi_request() @@ -395,8 +404,10 @@ def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefin kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid + kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid - response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id) assert isinstance(response, int) def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name @@ -407,8 +418,10 @@ def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined- kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid + kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid - _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id) response = management_db.get_KPI(_kpi_id) assert isinstance(response, tuple) @@ -426,22 +439,94 @@ def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefin kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid + kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, - kpi_service_id) + kpi_service_id, kpi_slice_id, kpi_connection_id) response = management_db.delete_KPI(_kpi_id) assert response -def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_metric_sdb_tools_write_kpi begin') - - -def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') - - +def test_managementdb_tools_insert_alarm(management_db): + LOGGER.warning('test_managementdb_tools_insert_alarm begin') + + _alarm_description = "Alarm Description" + _alarm_name = "Alarm Name" + _kpi_id = "3" + _kpi_min_value = 0.0 + _kpi_max_value = 250.0 + _in_range = True + _include_min_value = False + _include_max_value = True + + _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value, + _kpi_max_value, + _in_range, _include_min_value, _include_max_value) + LOGGER.debug(_alarm_id) + assert isinstance(_alarm_id,int) +# +# def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name +# LOGGER.warning('test_metric_sdb_tools_write_kpi begin') +# _kpiId = "6" +# +# for i in range(50): +# _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '') +# _deviceId = 'DEV4' +# _endpointId = 'END4' +# _serviceId = 'SERV4' +# _sliceId = 'SLC4' +# _connectionId = 'CON4' +# _time_stamp = timestamp_utcnow_to_float() +# _kpi_value = 500*random() +# +# metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId, +# _kpi_value) +# sleep(0.05) +# +# _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'" +# _data = metrics_db.run_query(_query) +# assert len(_data) >= 50 +# +# def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler): +# LOGGER.warning('test_subscription_manager_create_subscription begin') +# subs_queue = Queue() +# +# subs_manager = SubscriptionManager(metrics_db) +# +# subs_scheduler.add_job(ingestion_data) +# +# kpi_id = "3" +# sampling_duration_s = 20 +# sampling_interval_s = 3 +# real_start_time = timestamp_utcnow_to_float() +# start_timestamp = real_start_time +# end_timestamp = start_timestamp + sampling_duration_s +# +# subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s, +# sampling_interval_s,start_timestamp,end_timestamp) +# subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s, +# sampling_duration_s,start_timestamp,end_timestamp) +# +# # This is here to simulate application activity (which keeps the main thread alive). +# total_points = 0 +# while True: +# while not subs_queue.empty(): +# list = subs_queue.get_nowait() +# kpi_list = KpiList() +# for item in list: +# kpi = Kpi() +# kpi.kpi_id.kpi_id.uuid = item[0] +# kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) +# kpi.kpi_value.floatVal = item[2] +# kpi_list.kpi.append(kpi) +# total_points += 1 +# LOGGER.debug(kpi_list) +# if timestamp_utcnow_to_float() > end_timestamp: +# break +# +# assert total_points != 0 def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name