From ca74ba7d1144f590b5a09a50b71b4627b46cc420 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Fri, 14 Apr 2023 16:40:34 +0000 Subject: [PATCH] Monitoring Component: - Updated to new metrics export framework --- .../service/MonitoringServiceServicerImpl.py | 854 +++++++----------- src/monitoring/tests/test_unitary.py | 1 - 2 files changed, 340 insertions(+), 515 deletions(-) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 62adcf465..3bfef65ff 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os, grpc +import logging, os, grpc from queue import Queue - from typing import Iterator - -from common.logger import getJSONLogger +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.context_pb2 import Empty from common.proto.device_pb2 import MonitoringSettings from common.proto.kpi_sample_types_pb2 import KpiSampleType @@ -25,30 +23,22 @@ from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \ KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList -from common.method_wrappers.ServiceExceptions import ServiceException from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float - -from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient - -from prometheus_client import Counter, Summary - +from monitoring.service import ManagementDBTools, MetricsDBTools from monitoring.service.AlarmManager import AlarmManager from monitoring.service.NameMapping import NameMapping from monitoring.service.SubscriptionManager import SubscriptionManager -LOGGER = getJSONLogger('monitoringservice-server') -LOGGER.setLevel('DEBUG') - -MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( - 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') -MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') +LOGGER = logging.getLogger(__name__) METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") METRICSDB_TABLE_MONITORING_KPIS = os.environ.get("METRICSDB_TABLE_MONITORING_KPIS") +METRICS_POOL = MetricsPool('Monitoring', 'RPC') + class MonitoringServiceServicerImpl(MonitoringServiceServicer): def __init__(self, name_mapping : NameMapping): LOGGER.info('Init monitoringService') @@ -63,514 +53,363 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('MetricsDB initialized') # SetKpi (SetKpiRequest) returns (KpiId) {} + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetKpi( self, request: KpiDescriptor, grpc_context: grpc.ServicerContext ) -> KpiId: - # CREATEKPI_COUNTER_STARTED.inc() - LOGGER.info('SetKpi') - try: - # Here the code to create a sqlite query to crete a KPI and return a KpiID - response = KpiId() - - kpi_description = request.kpi_description - kpi_sample_type = request.kpi_sample_type - kpi_device_id = request.device_id.device_uuid.uuid - kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid - kpi_service_id = request.service_id.service_uuid.uuid - kpi_slice_id = request.slice_id.slice_uuid.uuid - kpi_connection_id = request.connection_id.connection_uuid.uuid - - - if request.kpi_id.kpi_id.uuid != "": - response.kpi_id.uuid = request.kpi_id.kpi_id.uuid - # Here the code to modify an existing kpi - else: - data = self.management_db.insert_KPI( - kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id, kpi_connection_id) - response.kpi_id.uuid = str(data) - - return response - except ServiceException as e: - LOGGER.exception('SetKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('SetKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) + response = KpiId() + kpi_description = request.kpi_description + kpi_sample_type = request.kpi_sample_type + kpi_device_id = request.device_id.device_uuid.uuid + kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid + kpi_service_id = request.service_id.service_uuid.uuid + kpi_slice_id = request.slice_id.slice_uuid.uuid + kpi_connection_id = request.connection_id.connection_uuid.uuid + if request.kpi_id.kpi_id.uuid != "": + response.kpi_id.uuid = request.kpi_id.kpi_id.uuid + # Here the code to modify an existing kpi + else: + data = self.management_db.insert_KPI( + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id, + kpi_connection_id) + response.kpi_id.uuid = str(data) + return response + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty: + kpi_id = int(request.kpi_id.uuid) + kpi = self.management_db.get_KPI(kpi_id) + if kpi: + self.management_db.delete_KPI(kpi_id) + else: + LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + return Empty() - LOGGER.info('DeleteKpi') - try: - LOGGER.debug(f'DeleteKpi with KpiID: {request.kpi_id.uuid}') - kpi_id = int(request.kpi_id.uuid) - kpi = self.management_db.get_KPI(kpi_id) - if kpi: - self.management_db.delete_KPI(kpi_id) - else: - LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) - return Empty() - except ServiceException as e: - LOGGER.exception('DeleteKpi exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('DeleteKpi exception') - + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor: - LOGGER.info('getting Kpi by KpiID') - try: - kpi_id = request.kpi_id.uuid - kpi_db = self.management_db.get_KPI(int(kpi_id)) - kpiDescriptor = KpiDescriptor() - if kpi_db is None: - LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id))) - else: - kpiDescriptor.kpi_description = kpi_db[1] - kpiDescriptor.kpi_sample_type = kpi_db[2] - kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) - kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) - kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) - kpiDescriptor.slice_id.slice_uuid.uuid = str(kpi_db[6]) - kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[7]) - return kpiDescriptor - except ServiceException as e: - LOGGER.exception('GetKpiDescriptor exception') - grpc_context.abort(e.code, e.details) - except Exception: # pragma: no cover - LOGGER.exception('GetKpiDescriptor exception') - + kpi_id = request.kpi_id.uuid + kpi_db = self.management_db.get_KPI(int(kpi_id)) + kpiDescriptor = KpiDescriptor() + if kpi_db is None: + LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id))) + else: + kpiDescriptor.kpi_description = kpi_db[1] + kpiDescriptor.kpi_sample_type = kpi_db[2] + kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) + kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) + kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) + kpiDescriptor.slice_id.slice_uuid.uuid = str(kpi_db[6]) + kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[7]) + return kpiDescriptor + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList: - - LOGGER.info('GetKpiDescriptorList') - try: - kpi_descriptor_list = KpiDescriptorList() - - data = self.management_db.get_KPIS() - LOGGER.debug(f"data: {data}") - - for item in data: - kpi_descriptor = KpiDescriptor() - kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) - kpi_descriptor.kpi_description = item[1] - kpi_descriptor.kpi_sample_type = item[2] - kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) - kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) - kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) - kpi_descriptor.slice_id.slice_uuid.uuid = str(item[6]) - kpi_descriptor.connection_id.connection_uuid.uuid = str(item[7]) - - kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) - - return kpi_descriptor_list - except ServiceException as e: - LOGGER.exception('GetKpiDescriptorList exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('GetKpiDescriptorList exception') - + kpi_descriptor_list = KpiDescriptorList() + data = self.management_db.get_KPIS() + LOGGER.debug(f"data: {data}") + for item in data: + kpi_descriptor = KpiDescriptor() + kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0]) + kpi_descriptor.kpi_description = item[1] + kpi_descriptor.kpi_sample_type = item[2] + kpi_descriptor.device_id.device_uuid.uuid = str(item[3]) + kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4]) + kpi_descriptor.service_id.service_uuid.uuid = str(item[5]) + kpi_descriptor.slice_id.slice_uuid.uuid = str(item[6]) + kpi_descriptor.connection_id.connection_uuid.uuid = str(item[7]) + kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor) + return kpi_descriptor_list + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty: + kpi_id = request.kpi_id.kpi_id.uuid + kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) - LOGGER.info('IncludeKpi') - - try: - kpi_id = request.kpi_id.kpi_id.uuid + if kpiDescriptor is None: + LOGGER.info('IncludeKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + else: + kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') + kpiId = kpi_id + deviceId = kpiDescriptor.device_id.device_uuid.uuid + endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid + serviceId = kpiDescriptor.service_id.service_uuid.uuid + sliceId = kpiDescriptor.slice_id.slice_uuid.uuid + connectionId = kpiDescriptor.connection_id.connection_uuid.uuid + time_stamp = request.timestamp.timestamp + kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) + + # Build the structure to be included as point in the MetricsDB + self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) + return Empty() + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty: + kpi_id = int(request.kpi_id.kpi_id.uuid) + kpi = self.management_db.get_KPI(kpi_id) + response = Empty() + if kpi: + # Sets the request to send to the device service + monitor_device_request = MonitoringSettings() kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) + monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor) + monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid + monitor_device_request.sampling_duration_s = request.monitoring_window_s + monitor_device_request.sampling_interval_s = request.sampling_rate_s + if not self.management_db.check_monitoring_flag(kpi_id): + device_client = DeviceClient() + device_client.MonitorDeviceKpi(monitor_device_request) + self.management_db.set_monitoring_flag(kpi_id,True) + self.management_db.check_monitoring_flag(kpi_id) + else: + LOGGER.warning('MonitorKpi warning: KpiID({:s}) is currently being monitored'.format(str(kpi_id))) + else: + LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + return response + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable: + raw_kpi_table = RawKpiTable() + kpi_id_list = request.kpi_ids + monitoring_window_s = request.monitoring_window_s + last_n_samples = request.last_n_samples + start_timestamp = request.start_timestamp.timestamp + end_timestamp = request.end_timestamp.timestamp + + # Check if all the Kpi_ids exist + for item in kpi_id_list: + kpi_id = item.kpi_id.uuid + kpiDescriptor = self.GetKpiDescriptor(item, grpc_context) if kpiDescriptor is None: - LOGGER.info('IncludeKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id))) + break else: - kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') - kpiId = kpi_id - deviceId = kpiDescriptor.device_id.device_uuid.uuid - endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid - serviceId = kpiDescriptor.service_id.service_uuid.uuid - sliceId = kpiDescriptor.slice_id.slice_uuid.uuid - connectionId = kpiDescriptor.connection_id.connection_uuid.uuid - time_stamp = request.timestamp.timestamp - kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) - - # Build the structure to be included as point in the MetricsDB - self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value) - - return Empty() - except ServiceException as e: - LOGGER.exception('IncludeKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception: # pragma: no cover - LOGGER.exception('IncludeKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - - def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty: - - LOGGER.info('MonitorKpi') - try: - kpi_id = int(request.kpi_id.kpi_id.uuid) - kpi = self.management_db.get_KPI(kpi_id) - response = Empty() - - if kpi: - # Sets the request to send to the device service - monitor_device_request = MonitoringSettings() - - kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) + # Execute query per Kpi_id and introduce their kpi_list in the table + kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,last_n_samples,start_timestamp,end_timestamp) + raw_kpi_list = RawKpiList() + raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id - monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor) - monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid - monitor_device_request.sampling_duration_s = request.monitoring_window_s - monitor_device_request.sampling_interval_s = request.sampling_rate_s + LOGGER.debug(str(kpi_list)) - if not self.management_db.check_monitoring_flag(kpi_id): - device_client = DeviceClient() - device_client.MonitorDeviceKpi(monitor_device_request) - self.management_db.set_monitoring_flag(kpi_id,True) - self.management_db.check_monitoring_flag(kpi_id) + if kpi_list is None: + LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id))) else: - LOGGER.warning('MonitorKpi warning: KpiID({:s}) is currently being monitored'.format(str(kpi_id))) - else: - LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) - return response - except ServiceException as e: - LOGGER.exception('MonitorKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('MonitorKpi exception') - grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) - # CREATEKPI_COUNTER_FAILED.inc() + for item in kpi_list: + raw_kpi = RawKpi() + raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[0]) + raw_kpi.kpi_value.floatVal = item[1] + raw_kpi_list.raw_kpis.append(raw_kpi) - def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable: + raw_kpi_table.raw_kpi_lists.append(raw_kpi_list) + return raw_kpi_table - LOGGER.info('QueryKpiData') - try: - raw_kpi_table = RawKpiTable() + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SetKpiSubscription(self, request: SubsDescriptor, grpc_context: grpc.ServicerContext) -> SubsResponse: + subs_queue = Queue() + + kpi_id = request.kpi_id.kpi_id.uuid + sampling_duration_s = request.sampling_duration_s + sampling_interval_s = request.sampling_interval_s + start_timestamp = request.start_timestamp.timestamp + end_timestamp = request.end_timestamp.timestamp + + subscriber = "localhost" # Investigate how to get info from the requester + + subs_id = self.management_db.insert_subscription(kpi_id, subscriber, sampling_duration_s, + sampling_interval_s, start_timestamp, end_timestamp) + self.subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s, + start_timestamp, end_timestamp) + + # parse queue to append kpis into the list + while True: + while not subs_queue.empty(): + subs_response = SubsResponse() + list = subs_queue.get_nowait() + for item in list: + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = str(item[0]) + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + subs_response.kpi_list.kpi.append(kpi) + subs_response.subs_id.subs_id.uuid = str(subs_id) + yield subs_response + if timestamp_utcnow_to_float() > end_timestamp: + break + # yield subs_response + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor: + subs_id = request.subs_id.uuid + subs_db = self.management_db.get_subscription(int(request.subs_id.uuid)) + response = SubsDescriptor() + if subs_db is None: + LOGGER.info('GetSubsDescriptor error: SubsID({:s}): not found in database'.format(str(subs_id))) + else: + LOGGER.debug(subs_db) + response.subs_id.subs_id.uuid = str(subs_db[0]) + response.kpi_id.kpi_id.uuid = str(subs_db[1]) + response.sampling_duration_s = subs_db[3] + response.sampling_interval_s = subs_db[4] + response.start_timestamp.timestamp = subs_db[5] + response.end_timestamp.timestamp = subs_db[6] + return response + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList: + response = SubsList() + data = self.management_db.get_subscriptions() + for subs_db in data: + subs_descriptor = SubsDescriptor() + subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0]) + subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1]) + subs_descriptor.sampling_duration_s = subs_db[3] + subs_descriptor.sampling_interval_s = subs_db[4] + subs_descriptor.start_timestamp.timestamp = subs_db[5] + subs_descriptor.end_timestamp.timestamp = subs_db[6] + response.subs_descriptor.append(subs_descriptor) + return response + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty: + subs_id = int(request.subs_id.uuid) + subs_db = self.management_db.get_subscription(int(request.subs_id.uuid)) + if subs_db: + self.management_db.delete_subscription(subs_id) + else: + LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id))) + return Empty() - LOGGER.debug(str(request)) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmResponse: + response = AlarmID() + alarm_description = request.alarm_description + alarm_name = request.name + kpi_id = request.kpi_id.kpi_id.uuid + kpi_min_value = float(request.kpi_value_range.kpiMinValue.floatVal) + kpi_max_value = float(request.kpi_value_range.kpiMaxValue.floatVal) + in_range = request.kpi_value_range.inRange + include_min_value = request.kpi_value_range.includeMinValue + include_max_value = request.kpi_value_range.includeMaxValue + timestamp = request.timestamp.timestamp + LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}") + if request.alarm_id.alarm_id.uuid != "": + alarm_id = request.alarm_id.alarm_id.uuid + # Here the code to modify an existing alarm + else: + alarm_id = self.management_db.insert_alarm(alarm_description, alarm_name, kpi_id, kpi_min_value, + kpi_max_value, + in_range, include_min_value, include_max_value) + LOGGER.debug(f"AlarmID: {alarm_id}") + response.alarm_id.uuid = str(alarm_id) + return response + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList: + response = AlarmList() + data = self.management_db.get_alarms() - kpi_id_list = request.kpi_ids - monitoring_window_s = request.monitoring_window_s - last_n_samples = request.last_n_samples - start_timestamp = request.start_timestamp.timestamp - end_timestamp = request.end_timestamp.timestamp + for alarm in data: + alarm_descriptor = AlarmDescriptor() - # Check if all the Kpi_ids exist - for item in kpi_id_list: - kpi_id = item.kpi_id.uuid + alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm[0]) + alarm_descriptor.alarm_description = alarm[1] + alarm_descriptor.name = alarm[2] + alarm_descriptor.kpi_id.kpi_id.uuid = str(alarm[3]) + alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = alarm[4] + alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = alarm[5] + alarm_descriptor.kpi_value_range.inRange = bool(alarm[6]) + alarm_descriptor.kpi_value_range.includeMinValue = bool(alarm[7]) + alarm_descriptor.kpi_value_range.includeMaxValue = bool(alarm[8]) - kpiDescriptor = self.GetKpiDescriptor(item, grpc_context) - if kpiDescriptor is None: - LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id))) - break - else: - # Execute query per Kpi_id and introduce their kpi_list in the table - kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,last_n_samples,start_timestamp,end_timestamp) - raw_kpi_list = RawKpiList() - raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id - - LOGGER.debug(str(kpi_list)) - - if kpi_list is None: - LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id))) - else: - for item in kpi_list: - raw_kpi = RawKpi() - raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[0]) - raw_kpi.kpi_value.floatVal = item[1] - raw_kpi_list.raw_kpis.append(raw_kpi) - - raw_kpi_table.raw_kpi_lists.append(raw_kpi_list) - - return raw_kpi_table - except ServiceException as e: - LOGGER.exception('QueryKpiData exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('QueryKpiData exception') + response.alarm_descriptor.append(alarm_descriptor) - def SetKpiSubscription(self, request: SubsDescriptor, grpc_context: grpc.ServicerContext) -> SubsResponse: + return response - LOGGER.info('SubscribeKpi') - try: - subs_queue = Queue() + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor: + alarm_id = request.alarm_id.uuid + LOGGER.debug(alarm_id) + alarm = self.management_db.get_alarm(alarm_id) + response = AlarmDescriptor() + + if alarm: + LOGGER.debug(f"{alarm}") + response.alarm_id.alarm_id.uuid = str(alarm_id) + response.alarm_description = alarm[1] + response.name = alarm[2] + response.kpi_id.kpi_id.uuid = str(alarm[3]) + response.kpi_value_range.kpiMinValue.floatVal = alarm[4] + response.kpi_value_range.kpiMaxValue.floatVal = alarm[5] + response.kpi_value_range.inRange = bool(alarm[6]) + response.kpi_value_range.includeMinValue = bool(alarm[7]) + response.kpi_value_range.includeMaxValue = bool(alarm[8]) + else: + LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + response.alarm_id.alarm_id.uuid = "NoID" + return response + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetAlarmResponseStream( + self, request: AlarmSubscription, grpc_context: grpc.ServicerContext + ) -> Iterator[AlarmResponse]: + alarm_id = request.alarm_id.alarm_id.uuid + alarm_data = self.management_db.get_alarm(alarm_id) + real_start_time = timestamp_utcnow_to_float() + + if alarm_data: + LOGGER.debug(f"{alarm_data}") + alarm_queue = Queue() - kpi_id = request.kpi_id.kpi_id.uuid - sampling_duration_s = request.sampling_duration_s - sampling_interval_s = request.sampling_interval_s - start_timestamp = request.start_timestamp.timestamp - end_timestamp = request.end_timestamp.timestamp + alarm_id = request.alarm_id.alarm_id.uuid + kpi_id = alarm_data[3] + kpiMinValue = alarm_data[4] + kpiMaxValue = alarm_data[5] + inRange = alarm_data[6] + includeMinValue = alarm_data[7] + includeMaxValue = alarm_data[8] + subscription_frequency_ms = request.subscription_frequency_ms + subscription_timeout_s = request.subscription_timeout_s - subscriber = "localhost" # Investigate how to get info from the requester + end_timestamp = real_start_time + subscription_timeout_s - subs_id = self.management_db.insert_subscription(kpi_id, subscriber, sampling_duration_s, - sampling_interval_s, start_timestamp, end_timestamp) - self.subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s, - start_timestamp, end_timestamp) + self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, + includeMinValue, includeMaxValue, subscription_frequency_ms, + subscription_timeout_s) - # parse queue to append kpis into the list while True: - while not subs_queue.empty(): - subs_response = SubsResponse() - list = subs_queue.get_nowait() + while not alarm_queue.empty(): + alarm_response = AlarmResponse() + list = alarm_queue.get_nowait() + size = len(list) for item in list: kpi = Kpi() kpi.kpi_id.kpi_id.uuid = str(item[0]) kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) kpi.kpi_value.floatVal = item[2] # This must be improved - subs_response.kpi_list.kpi.append(kpi) - subs_response.subs_id.subs_id.uuid = str(subs_id) - yield subs_response + alarm_response.kpi_list.kpi.append(kpi) + alarm_response.alarm_id.alarm_id.uuid = alarm_id + yield alarm_response if timestamp_utcnow_to_float() > end_timestamp: break - # yield subs_response - except ServiceException as e: - LOGGER.exception('SubscribeKpi exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('SubscribeKpi exception') - - def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor: - - LOGGER.info('GetSubsDescriptor') - try: - subs_id = request.subs_id.uuid - subs_db = self.management_db.get_subscription(int(request.subs_id.uuid)) - response = SubsDescriptor() - if subs_db is None: - LOGGER.info('GetSubsDescriptor error: SubsID({:s}): not found in database'.format(str(subs_id))) - else: - LOGGER.debug(subs_db) - response.subs_id.subs_id.uuid = str(subs_db[0]) - response.kpi_id.kpi_id.uuid = str(subs_db[1]) - response.sampling_duration_s = subs_db[3] - response.sampling_interval_s = subs_db[4] - response.start_timestamp.timestamp = subs_db[5] - response.end_timestamp.timestamp = subs_db[6] - - return response - except ServiceException as e: - LOGGER.exception('GetSubsDescriptor exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('GetSubsDescriptor exception') - - def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList: - - LOGGER.info('GetSubscriptions') - try: - response = SubsList() - data = self.management_db.get_subscriptions() - - for subs_db in data: - subs_descriptor = SubsDescriptor() - - subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0]) - subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1]) - subs_descriptor.sampling_duration_s = subs_db[3] - subs_descriptor.sampling_interval_s = subs_db[4] - subs_descriptor.start_timestamp.timestamp = subs_db[5] - subs_descriptor.end_timestamp.timestamp = subs_db[6] - - response.subs_descriptor.append(subs_descriptor) - - return response - except ServiceException as e: - LOGGER.exception('GetSubscriptions exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('GetSubscriptions exception') - - def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty: - - LOGGER.info('DeleteSubscription') - try: - LOGGER.debug(f'DeleteSubscription with SubsID: {request.subs_id.uuid}') - subs_id = int(request.subs_id.uuid) - subs_db = self.management_db.get_subscription(int(request.subs_id.uuid)) - if subs_db: - self.management_db.delete_subscription(subs_id) - else: - LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id))) - return Empty() - except ServiceException as e: - LOGGER.exception('DeleteSubscription exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('DeleteSubscription exception') - - def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmResponse: - - LOGGER.info('SetKpiAlarm') - try: - response = AlarmID() - - alarm_description = request.alarm_description - alarm_name = request.name - kpi_id = request.kpi_id.kpi_id.uuid - kpi_min_value = float(request.kpi_value_range.kpiMinValue.floatVal) - kpi_max_value = float(request.kpi_value_range.kpiMaxValue.floatVal) - in_range = request.kpi_value_range.inRange - include_min_value = request.kpi_value_range.includeMinValue - include_max_value = request.kpi_value_range.includeMaxValue - timestamp = request.timestamp.timestamp - - LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}") - - if request.alarm_id.alarm_id.uuid != "": - alarm_id = request.alarm_id.alarm_id.uuid - # Here the code to modify an existing alarm - else: - alarm_id = self.management_db.insert_alarm(alarm_description, alarm_name, kpi_id, kpi_min_value, - kpi_max_value, - in_range, include_min_value, include_max_value) - LOGGER.debug(f"AlarmID: {alarm_id}") - response.alarm_id.uuid = str(alarm_id) - - return response - except ServiceException as e: - LOGGER.exception('SetKpiAlarm exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('SetKpiAlarm exception') - - def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList: - - LOGGER.info('GetAlarms') - try: - response = AlarmList() - data = self.management_db.get_alarms() - - for alarm in data: - alarm_descriptor = AlarmDescriptor() - - alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm[0]) - alarm_descriptor.alarm_description = alarm[1] - alarm_descriptor.name = alarm[2] - alarm_descriptor.kpi_id.kpi_id.uuid = str(alarm[3]) - alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = alarm[4] - alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = alarm[5] - alarm_descriptor.kpi_value_range.inRange = bool(alarm[6]) - alarm_descriptor.kpi_value_range.includeMinValue = bool(alarm[7]) - alarm_descriptor.kpi_value_range.includeMaxValue = bool(alarm[8]) - - response.alarm_descriptor.append(alarm_descriptor) - - return response - except ServiceException as e: - LOGGER.exception('GetAlarms exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('GetAlarms exception') - - def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor: - - LOGGER.info('GetAlarmDescriptor') - try: - alarm_id = request.alarm_id.uuid - LOGGER.debug(alarm_id) - alarm = self.management_db.get_alarm(alarm_id) - response = AlarmDescriptor() - - if alarm: - LOGGER.debug(f"{alarm}") - response.alarm_id.alarm_id.uuid = str(alarm_id) - response.alarm_description = alarm[1] - response.name = alarm[2] - response.kpi_id.kpi_id.uuid = str(alarm[3]) - response.kpi_value_range.kpiMinValue.floatVal = alarm[4] - response.kpi_value_range.kpiMaxValue.floatVal = alarm[5] - response.kpi_value_range.inRange = bool(alarm[6]) - response.kpi_value_range.includeMinValue = bool(alarm[7]) - response.kpi_value_range.includeMaxValue = bool(alarm[8]) - else: - LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id))) - response.alarm_id.alarm_id.uuid = "NoID" - return response - except ServiceException as e: - LOGGER.exception('GetAlarmDescriptor exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('GetAlarmDescriptor exception') - - def GetAlarmResponseStream(self, request: AlarmSubscription, grpc_context: grpc.ServicerContext) -> Iterator[ - AlarmResponse]: - - LOGGER.info('GetAlarmResponseStream') - try: - alarm_id = request.alarm_id.alarm_id.uuid - alarm_data = self.management_db.get_alarm(alarm_id) - real_start_time = timestamp_utcnow_to_float() - - if alarm_data: - LOGGER.debug(f"{alarm_data}") - alarm_queue = Queue() - - alarm_id = request.alarm_id.alarm_id.uuid - kpi_id = alarm_data[3] - kpiMinValue = alarm_data[4] - kpiMaxValue = alarm_data[5] - inRange = alarm_data[6] - includeMinValue = alarm_data[7] - includeMaxValue = alarm_data[8] - subscription_frequency_ms = request.subscription_frequency_ms - subscription_timeout_s = request.subscription_timeout_s - - end_timestamp = real_start_time + subscription_timeout_s - - self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, - includeMinValue, includeMaxValue, subscription_frequency_ms, - subscription_timeout_s) - - while True: - while not alarm_queue.empty(): - alarm_response = AlarmResponse() - list = alarm_queue.get_nowait() - size = len(list) - for item in list: - kpi = Kpi() - kpi.kpi_id.kpi_id.uuid = str(item[0]) - kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) - kpi.kpi_value.floatVal = item[2] # This must be improved - alarm_response.kpi_list.kpi.append(kpi) - alarm_response.alarm_id.alarm_id.uuid = alarm_id - yield alarm_response - if timestamp_utcnow_to_float() > end_timestamp: - break - else: - LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id))) - alarm_response = AlarmResponse() - alarm_response.alarm_id.alarm_id.uuid = "NoID" - return alarm_response - except ServiceException as e: - LOGGER.exception('GetAlarmResponseStream exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('GetAlarmResponseStream exception') + else: + LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + alarm_response = AlarmResponse() + alarm_response.alarm_id.alarm_id.uuid = "NoID" + return alarm_response + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty: + alarm_id = int(request.alarm_id.uuid) + alarm = self.management_db.get_alarm(alarm_id) + response = Empty() + if alarm: + self.alarm_manager.delete_alarm(alarm_id) + self.management_db.delete_alarm(alarm_id) + else: + LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id))) + return response - LOGGER.info('DeleteAlarm') - try: - LOGGER.debug(f'DeleteAlarm with AlarmID: {request.alarm_id.uuid}') - alarm_id = int(request.alarm_id.uuid) - alarm = self.management_db.get_alarm(alarm_id) - response = Empty() - if alarm: - self.alarm_manager.delete_alarm(alarm_id) - self.management_db.delete_alarm(alarm_id) - else: - LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id))) - return response - except ServiceException as e: - LOGGER.exception('DeleteAlarm exception') - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('DeleteAlarm exception') - + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]: - - LOGGER.info('GetStreamKpi') - kpi_id = request.kpi_id.uuid kpi_db = self.management_db.get_KPI(int(kpi_id)) response = Kpi() @@ -581,36 +420,23 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): else: yield response - @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Kpi: - - LOGGER.info('GetInstantKpi') - try: - kpi_id = request.kpi_id.uuid - response = Kpi() - if kpi_id == "": - LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) - response.kpi_id.kpi_id.uuid = "NoID" + kpi_id = request.kpi_id.uuid + response = Kpi() + if kpi_id == "": + LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id))) + response.kpi_id.kpi_id.uuid = "NoID" + else: + query = f"SELECT kpi_id, timestamp, kpi_value FROM {METRICSDB_TABLE_MONITORING_KPIS} " \ + f"WHERE kpi_id = '{kpi_id}' LATEST ON timestamp PARTITION BY kpi_id" + data = self.metrics_db.run_query(query) + LOGGER.debug(data) + if len(data) == 0: + response.kpi_id.kpi_id.uuid = request.kpi_id.uuid else: - query = f"SELECT kpi_id, timestamp, kpi_value FROM {METRICSDB_TABLE_MONITORING_KPIS} " \ - f"WHERE kpi_id = '{kpi_id}' LATEST ON timestamp PARTITION BY kpi_id" - data = self.metrics_db.run_query(query) - LOGGER.debug(data) - if len(data) == 0: - response.kpi_id.kpi_id.uuid = request.kpi_id.uuid - else: - _data = data[0] - response.kpi_id.kpi_id.uuid = str(_data[0]) - response.timestamp.timestamp = timestamp_string_to_float(_data[1]) - response.kpi_value.floatVal = _data[2] - - return response - except ServiceException as e: - LOGGER.exception('GetInstantKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('GetInstantKpi exception') - # CREATEKPI_COUNTER_FAILED.inc() - grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) - + _data = data[0] + response.kpi_id.kpi_id.uuid = str(_data[0]) + response.timestamp.timestamp = timestamp_string_to_float(_data[1]) + response.kpi_value.floatVal = _data[2] + return response diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index c883f9d14..4e84431a5 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -25,7 +25,6 @@ from grpc._channel import _MultiThreadedRendezvous from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) -#from common.logger import getJSONLogger from common.proto.context_pb2 import DeviceOperationalStatusEnum, EventTypeEnum, DeviceEvent, Device, Empty from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server from common.proto.kpi_sample_types_pb2 import KpiSampleType -- GitLab