    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import os, grpc
    from queue import Queue
    
    from typing import Iterator
    
    
    from common.logger import getJSONLogger
    
    from common.proto.context_pb2 import Empty
    from common.proto.device_pb2 import MonitoringSettings
    
    from common.proto.kpi_sample_types_pb2 import KpiSampleType
    
    from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
    
    from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
        KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
        MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList
    from common.method_wrappers.ServiceExceptions import ServiceException
    
    from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float
    
    
    from monitoring.service import ManagementDBTools, MetricsDBTools
    
    from device.client.DeviceClient import DeviceClient
    
    
    from prometheus_client import Counter, Summary
    
    
    from monitoring.service.AlarmManager import AlarmManager
    from monitoring.service.SubscriptionManager import SubscriptionManager
    
    
    # Module-wide JSON logger, set to DEBUG so the LOGGER.debug traces in the servicer are emitted.
    LOGGER = getJSONLogger('monitoringservice-server')
    LOGGER.setLevel('DEBUG')

    # Prometheus metrics declared by this module.
    MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
        'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
    MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')

    # Metrics DB connection settings read from the environment (None when a variable is unset).
    METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
    METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
    METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
    METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
    
    
    class MonitoringServiceServicerImpl(MonitoringServiceServicer):
    
        def __init__(self):
            """Create the management DB, the device client, the metrics DB and both managers."""
            LOGGER.info('Init monitoringService')

            # Init sqlite monitoring db
            self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
            self.deviceClient = DeviceClient()

            # The metrics DB endpoint is taken from the METRICSDB_* module constants.
            self.metrics_db = MetricsDBTools.MetricsDB(
                METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)

            # Both managers are handed the same metrics DB object.
            self.subs_manager = SubscriptionManager(self.metrics_db)
            self.alarm_manager = AlarmManager(self.metrics_db)

            LOGGER.info('MetricsDB initialized')
    
        # SetKpi (SetKpiRequest) returns (KpiId) {}
        def SetKpi(
                self, request: KpiDescriptor, grpc_context: grpc.ServicerContext
        ) -> KpiId:
            """Store a KPI descriptor in the management DB and return its KpiId.

            When the request already carries a non-empty KPI uuid, that uuid is
            echoed back and nothing is inserted (modifying an existing KPI is not
            implemented). Otherwise a row is inserted and the value returned by
            ``insert_KPI`` becomes, as a string, the uuid of the response.

            Failures abort the RPC through ``grpc_context.abort``: with the
            exception's own code/details for ServiceException, INTERNAL otherwise.
            """
            # CREATEKPI_COUNTER_STARTED.inc()
            try:
                response = KpiId()

                kpi_description = request.kpi_description
                kpi_sample_type = request.kpi_sample_type
                kpi_device_id = request.device_id.device_uuid.uuid
                kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
                kpi_service_id = request.service_id.service_uuid.uuid
                kpi_slice_id = request.slice_id.slice_uuid.uuid
                kpi_connection_id = request.connection_id.connection_uuid.uuid

                if request.kpi_id.kpi_id.uuid != "":
                    # Here the code to modify an existing kpi (not implemented yet).
                    response.kpi_id.uuid = request.kpi_id.kpi_id.uuid
                else:
                    data = self.management_db.insert_KPI(
                        kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,
                        kpi_slice_id, kpi_connection_id)
                    response.kpi_id.uuid = str(data)

                return response
            except ServiceException as e:
                LOGGER.exception('SetKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('SetKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
        def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty:
            """Delete the KPI whose integer id is given in ``request.kpi_id.uuid``.

            A KPI that is not in the management DB is only logged; the RPC still
            returns ``Empty``. Failures abort the RPC through ``grpc_context.abort``
            (INTERNAL for anything that is not a ServiceException).
            """
            try:
                LOGGER.debug(f'DeleteKpi with KpiID: {request.kpi_id.uuid}')
                kpi_id = int(request.kpi_id.uuid)
                kpi = self.management_db.get_KPI(kpi_id)
                if kpi:
                    self.management_db.delete_KPI(kpi_id)
                else:
                    LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                return Empty()
            except ServiceException as e:
                LOGGER.exception('DeleteKpi exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('DeleteKpi exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
            """Build the KpiDescriptor for the KPI whose integer id is in ``request.kpi_id.uuid``.

            Returns a descriptor filled from columns 1..7 of the management DB row,
            or an empty descriptor (after logging) when ``get_KPI`` returns None.
            A ServiceException aborts the RPC. Any other exception is only logged,
            so the method then returns None implicitly; other methods of this class
            call it directly and test the result against None.
            """
            LOGGER.info('getting Kpi by KpiID')
            try:
                kpi_id = request.kpi_id.uuid
                kpi_db = self.management_db.get_KPI(int(kpi_id))
                kpiDescriptor = KpiDescriptor()
                if kpi_db is None:
                    LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                else:
                    kpiDescriptor.kpi_description                       = kpi_db[1]
                    kpiDescriptor.kpi_sample_type                       = kpi_db[2]
                    kpiDescriptor.device_id.device_uuid.uuid            = str(kpi_db[3])
                    kpiDescriptor.endpoint_id.endpoint_uuid.uuid        = str(kpi_db[4])
                    kpiDescriptor.service_id.service_uuid.uuid          = str(kpi_db[5])
                    kpiDescriptor.slice_id.slice_uuid.uuid              = str(kpi_db[6])
                    kpiDescriptor.connection_id.connection_uuid.uuid    = str(kpi_db[7])
                return kpiDescriptor
            except ServiceException as e:
                LOGGER.exception('GetKpiDescriptor exception')
                grpc_context.abort(e.code, e.details)
            except Exception:  # pragma: no cover
                LOGGER.exception('GetKpiDescriptor exception')
    
        def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList:
            """Return one KpiDescriptor per row returned by ``management_db.get_KPIS()``.

            Column 0 of each row becomes the KPI uuid; columns 1..7 fill the
            description, sample type and the device/endpoint/service/slice/connection
            uuids. Failures abort the RPC through ``grpc_context.abort``.
            """
            LOGGER.info('GetKpiDescriptorList')
            try:
                kpi_descriptor_list = KpiDescriptorList()

                data = self.management_db.get_KPIS()
                LOGGER.debug(f"data: {data}")

                for item in data:
                    kpi_descriptor = KpiDescriptor()

                    kpi_descriptor.kpi_id.kpi_id.uuid                   = str(item[0])
                    kpi_descriptor.kpi_description                      = item[1]
                    kpi_descriptor.kpi_sample_type                      = item[2]
                    kpi_descriptor.device_id.device_uuid.uuid           = str(item[3])
                    kpi_descriptor.endpoint_id.endpoint_uuid.uuid       = str(item[4])
                    kpi_descriptor.service_id.service_uuid.uuid         = str(item[5])
                    kpi_descriptor.slice_id.slice_uuid.uuid             = str(item[6])
                    kpi_descriptor.connection_id.connection_uuid.uuid   = str(item[7])

                    kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor)

                return kpi_descriptor_list
            except ServiceException as e:
                LOGGER.exception('GetKpiDescriptorList exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetKpiDescriptorList exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
        def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty:
            """Write one KPI sample (timestamp + value) into the metrics DB.

            The KPI descriptor is looked up through ``GetKpiDescriptor`` and its
            sample-type name and device/endpoint/service/slice/connection ids are
            written together with the sample. The value is read from whichever
            member of the ``value`` oneof is set on ``request.kpi_value``.

            A ServiceException aborts the RPC; any other exception is only logged.
            """
            LOGGER.info('IncludeKpi')
            try:
                kpi_id = request.kpi_id.kpi_id.uuid
                kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)

                if kpiDescriptor is None:
                    LOGGER.info('IncludeKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                else:
                    # Enum name, upper-cased, without the 'KPISAMPLETYPE_' prefix.
                    kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
                    kpiId = kpi_id
                    deviceId = kpiDescriptor.device_id.device_uuid.uuid
                    endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
                    serviceId = kpiDescriptor.service_id.service_uuid.uuid
                    sliceId = kpiDescriptor.slice_id.slice_uuid.uuid
                    connectionId = kpiDescriptor.connection_id.connection_uuid.uuid
                    time_stamp = request.timestamp.timestamp
                    kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))

                    # Build the structure to be included as point in the MetricsDB
                    self.metrics_db.write_KPI(
                        time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
                        kpi_value)

                return Empty()
            except ServiceException as e:
                LOGGER.exception('IncludeKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(e.code, e.details)
            except Exception:  # pragma: no cover
                LOGGER.exception('IncludeKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
    
        def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty:
            """Ask the device service to start monitoring a KPI, unless it is already flagged as monitored.

            The KPI must exist in the management DB; otherwise the condition is
            logged and ``Empty`` is returned. When the monitoring flag is not set,
            a MonitoringSettings request is sent through a new DeviceClient and the
            flag is set afterwards. Failures abort the RPC through
            ``grpc_context.abort`` (INTERNAL for non-ServiceException errors).
            """
            try:
                kpi_id = int(request.kpi_id.kpi_id.uuid)
                kpi = self.management_db.get_KPI(kpi_id)
                response = Empty()

                if kpi:
                    # Sets the request to send to the device service
                    monitor_device_request = MonitoringSettings()

                    kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)

                    monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
                    monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
                    monitor_device_request.sampling_duration_s = request.monitoring_window_s
                    monitor_device_request.sampling_interval_s = request.sampling_rate_s

                    if not self.management_db.check_monitoring_flag(kpi_id):
                        device_client = DeviceClient()
                        device_client.MonitorDeviceKpi(monitor_device_request)
                        self.management_db.set_monitoring_flag(kpi_id,True)
                        # NOTE(review): the result of this second check is discarded; confirm before removing.
                        self.management_db.check_monitoring_flag(kpi_id)
                    else:
                        LOGGER.warning('MonitorKpi warning: KpiID({:s}) is currently being monitored'.format(str(kpi_id)))
                else:
                    LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                return response
            except ServiceException as e:
                LOGGER.exception('MonitorKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('MonitorKpi exception')
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
                # CREATEKPI_COUNTER_FAILED.inc()
    
    
        def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable:
            """Return the raw samples of every KPI listed in ``request.kpi_ids``.

            For each KPI id the metrics DB is queried with the window, sample count
            and start/end timestamps of the request, and a RawKpiList (possibly
            without samples) is appended to the table. Processing stops at the
            first KPI for which ``GetKpiDescriptor`` yields None, and the table
            built so far is returned. Failures abort the RPC through
            ``grpc_context.abort``.
            """
            try:
                raw_kpi_table = RawKpiTable()

                LOGGER.debug(str(request))

                kpi_id_list             = request.kpi_ids
                monitoring_window_s     = request.monitoring_window_s
                last_n_samples          = request.last_n_samples
                start_timestamp         = request.start_timestamp.timestamp
                end_timestamp           = request.end_timestamp.timestamp

                # Check if all the Kpi_ids exist
                for item in kpi_id_list:
                    kpi_id = item.kpi_id.uuid

                    kpiDescriptor = self.GetKpiDescriptor(item, grpc_context)
                    if kpiDescriptor is None:
                        LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                        break

                    # Execute query per Kpi_id and introduce their kpi_list in the table
                    kpi_list = self.metrics_db.get_raw_kpi_list(
                        kpi_id, monitoring_window_s, last_n_samples, start_timestamp, end_timestamp)

                    raw_kpi_list = RawKpiList()
                    raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id

                    LOGGER.debug(str(kpi_list))

                    if kpi_list is None:
                        LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id)))
                    else:
                        # Distinct name: the original inner loop reused the outer loop variable `item`.
                        for sample in kpi_list:
                            raw_kpi = RawKpi()
                            raw_kpi.timestamp.timestamp = timestamp_string_to_float(sample[0])
                            raw_kpi.kpi_value.floatVal  = sample[1]
                            raw_kpi_list.raw_kpis.append(raw_kpi)

                    raw_kpi_table.raw_kpi_lists.append(raw_kpi_list)

                return raw_kpi_table
            except ServiceException as e:
                LOGGER.exception('QueryKpiData exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('QueryKpiData exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def SetKpiSubscription(
            self, request: SubsDescriptor, grpc_context: grpc.ServicerContext
        ) -> Iterator[SubsResponse]:
            """Create a KPI subscription and stream the samples it produces.

            The subscription is recorded in the management DB (the subscriber is
            hard-coded to "localhost") and handed to the subscription manager
            together with a queue. Each batch taken from the queue is yielded as one
            SubsResponse carrying the subscription id. Streaming stops once the
            current UTC time passes ``request.end_timestamp``; this stop condition
            mirrors the polling loop of GetAlarmResponseStream.

            Failures abort the RPC through ``grpc_context.abort``.
            """
            try:
                subs_queue = Queue()

                kpi_id = request.kpi_id.kpi_id.uuid
                sampling_duration_s = request.sampling_duration_s
                sampling_interval_s = request.sampling_interval_s
                start_timestamp = request.start_timestamp.timestamp
                end_timestamp = request.end_timestamp.timestamp

                subscriber = "localhost"  # Investigate how to get info from the requester

                subs_id = self.management_db.insert_subscription(kpi_id, subscriber, sampling_duration_s,
                                                                 sampling_interval_s, start_timestamp, end_timestamp)
                self.subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s,
                                                      start_timestamp, end_timestamp)

                # parse queue to append kpis into the list
                # NOTE(review): this polls the queue without sleeping or blocking between checks.
                while True:
                    while not subs_queue.empty():
                        subs_response = SubsResponse()
                        samples = subs_queue.get_nowait()
                        for item in samples:
                            kpi = Kpi()
                            kpi.kpi_id.kpi_id.uuid = str(item[0])
                            kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                            kpi.kpi_value.floatVal = item[2]  # This must be improved
                            subs_response.kpi_list.kpi.append(kpi)

                        subs_response.subs_id.subs_id.uuid = str(subs_id)
                        yield subs_response
                    if timestamp_utcnow_to_float() > end_timestamp:
                        break

            except ServiceException as e:
                LOGGER.exception('SubscribeKpi exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('SubscribeKpi exception')
                # Terminate the stream with an explicit INTERNAL status instead of ending it silently.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor:
            """Return the descriptor of the subscription whose integer id is in ``request.subs_id.uuid``.

            The descriptor is filled from columns 0, 1, 3, 4, 5 and 6 of the
            management DB row. When the row is missing the condition is logged and
            an empty descriptor is returned. Failures abort the RPC through
            ``grpc_context.abort``.
            """
            try:
                subs_id = request.subs_id.uuid
                subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
                response = SubsDescriptor()
                if subs_db is None:
                    LOGGER.info('GetSubsDescriptor error: SubsID({:s}): not found in database'.format(str(subs_id)))
                else:
                    LOGGER.debug(subs_db)
                    response.subs_id.subs_id.uuid = str(subs_db[0])
                    response.kpi_id.kpi_id.uuid = str(subs_db[1])
                    response.sampling_duration_s = subs_db[3]
                    response.sampling_interval_s = subs_db[4]
                    response.start_timestamp.timestamp = subs_db[5]
                    response.end_timestamp.timestamp = subs_db[6]

                return response
            except ServiceException as e:
                LOGGER.exception('GetSubsDescriptor exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetSubsDescriptor exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList:
            """Return one SubsDescriptor per row returned by ``management_db.get_subscriptions()``.

            Each descriptor is filled from columns 0, 1, 3, 4, 5 and 6 of its row.
            Failures abort the RPC through ``grpc_context.abort``.
            """
            try:
                response = SubsList()
                data = self.management_db.get_subscriptions()

                for subs_db in data:
                    subs_descriptor = SubsDescriptor()

                    subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0])
                    subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1])
                    subs_descriptor.sampling_duration_s = subs_db[3]
                    subs_descriptor.sampling_interval_s = subs_db[4]
                    subs_descriptor.start_timestamp.timestamp = subs_db[5]
                    subs_descriptor.end_timestamp.timestamp = subs_db[6]

                    response.subs_descriptor.append(subs_descriptor)

                return response
            except ServiceException as e:
                LOGGER.exception('GetSubscriptions exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetSubscriptions exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty:
            """Delete the subscription whose integer id is given in ``request.subs_id.uuid``.

            A subscription that is not in the management DB is only logged; the RPC
            still returns ``Empty``. Failures abort the RPC through
            ``grpc_context.abort`` (INTERNAL for non-ServiceException errors).
            """
            LOGGER.info('DeleteSubscription')
            try:
                LOGGER.debug(f'DeleteSubscription with SubsID: {request.subs_id.uuid}')
                subs_id = int(request.subs_id.uuid)
                subs_db = self.management_db.get_subscription(subs_id)
                if subs_db:
                    self.management_db.delete_subscription(subs_id)
                else:
                    LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id)))

                return Empty()
            except ServiceException as e:
                LOGGER.exception('DeleteSubscription exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('DeleteSubscription exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmID:
            """Store an alarm definition in the management DB and return its AlarmID.

            When the request already carries a non-empty alarm uuid, that uuid is
            echoed back and nothing is inserted (modifying an existing alarm is not
            implemented). Otherwise the alarm is inserted and the id returned by
            ``insert_alarm`` is used. Failures abort the RPC through
            ``grpc_context.abort``.
            """
            try:
                response = AlarmID()

                alarm_description = request.alarm_description
                alarm_name = request.name
                kpi_id = request.kpi_id.kpi_id.uuid
                kpi_min_value = request.kpi_value_range.kpiMinValue.floatVal
                kpi_max_value = request.kpi_value_range.kpiMaxValue.floatVal
                in_range = request.kpi_value_range.inRange
                include_min_value = request.kpi_value_range.includeMinValue
                include_max_value = request.kpi_value_range.includeMaxValue

                LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}")

                if request.alarm_id.alarm_id.uuid != "":
                    # Here the code to modify an existing alarm (not implemented yet).
                    alarm_id = request.alarm_id.alarm_id.uuid
                else:
                    alarm_id = self.management_db.insert_alarm(alarm_description, alarm_name, kpi_id, kpi_min_value,
                                                               kpi_max_value,
                                                               in_range, include_min_value, include_max_value)
                    LOGGER.debug(f"AlarmID: {alarm_id}")
                response.alarm_id.uuid = str(alarm_id)

                return response
            except ServiceException as e:
                LOGGER.exception('SetKpiAlarm exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('SetKpiAlarm exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
        def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList:
            """Return one AlarmDescriptor per row returned by ``management_db.get_alarms()``.

            Columns 0..5 of each row fill the id, description, name, KPI id and
            min/max values; columns 6..8 are converted to bool for the
            inRange / includeMinValue / includeMaxValue flags. Failures abort the
            RPC through ``grpc_context.abort``.
            """
            try:
                response = AlarmList()
                data = self.management_db.get_alarms()

                for alarm in data:
                    alarm_descriptor = AlarmDescriptor()

                    alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm[0])
                    alarm_descriptor.alarm_description = alarm[1]
                    alarm_descriptor.name = alarm[2]
                    alarm_descriptor.kpi_id.kpi_id.uuid = str(alarm[3])
                    alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = alarm[4]
                    alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
                    alarm_descriptor.kpi_value_range.inRange = bool(alarm[6])
                    alarm_descriptor.kpi_value_range.includeMinValue = bool(alarm[7])
                    alarm_descriptor.kpi_value_range.includeMaxValue = bool(alarm[8])

                    response.alarm_descriptor.append(alarm_descriptor)

                return response
            except ServiceException as e:
                LOGGER.exception('GetAlarms exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetAlarms exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor:
            """Return the descriptor of the alarm identified by ``request.alarm_id.uuid``.

            When the alarm is not in the management DB the condition is logged and
            a descriptor whose alarm uuid is "NoID" is returned. Failures abort the
            RPC through ``grpc_context.abort``.
            """
            try:
                alarm_id = request.alarm_id.uuid

                alarm = self.management_db.get_alarm(alarm_id)
                response = AlarmDescriptor()

                if alarm:
                    LOGGER.debug(f"{alarm}")
                    response.alarm_id.alarm_id.uuid = str(alarm_id)
                    response.alarm_description = alarm[1]
                    response.name = alarm[2]
                    response.kpi_id.kpi_id.uuid = str(alarm[3])
                    response.kpi_value_range.kpiMinValue.floatVal = alarm[4]
                    response.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
                    response.kpi_value_range.inRange = bool(alarm[6])
                    response.kpi_value_range.includeMinValue = bool(alarm[7])
                    response.kpi_value_range.includeMaxValue = bool(alarm[8])
                else:
                    LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
                    response.alarm_id.alarm_id.uuid = "NoID"
                return response
            except ServiceException as e:
                LOGGER.exception('GetAlarmDescriptor exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetAlarmDescriptor exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def GetAlarmResponseStream(self, request: AlarmSubscription, grpc_context: grpc.ServicerContext) -> Iterator[
            AlarmResponse]:
            """Stream the KPI batches produced by an alarm until the subscription times out.

            The alarm row is read from the management DB and the alarm is handed to
            the alarm manager together with a queue. Every batch taken from the
            queue is yielded as one AlarmResponse. Streaming stops once the current
            UTC time passes start time + ``subscription_timeout_s``.

            When the alarm is not found, a single AlarmResponse whose alarm uuid is
            "NoID" is yielded. It must be yielded: a value passed to ``return``
            inside a generator is never delivered to the stream consumer.

            Failures abort the RPC through ``grpc_context.abort``.
            """
            LOGGER.info('GetAlarmResponseStream')
            try:
                alarm_id = request.alarm_id.alarm_id.uuid
                alarm_data = self.management_db.get_alarm(alarm_id)
                real_start_time = timestamp_utcnow_to_float()

                if alarm_data:
                    alarm_queue = Queue()

                    kpi_id = alarm_data[3]
                    kpiMinValue = alarm_data[4]
                    kpiMaxValue = alarm_data[5]
                    inRange = alarm_data[6]
                    includeMinValue = alarm_data[7]
                    includeMaxValue = alarm_data[8]
                    subscription_frequency_ms = request.subscription_frequency_ms
                    subscription_timeout_s = request.subscription_timeout_s

                    end_timestamp = real_start_time + subscription_timeout_s

                    self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange,
                                                    includeMinValue, includeMaxValue, subscription_frequency_ms,
                                                    subscription_timeout_s)

                    # NOTE(review): this polls the queue without sleeping or blocking between checks.
                    while True:
                        while not alarm_queue.empty():
                            alarm_response = AlarmResponse()
                            samples = alarm_queue.get_nowait()
                            for item in samples:
                                kpi = Kpi()
                                kpi.kpi_id.kpi_id.uuid = str(item[0])
                                kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                                kpi.kpi_value.floatVal = item[2]  # This must be improved
                                alarm_response.kpi_list.kpi.append(kpi)
                            alarm_response.alarm_id.alarm_id.uuid = alarm_id
                            yield alarm_response
                        if timestamp_utcnow_to_float() > end_timestamp:
                            break
                else:
                    LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id)))

                    alarm_response = AlarmResponse()
                    alarm_response.alarm_id.alarm_id.uuid = "NoID"
                    yield alarm_response

            except ServiceException as e:
                LOGGER.exception('GetAlarmResponseStream exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetAlarmResponseStream exception')
                # Terminate the stream with an explicit INTERNAL status instead of ending it silently.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
        def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty:
            """Delete the alarm whose integer id is given in ``request.alarm_id.uuid``.

            An existing alarm is removed from the alarm manager first and from the
            management DB afterwards. An unknown alarm is only logged; the RPC still
            returns ``Empty``. Failures abort the RPC through ``grpc_context.abort``.
            """
            try:
                LOGGER.debug(f'DeleteAlarm with AlarmID: {request.alarm_id.uuid}')
                alarm_id = int(request.alarm_id.uuid)
                alarm = self.management_db.get_alarm(alarm_id)
                response = Empty()
                if alarm:
                    self.alarm_manager.delete_alarm(alarm_id)
                    self.management_db.delete_alarm(alarm_id)
                else:
                    LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
                return response
            except ServiceException as e:
                LOGGER.exception('DeleteAlarm exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('DeleteAlarm exception')
                # Report the failure explicitly instead of falling through and returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]:
            """Yield a single Kpi message for the KPI whose integer id is in ``request.kpi_id.uuid``.

            When the KPI is not in the management DB, the message carries the uuid
            "NoID"; otherwise an empty Kpi is yielded. The "NoID" message is yielded
            rather than returned, because a value passed to ``return`` inside a
            generator is never delivered to the stream consumer.
            """
            LOGGER.info('GetStreamKpi')

            kpi_id = request.kpi_id.uuid
            kpi_db = self.management_db.get_KPI(int(kpi_id))
            response = Kpi()
            if kpi_db is None:
                LOGGER.info('GetStreamKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                response.kpi_id.kpi_id.uuid = "NoID"
            yield response
    
        @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
        def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Kpi:
            """Return the latest sample stored in the metrics DB for a KPI.

            An empty KPI uuid yields a Kpi whose uuid is "NoID". When the query
            returns no rows, the Kpi only carries the requested uuid. Otherwise the
            first row provides the uuid, timestamp and float value.

            Failures abort the RPC through ``grpc_context.abort``: with the
            exception's own code/details for ServiceException, INTERNAL otherwise.
            """
            LOGGER.info('GetInstantKpi')
            try:
                kpi_id = request.kpi_id.uuid
                response = Kpi()
                if kpi_id == "":
                    LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                    response.kpi_id.kpi_id.uuid = "NoID"
                else:
                    # NOTE(review): kpi_id comes straight from the request and is interpolated into the
                    # SQL text; validate or escape it once the run_query API allows it.
                    query = f"SELECT kpi_id, timestamp, kpi_value FROM monitoring WHERE kpi_id = '{kpi_id}' " \
                            f"LATEST ON timestamp PARTITION BY kpi_id"
                    data = self.metrics_db.run_query(query)
                    LOGGER.debug(data)
                    if len(data) == 0:
                        response.kpi_id.kpi_id.uuid = request.kpi_id.uuid
                    else:
                        _data = data[0]
                        response.kpi_id.kpi_id.uuid = str(_data[0])
                        response.timestamp.timestamp = timestamp_string_to_float(_data[1])
                        response.kpi_value.floatVal = _data[2]

                return response
            except ServiceException as e:
                LOGGER.exception('GetInstantKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetInstantKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))