# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os, grpc from queue import Queue from typing import Iterator from common.logger import getJSONLogger from common.proto.context_pb2 import Empty from common.proto.device_pb2 import MonitoringSettings from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \ KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList from common.method_wrappers.ServiceExceptions import ServiceException from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient from prometheus_client import Counter, Summary from monitoring.service.AlarmManager import AlarmManager from monitoring.service.NameMapping import NameMapping from monitoring.service.SubscriptionManager import SubscriptionManager LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') 
# Prometheus counter for IncludeKpi requests.
# NOTE(review): nothing in this module increments it - confirm whether it is used elsewhere.
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')

# MetricsDB connection settings, read once from the environment at import time
# (each one is None when the corresponding variable is not set).
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")


class MonitoringServiceServicerImpl(MonitoringServiceServicer):
    """gRPC servicer for the monitoring service.

    KPI, subscription and alarm descriptors are kept in the management
    database (``self.management_db``); KPI samples are written to and read
    from the metrics database (``self.metrics_db``).

    Error handling is uniform across the RPCs: a ``ServiceException`` aborts
    the call with its own code/details, any other exception is logged and
    aborts the call with ``grpc.StatusCode.INTERNAL``.
    """

    # Maximum time (seconds) a streaming RPC blocks waiting for the next batch
    # before re-checking its deadline.
    _QUEUE_POLL_TIMEOUT_S = 0.1

    def __init__(self, name_mapping : NameMapping):
        """Create the database handles, the device client and the managers.

        Args:
            name_mapping: passed through to ``MetricsDBTools.MetricsDB``.
        """
        LOGGER.info('Init monitoringService')

        # Init sqlite monitoring db
        self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
        self.deviceClient = DeviceClient()
        self.metrics_db = MetricsDBTools.MetricsDB(
            METRICSDB_HOSTNAME, name_mapping, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
        self.subs_manager = SubscriptionManager(self.metrics_db)
        self.alarm_manager = AlarmManager(self.metrics_db)
        LOGGER.info('MetricsDB initialized')

    # ----- Private helpers ------------------------------------------------------------------------------------------

    @staticmethod
    def _descriptor_from_kpi_row(kpi_row) -> KpiDescriptor:
        """Build a KpiDescriptor from a management-DB KPI row (the kpi_id field is left unset)."""
        descriptor = KpiDescriptor()
        descriptor.kpi_description = kpi_row[1]
        descriptor.kpi_sample_type = kpi_row[2]
        descriptor.device_id.device_uuid.uuid = str(kpi_row[3])
        descriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_row[4])
        descriptor.service_id.service_uuid.uuid = str(kpi_row[5])
        descriptor.slice_id.slice_uuid.uuid = str(kpi_row[6])
        descriptor.connection_id.connection_uuid.uuid = str(kpi_row[7])
        return descriptor

    def _find_kpi_descriptor(self, kpi_id: KpiId):
        """Return the KpiDescriptor stored for ``kpi_id``, or None when the management DB has no such KPI.

        Never touches the gRPC context, so RPC handlers can call it without risking
        a nested abort. Raises whatever ``int()`` or the database lookup raise.
        """
        kpi_row = self.management_db.get_KPI(int(kpi_id.kpi_id.uuid))
        if kpi_row is None:
            return None
        return self._descriptor_from_kpi_row(kpi_row)

    @staticmethod
    def _sample_to_kpi(sample) -> Kpi:
        """Convert a (kpi_id, timestamp string, value) sample taken from a manager queue into a Kpi message."""
        kpi = Kpi()
        kpi.kpi_id.kpi_id.uuid = str(sample[0])
        kpi.timestamp.timestamp = timestamp_string_to_float(sample[1])
        kpi.kpi_value.floatVal = sample[2]  # This must be improved
        return kpi

    @classmethod
    def _iter_queue_batches(cls, batch_queue: Queue, end_timestamp: float) -> Iterator[list]:
        """Yield every batch put on ``batch_queue`` until the queue is idle and ``end_timestamp`` has passed.

        Blocks on the queue (up to ``_QUEUE_POLL_TIMEOUT_S`` per attempt) instead of
        spinning on ``Queue.empty()``, so an idle stream does not burn a CPU core.
        """
        # Local import under an alias: at module level `Empty` is the protobuf message.
        from queue import Empty as QueueEmpty

        while True:
            try:
                batch = batch_queue.get(timeout=cls._QUEUE_POLL_TIMEOUT_S)
            except QueueEmpty:
                pass
            else:
                yield batch
                continue  # keep draining; the deadline is only checked once the queue is idle
            if timestamp_utcnow_to_float() > end_timestamp:
                break

    # ----- KPI descriptors ------------------------------------------------------------------------------------------

    # SetKpi (SetKpiRequest) returns (KpiId) {}
    def SetKpi(
        self, request: KpiDescriptor, grpc_context: grpc.ServicerContext
    ) -> KpiId:
        """Register a KPI and return its identifier.

        When the request already carries a KPI id it is echoed back unchanged
        (modifying an existing KPI is not implemented); otherwise a new row is
        inserted in the management DB and its id is returned.
        """
        LOGGER.info('SetKpi')
        try:
            response = KpiId()
            requested_uuid = request.kpi_id.kpi_id.uuid

            if requested_uuid != "":
                response.kpi_id.uuid = requested_uuid
                # Here the code to modify an existing kpi
            else:
                data = self.management_db.insert_KPI(
                    request.kpi_description,
                    request.kpi_sample_type,
                    request.device_id.device_uuid.uuid,
                    request.endpoint_id.endpoint_uuid.uuid,
                    request.service_id.service_uuid.uuid,
                    request.slice_id.slice_uuid.uuid,
                    request.connection_id.connection_uuid.uuid)
                response.kpi_id.uuid = str(data)

            return response
        except ServiceException as e:
            LOGGER.exception('SetKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SetKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty:
        """Delete the KPI from the management DB; an unknown id is only logged."""
        LOGGER.info('DeleteKpi')
        try:
            LOGGER.debug(f'DeleteKpi with KpiID: {request.kpi_id.uuid}')
            kpi_id = int(request.kpi_id.uuid)
            kpi = self.management_db.get_KPI(kpi_id)
            if kpi:
                self.management_db.delete_KPI(kpi_id)
            else:
                LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
            return Empty()
        except ServiceException as e:
            LOGGER.exception('DeleteKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
        """Return the descriptor of a KPI, or an empty KpiDescriptor when the id is unknown."""
        LOGGER.info('getting Kpi by KpiID')
        try:
            kpi_descriptor = self._find_kpi_descriptor(request)
            if kpi_descriptor is None:
                LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(
                    str(request.kpi_id.uuid)))
                kpi_descriptor = KpiDescriptor()
            return kpi_descriptor
        except ServiceException as e:
            LOGGER.exception('GetKpiDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetKpiDescriptor exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList:
        """Return the descriptors (including their ids) of every KPI in the management DB."""
        LOGGER.info('GetKpiDescriptorList')
        try:
            kpi_descriptor_list = KpiDescriptorList()

            data = self.management_db.get_KPIS()
            LOGGER.debug(f"data: {data}")

            for kpi_row in data:
                kpi_descriptor = self._descriptor_from_kpi_row(kpi_row)
                kpi_descriptor.kpi_id.kpi_id.uuid = str(kpi_row[0])
                kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor)

            return kpi_descriptor_list
        except ServiceException as e:
            LOGGER.exception('GetKpiDescriptorList exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetKpiDescriptorList exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    # ----- KPI samples ----------------------------------------------------------------------------------------------

    def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty:
        """Write one KPI sample to the metrics DB, tagged with the metadata of its descriptor.

        A sample whose KPI id is not registered in the management DB is logged
        and dropped.
        """
        LOGGER.info('IncludeKpi')
        try:
            kpi_id = request.kpi_id.kpi_id.uuid
            kpi_descriptor = self._find_kpi_descriptor(request.kpi_id)
            if kpi_descriptor is None:
                LOGGER.info('IncludeKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
            else:
                kpi_sample_type = KpiSampleType.Name(
                    kpi_descriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
                # The value is a protobuf oneof: read whichever member is set.
                kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))

                # Build the structure to be included as point in the MetricsDB
                self.metrics_db.write_KPI(
                    request.timestamp.timestamp,
                    kpi_id,
                    kpi_sample_type,
                    kpi_descriptor.device_id.device_uuid.uuid,
                    kpi_descriptor.endpoint_id.endpoint_uuid.uuid,
                    kpi_descriptor.service_id.service_uuid.uuid,
                    kpi_descriptor.slice_id.slice_uuid.uuid,
                    kpi_descriptor.connection_id.connection_uuid.uuid,
                    kpi_value)
            return Empty()
        except ServiceException as e:
            LOGGER.exception('IncludeKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('IncludeKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty:
        """Ask the device service to start sampling a KPI, unless it is already flagged as monitored."""
        LOGGER.info('MonitorKpi')
        try:
            kpi_id = int(request.kpi_id.kpi_id.uuid)
            kpi = self.management_db.get_KPI(kpi_id)
            response = Empty()

            if kpi:
                # Sets the request to send to the device service
                monitor_device_request = MonitoringSettings()
                # Built from the row fetched above, which avoids a second DB lookup.
                monitor_device_request.kpi_descriptor.CopyFrom(self._descriptor_from_kpi_row(kpi))
                monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
                monitor_device_request.sampling_duration_s = request.monitoring_window_s
                monitor_device_request.sampling_interval_s = request.sampling_rate_s

                if not self.management_db.check_monitoring_flag(kpi_id):
                    # Reuse the client created in __init__ rather than opening a new,
                    # never-closed one on every call.
                    self.deviceClient.MonitorDeviceKpi(monitor_device_request)
                    self.management_db.set_monitoring_flag(kpi_id,True)
                    # NOTE(review): the result of this call is discarded; kept in case it has side effects.
                    self.management_db.check_monitoring_flag(kpi_id)
                else:
                    LOGGER.warning('MonitorKpi warning: KpiID({:s}) is currently being monitored'.format(str(kpi_id)))
            else:
                LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
            return response
        except ServiceException as e:
            LOGGER.exception('MonitorKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('MonitorKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable:
        """Return the raw samples of each requested KPI.

        KPIs are processed in request order; processing stops at the first id
        that is not registered in the management DB, and the table built so far
        is returned.
        """
        LOGGER.info('QueryKpiData')
        try:
            raw_kpi_table = RawKpiTable()

            LOGGER.debug(str(request))

            monitoring_window_s = request.monitoring_window_s
            last_n_samples = request.last_n_samples
            start_timestamp = request.start_timestamp.timestamp
            end_timestamp = request.end_timestamp.timestamp

            # Check if all the Kpi_ids exist
            for requested_kpi in request.kpi_ids:
                kpi_id = requested_kpi.kpi_id.uuid

                if self._find_kpi_descriptor(requested_kpi) is None:
                    LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                    break

                # Execute query per Kpi_id and introduce their kpi_list in the table
                kpi_list = self.metrics_db.get_raw_kpi_list(
                    kpi_id, monitoring_window_s, last_n_samples, start_timestamp, end_timestamp)
                raw_kpi_list = RawKpiList()
                raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id

                LOGGER.debug(str(kpi_list))

                if kpi_list is None:
                    LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id)))
                else:
                    for sample in kpi_list:
                        raw_kpi = RawKpi()
                        raw_kpi.timestamp.timestamp = timestamp_string_to_float(sample[0])
                        raw_kpi.kpi_value.floatVal = sample[1]
                        raw_kpi_list.raw_kpis.append(raw_kpi)

                raw_kpi_table.raw_kpi_lists.append(raw_kpi_list)

            return raw_kpi_table
        except ServiceException as e:
            LOGGER.exception('QueryKpiData exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('QueryKpiData exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    # ----- Subscriptions --------------------------------------------------------------------------------------------

    def SetKpiSubscription(
        self, request: SubsDescriptor, grpc_context: grpc.ServicerContext
    ) -> Iterator[SubsResponse]:
        """Create a subscription and stream one SubsResponse per batch of samples it produces.

        The stream ends once the subscription queue is idle and the current UTC
        time is past the requested end timestamp.
        """
        LOGGER.info('SubscribeKpi')
        try:
            subs_queue = Queue()

            kpi_id = request.kpi_id.kpi_id.uuid
            sampling_duration_s = request.sampling_duration_s
            sampling_interval_s = request.sampling_interval_s
            start_timestamp = request.start_timestamp.timestamp
            end_timestamp = request.end_timestamp.timestamp

            subscriber = "localhost"  # Investigate how to get info from the requester

            subs_id = self.management_db.insert_subscription(
                kpi_id, subscriber, sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp)
            self.subs_manager.create_subscription(
                subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s, start_timestamp,
                end_timestamp)

            # parse queue to append kpis into the list
            for batch in self._iter_queue_batches(subs_queue, end_timestamp):
                subs_response = SubsResponse()
                for sample in batch:
                    subs_response.kpi_list.kpi.append(self._sample_to_kpi(sample))
                subs_response.subs_id.subs_id.uuid = str(subs_id)
                yield subs_response
        except ServiceException as e:
            LOGGER.exception('SubscribeKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SubscribeKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor:
        """Return the descriptor of a subscription, or an empty SubsDescriptor when the id is unknown."""
        LOGGER.info('GetSubsDescriptor')
        try:
            subs_id = request.subs_id.uuid
            subs_db = self.management_db.get_subscription(int(subs_id))
            response = SubsDescriptor()
            if subs_db is None:
                LOGGER.info('GetSubsDescriptor error: SubsID({:s}): not found in database'.format(str(subs_id)))
            else:
                LOGGER.debug(subs_db)
                # Row index 2 is not exposed in the descriptor.
                response.subs_id.subs_id.uuid = str(subs_db[0])
                response.kpi_id.kpi_id.uuid = str(subs_db[1])
                response.sampling_duration_s = subs_db[3]
                response.sampling_interval_s = subs_db[4]
                response.start_timestamp.timestamp = subs_db[5]
                response.end_timestamp.timestamp = subs_db[6]

            return response
        except ServiceException as e:
            LOGGER.exception('GetSubsDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetSubsDescriptor exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList:
        """Return the descriptors of every subscription in the management DB."""
        LOGGER.info('GetSubscriptions')
        try:
            response = SubsList()
            data = self.management_db.get_subscriptions()

            for subs_db in data:
                subs_descriptor = SubsDescriptor()
                # Row index 2 is not exposed in the descriptor.
                subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0])
                subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1])
                subs_descriptor.sampling_duration_s = subs_db[3]
                subs_descriptor.sampling_interval_s = subs_db[4]
                subs_descriptor.start_timestamp.timestamp = subs_db[5]
                subs_descriptor.end_timestamp.timestamp = subs_db[6]

                response.subs_descriptor.append(subs_descriptor)

            return response
        except ServiceException as e:
            LOGGER.exception('GetSubscriptions exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetSubscriptions exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty:
        """Delete the subscription from the management DB; an unknown id is only logged."""
        LOGGER.info('DeleteSubscription')
        try:
            LOGGER.debug(f'DeleteSubscription with SubsID: {request.subs_id.uuid}')
            subs_id = int(request.subs_id.uuid)
            subs_db = self.management_db.get_subscription(subs_id)
            if subs_db:
                self.management_db.delete_subscription(subs_id)
            else:
                LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id)))
            return Empty()
        except ServiceException as e:
            LOGGER.exception('DeleteSubscription exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteSubscription exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    # ----- Alarms ---------------------------------------------------------------------------------------------------

    def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmID:
        """Register an alarm and return its identifier.

        When the request already carries an alarm id it is echoed back unchanged
        (modifying an existing alarm is not implemented); otherwise a new row is
        inserted in the management DB and its id is returned.
        """
        LOGGER.info('SetKpiAlarm')
        try:
            response = AlarmID()
            value_range = request.kpi_value_range

            LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}")

            if request.alarm_id.alarm_id.uuid != "":
                alarm_id = request.alarm_id.alarm_id.uuid
                # Here the code to modify an existing alarm
            else:
                alarm_id = self.management_db.insert_alarm(
                    request.alarm_description,
                    request.name,
                    request.kpi_id.kpi_id.uuid,
                    value_range.kpiMinValue.floatVal,
                    value_range.kpiMaxValue.floatVal,
                    value_range.inRange,
                    value_range.includeMinValue,
                    value_range.includeMaxValue)
                LOGGER.debug(f"AlarmID: {alarm_id}")
            response.alarm_id.uuid = str(alarm_id)

            return response
        except ServiceException as e:
            LOGGER.exception('SetKpiAlarm exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SetKpiAlarm exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList:
        """Return the descriptors of every alarm in the management DB."""
        LOGGER.info('GetAlarms')
        try:
            response = AlarmList()
            data = self.management_db.get_alarms()

            for alarm in data:
                alarm_descriptor = AlarmDescriptor()

                alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm[0])
                alarm_descriptor.alarm_description = alarm[1]
                alarm_descriptor.name = alarm[2]
                alarm_descriptor.kpi_id.kpi_id.uuid = str(alarm[3])
                alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = alarm[4]
                alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
                alarm_descriptor.kpi_value_range.inRange = bool(alarm[6])
                alarm_descriptor.kpi_value_range.includeMinValue = bool(alarm[7])
                alarm_descriptor.kpi_value_range.includeMaxValue = bool(alarm[8])

                response.alarm_descriptor.append(alarm_descriptor)

            return response
        except ServiceException as e:
            LOGGER.exception('GetAlarms exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarms exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor:
        """Return the descriptor of an alarm; an unknown id yields a descriptor whose alarm id is "NoID"."""
        LOGGER.info('GetAlarmDescriptor')
        try:
            alarm_id = request.alarm_id.uuid
            LOGGER.debug(alarm_id)
            alarm = self.management_db.get_alarm(alarm_id)
            response = AlarmDescriptor()

            if alarm:
                LOGGER.debug(f"{alarm}")
                response.alarm_id.alarm_id.uuid = str(alarm_id)
                response.alarm_description = alarm[1]
                response.name = alarm[2]
                response.kpi_id.kpi_id.uuid = str(alarm[3])
                response.kpi_value_range.kpiMinValue.floatVal = alarm[4]
                response.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
                response.kpi_value_range.inRange = bool(alarm[6])
                response.kpi_value_range.includeMinValue = bool(alarm[7])
                response.kpi_value_range.includeMaxValue = bool(alarm[8])
            else:
                LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
                response.alarm_id.alarm_id.uuid = "NoID"
            return response
        except ServiceException as e:
            LOGGER.exception('GetAlarmDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarmDescriptor exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetAlarmResponseStream(
        self, request: AlarmSubscription, grpc_context: grpc.ServicerContext
    ) -> Iterator[AlarmResponse]:
        """Stream one AlarmResponse per batch of samples produced for the alarm.

        The stream ends once the alarm queue is idle and ``subscription_timeout_s``
        seconds have elapsed since the call started. For an unknown alarm id a
        single response with alarm id "NoID" is streamed.
        """
        LOGGER.info('GetAlarmResponseStream')
        try:
            alarm_id = request.alarm_id.alarm_id.uuid
            alarm_data = self.management_db.get_alarm(alarm_id)
            real_start_time = timestamp_utcnow_to_float()

            if not alarm_data:
                LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
                alarm_response = AlarmResponse()
                alarm_response.alarm_id.alarm_id.uuid = "NoID"
                # This method is a generator: the message must be yielded, a returned
                # value would never reach the client.
                yield alarm_response
                return

            LOGGER.debug(f"{alarm_data}")
            alarm_queue = Queue()

            kpi_id = alarm_data[3]
            kpiMinValue = alarm_data[4]
            kpiMaxValue = alarm_data[5]
            inRange = alarm_data[6]
            includeMinValue = alarm_data[7]
            includeMaxValue = alarm_data[8]
            subscription_frequency_ms = request.subscription_frequency_ms
            subscription_timeout_s = request.subscription_timeout_s

            end_timestamp = real_start_time + subscription_timeout_s

            self.alarm_manager.create_alarm(
                alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange, includeMinValue,
                includeMaxValue, subscription_frequency_ms, subscription_timeout_s)

            for batch in self._iter_queue_batches(alarm_queue, end_timestamp):
                alarm_response = AlarmResponse()
                for sample in batch:
                    alarm_response.kpi_list.kpi.append(self._sample_to_kpi(sample))
                alarm_response.alarm_id.alarm_id.uuid = alarm_id
                yield alarm_response
        except ServiceException as e:
            LOGGER.exception('GetAlarmResponseStream exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarmResponseStream exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty:
        """Remove the alarm from the alarm manager and the management DB; an unknown id is only logged."""
        LOGGER.info('DeleteAlarm')
        try:
            LOGGER.debug(f'DeleteAlarm with AlarmID: {request.alarm_id.uuid}')
            alarm_id = int(request.alarm_id.uuid)
            alarm = self.management_db.get_alarm(alarm_id)
            response = Empty()
            if alarm:
                self.alarm_manager.delete_alarm(alarm_id)
                self.management_db.delete_alarm(alarm_id)
            else:
                LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
            return response
        except ServiceException as e:
            LOGGER.exception('DeleteAlarm exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteAlarm exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    # ----- Instant / streamed KPI values ----------------------------------------------------------------------------

    def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]:
        """Stream a single Kpi message for the requested KPI.

        For a known KPI the message is an empty ``Kpi``; for an unknown one its
        KPI id is set to "NoID".
        """
        LOGGER.info('GetStreamKpi')

        kpi_id = request.kpi_id.uuid
        kpi_db = self.management_db.get_KPI(int(kpi_id))
        response = Kpi()
        if kpi_db is None:
            LOGGER.info('GetStreamKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
            response.kpi_id.kpi_id.uuid = "NoID"
        # This method is a generator: the message must be yielded in both cases,
        # a returned value would never reach the client.
        yield response

    @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
    def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Kpi:
        """Return the latest sample stored in the metrics DB for a KPI.

        An empty id yields a Kpi whose id is "NoID"; when the query returns no
        rows only the KPI id is filled in.
        """
        LOGGER.info('GetInstantKpi')
        try:
            kpi_id = request.kpi_id.uuid
            response = Kpi()
            if kpi_id == "":
                LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                response.kpi_id.kpi_id.uuid = "NoID"
            else:
                # NOTE(review): kpi_id comes straight from the client and is interpolated into
                # the SQL text; validate/escape it or use a parameterised query if run_query
                # supports one.
                query = f"SELECT kpi_id, timestamp, kpi_value FROM {METRICSDB_TABLE} WHERE kpi_id = '{kpi_id}' " \
                        f"LATEST ON timestamp PARTITION BY kpi_id"
                data = self.metrics_db.run_query(query)
                LOGGER.debug(data)
                if len(data) == 0:
                    response.kpi_id.kpi_id.uuid = request.kpi_id.uuid
                else:
                    latest = data[0]
                    response.kpi_id.kpi_id.uuid = str(latest[0])
                    response.timestamp.timestamp = timestamp_string_to_float(latest[1])
                    response.kpi_value.floatVal = latest[2]

            return response
        except ServiceException as e:
            LOGGER.exception('GetInstantKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetInstantKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))