Skip to content
Snippets Groups Projects
MonitoringServiceServicerImpl.py 22.1 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, os, grpc
from typing import Iterator
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
    KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList
from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float
from device.client.DeviceClient import DeviceClient
from monitoring.service import ManagementDBTools, MetricsDBTools
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.NameMapping import NameMapping
from monitoring.service.SubscriptionManager import SubscriptionManager

LOGGER = logging.getLogger(__name__)
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE_MONITORING_KPIS = os.environ.get("METRICSDB_TABLE_MONITORING_KPIS")
METRICS_POOL = MetricsPool('Monitoring', 'RPC')

class MonitoringServiceServicerImpl(MonitoringServiceServicer):
    def __init__(self, name_mapping : NameMapping):
        LOGGER.info('Init monitoringService')
        # Init sqlite monitoring db
        self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
        self.deviceClient = DeviceClient()
        self.metrics_db = MetricsDBTools.MetricsDB(
            METRICSDB_HOSTNAME, name_mapping, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE_MONITORING_KPIS)
        self.subs_manager = SubscriptionManager(self.metrics_db)
        self.alarm_manager = AlarmManager(self.metrics_db)
        LOGGER.info('MetricsDB initialized')
    # SetKpi (SetKpiRequest) returns (KpiId) {}
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
            self, request: KpiDescriptor, grpc_context: grpc.ServicerContext
        response = KpiId()
        kpi_description = request.kpi_description
        kpi_sample_type = request.kpi_sample_type
        kpi_device_id = request.device_id.device_uuid.uuid
        kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
        kpi_service_id = request.service_id.service_uuid.uuid
        kpi_slice_id = request.slice_id.slice_uuid.uuid
        kpi_connection_id = request.connection_id.connection_uuid.uuid
        if request.kpi_id.kpi_id.uuid != "":
            response.kpi_id.uuid = request.kpi_id.kpi_id.uuid
            # Here the code to modify an existing kpi
        else:
            data = self.management_db.insert_KPI(
                kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id,
                kpi_connection_id)
            response.kpi_id.uuid = str(data)
        return response
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty:
        kpi_id = int(request.kpi_id.uuid)
        kpi = self.management_db.get_KPI(kpi_id)
        if kpi:
            self.management_db.delete_KPI(kpi_id)
        else:
            LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
        return Empty()
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
        kpi_id = request.kpi_id.uuid
        kpi_db = self.management_db.get_KPI(int(kpi_id))
        kpiDescriptor = KpiDescriptor()
        if kpi_db is None:
            LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id)))
        else:
            kpiDescriptor.kpi_description                       = kpi_db[1]
            kpiDescriptor.kpi_sample_type                       = kpi_db[2]
            kpiDescriptor.device_id.device_uuid.uuid            = str(kpi_db[3])
            kpiDescriptor.endpoint_id.endpoint_uuid.uuid        = str(kpi_db[4])
            kpiDescriptor.service_id.service_uuid.uuid          = str(kpi_db[5])
            kpiDescriptor.slice_id.slice_uuid.uuid              = str(kpi_db[6])
            kpiDescriptor.connection_id.connection_uuid.uuid    = str(kpi_db[7])
        return kpiDescriptor

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList:
        kpi_descriptor_list = KpiDescriptorList()
        data = self.management_db.get_KPIS()
        LOGGER.debug(f"data: {data}")
        for item in data:
            kpi_descriptor = KpiDescriptor()
            kpi_descriptor.kpi_id.kpi_id.uuid                   = str(item[0])
            kpi_descriptor.kpi_description                      = item[1]
            kpi_descriptor.kpi_sample_type                      = item[2]
            kpi_descriptor.device_id.device_uuid.uuid           = str(item[3])
            kpi_descriptor.endpoint_id.endpoint_uuid.uuid       = str(item[4])
            kpi_descriptor.service_id.service_uuid.uuid         = str(item[5])
            kpi_descriptor.slice_id.slice_uuid.uuid             = str(item[6])
            kpi_descriptor.connection_id.connection_uuid.uuid   = str(item[7])
            kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor)
        return kpi_descriptor_list

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty:
        kpi_id = request.kpi_id.kpi_id.uuid
        kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
        if kpiDescriptor is None:
            LOGGER.info('IncludeKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
        else:
            kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
            kpiId = kpi_id
            deviceId = kpiDescriptor.device_id.device_uuid.uuid
            endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
            serviceId = kpiDescriptor.service_id.service_uuid.uuid
            sliceId   = kpiDescriptor.slice_id.slice_uuid.uuid
            connectionId = kpiDescriptor.connection_id.connection_uuid.uuid
            time_stamp = request.timestamp.timestamp
            kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))

            # Build the structure to be included as point in the MetricsDB
            self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value)
        return Empty()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty:
        kpi_id = int(request.kpi_id.kpi_id.uuid)
        kpi = self.management_db.get_KPI(kpi_id)
        response = Empty()
        if kpi:
            # Sets the request to send to the device service
            monitor_device_request = MonitoringSettings()
Javi Moreno's avatar
 
Javi Moreno committed
            kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
            monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
            monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
            monitor_device_request.sampling_duration_s = request.monitoring_window_s
            monitor_device_request.sampling_interval_s = request.sampling_rate_s
            if not self.management_db.check_monitoring_flag(kpi_id):
                device_client = DeviceClient()
                device_client.MonitorDeviceKpi(monitor_device_request)
                self.management_db.set_monitoring_flag(kpi_id,True)
                self.management_db.check_monitoring_flag(kpi_id)
            else:
                LOGGER.warning('MonitorKpi warning: KpiID({:s}) is currently being monitored'.format(str(kpi_id)))
        else:
            LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
        return response
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable:
        raw_kpi_table = RawKpiTable()
        kpi_id_list             = request.kpi_ids
        monitoring_window_s     = request.monitoring_window_s
        last_n_samples          = request.last_n_samples
        start_timestamp         = request.start_timestamp.timestamp
        end_timestamp           = request.end_timestamp.timestamp

        # Check if all the Kpi_ids exist
        for item in kpi_id_list:
            kpi_id = item.kpi_id.uuid
            kpiDescriptor = self.GetKpiDescriptor(item, grpc_context)
                LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id)))
                break
                # Execute query per Kpi_id and introduce their kpi_list in the table
                kpi_list = self.metrics_db.get_raw_kpi_list(kpi_id,monitoring_window_s,last_n_samples,start_timestamp,end_timestamp)
                raw_kpi_list = RawKpiList()
                raw_kpi_list.kpi_id.kpi_id.uuid = kpi_id
                LOGGER.debug(str(kpi_list))
                if kpi_list is None:
                    LOGGER.info('QueryKpiData error: KpiID({:s}): points not found in metrics database'.format(str(kpi_id)))
                    for item in kpi_list:
                        raw_kpi = RawKpi()
                        raw_kpi.timestamp.timestamp = timestamp_string_to_float(item[0])
                        raw_kpi.kpi_value.floatVal  = item[1]
                        raw_kpi_list.raw_kpis.append(raw_kpi)
Javi Moreno's avatar
 
Javi Moreno committed

                raw_kpi_table.raw_kpi_lists.append(raw_kpi_list)
        return raw_kpi_table
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetKpiSubscription(self, request: SubsDescriptor, grpc_context: grpc.ServicerContext) -> SubsResponse:
        subs_queue = Queue()

        kpi_id = request.kpi_id.kpi_id.uuid
        sampling_duration_s = request.sampling_duration_s
        sampling_interval_s = request.sampling_interval_s
        start_timestamp = request.start_timestamp.timestamp
        end_timestamp = request.end_timestamp.timestamp

        subscriber = "localhost"  # Investigate how to get info from the requester

        subs_id = self.management_db.insert_subscription(kpi_id, subscriber, sampling_duration_s,
                                                            sampling_interval_s, start_timestamp, end_timestamp)
        self.subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s,
                                                start_timestamp, end_timestamp)

        # parse queue to append kpis into the list
        while True:
            while not subs_queue.empty():
                subs_response = SubsResponse()
                list = subs_queue.get_nowait()
                for item in list:
                    kpi = Kpi()
                    kpi.kpi_id.kpi_id.uuid = str(item[0])
                    kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                    kpi.kpi_value.floatVal = item[2]  # This must be improved
                    subs_response.kpi_list.kpi.append(kpi)
                subs_response.subs_id.subs_id.uuid = str(subs_id)
                yield subs_response
            if timestamp_utcnow_to_float() > end_timestamp:
                break
        # yield subs_response

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor:
        subs_id = request.subs_id.uuid
        subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
        response = SubsDescriptor()
        if subs_db is None:
            LOGGER.info('GetSubsDescriptor error: SubsID({:s}): not found in database'.format(str(subs_id)))
        else:
            LOGGER.debug(subs_db)
            response.subs_id.subs_id.uuid = str(subs_db[0])
            response.kpi_id.kpi_id.uuid = str(subs_db[1])
            response.sampling_duration_s = subs_db[3]
            response.sampling_interval_s = subs_db[4]
            response.start_timestamp.timestamp = subs_db[5]
            response.end_timestamp.timestamp = subs_db[6]
        return response

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList:
        response = SubsList()
        data = self.management_db.get_subscriptions()
        for subs_db in data:
            subs_descriptor = SubsDescriptor()
            subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0])
            subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1])
            subs_descriptor.sampling_duration_s = subs_db[3]
            subs_descriptor.sampling_interval_s = subs_db[4]
            subs_descriptor.start_timestamp.timestamp = subs_db[5]
            subs_descriptor.end_timestamp.timestamp = subs_db[6]
            response.subs_descriptor.append(subs_descriptor)
        return response

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty:
        subs_id = int(request.subs_id.uuid)
        subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
        if subs_db:
            self.management_db.delete_subscription(subs_id)
        else:
            LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id)))
        return Empty()
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmResponse:
        response = AlarmID()
        alarm_description = request.alarm_description
        alarm_name = request.name
        kpi_id = request.kpi_id.kpi_id.uuid
        kpi_min_value = float(request.kpi_value_range.kpiMinValue.floatVal)
        kpi_max_value = float(request.kpi_value_range.kpiMaxValue.floatVal)
        in_range = request.kpi_value_range.inRange
        include_min_value = request.kpi_value_range.includeMinValue
        include_max_value = request.kpi_value_range.includeMaxValue
        timestamp = request.timestamp.timestamp
        LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}")
        if request.alarm_id.alarm_id.uuid != "":
            alarm_id = request.alarm_id.alarm_id.uuid
            # Here the code to modify an existing alarm
        else:
            alarm_id = self.management_db.insert_alarm(alarm_description, alarm_name, kpi_id, kpi_min_value,
                                                        kpi_max_value,
                                                        in_range, include_min_value, include_max_value)
            LOGGER.debug(f"AlarmID: {alarm_id}")
        response.alarm_id.uuid = str(alarm_id)
        return response

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList:
        response = AlarmList()
        data = self.management_db.get_alarms()
        for alarm in data:
            alarm_descriptor = AlarmDescriptor()
            alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm[0])
            alarm_descriptor.alarm_description = alarm[1]
            alarm_descriptor.name = alarm[2]
            alarm_descriptor.kpi_id.kpi_id.uuid = str(alarm[3])
            alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = alarm[4]
            alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
            alarm_descriptor.kpi_value_range.inRange = bool(alarm[6])
            alarm_descriptor.kpi_value_range.includeMinValue = bool(alarm[7])
            alarm_descriptor.kpi_value_range.includeMaxValue = bool(alarm[8])
            response.alarm_descriptor.append(alarm_descriptor)
        return response
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor:
        alarm_id = request.alarm_id.uuid
        LOGGER.debug(alarm_id)
        alarm = self.management_db.get_alarm(alarm_id)
        response = AlarmDescriptor()

        if alarm:
            LOGGER.debug(f"{alarm}")
            response.alarm_id.alarm_id.uuid = str(alarm_id)
            response.alarm_description = alarm[1]
            response.name = alarm[2]
            response.kpi_id.kpi_id.uuid = str(alarm[3])
            response.kpi_value_range.kpiMinValue.floatVal = alarm[4]
            response.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
            response.kpi_value_range.inRange = bool(alarm[6])
            response.kpi_value_range.includeMinValue = bool(alarm[7])
            response.kpi_value_range.includeMaxValue = bool(alarm[8])
        else:
            LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
            response.alarm_id.alarm_id.uuid = "NoID"
        return response

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetAlarmResponseStream(
        self, request: AlarmSubscription, grpc_context: grpc.ServicerContext
    ) -> Iterator[AlarmResponse]:
        alarm_id = request.alarm_id.alarm_id.uuid
        alarm_data = self.management_db.get_alarm(alarm_id)
        real_start_time = timestamp_utcnow_to_float()

        if alarm_data:
            LOGGER.debug(f"{alarm_data}")
            alarm_queue = Queue()
            alarm_id = request.alarm_id.alarm_id.uuid
            kpi_id = alarm_data[3]
            kpiMinValue = alarm_data[4]
            kpiMaxValue = alarm_data[5]
            inRange = alarm_data[6]
            includeMinValue = alarm_data[7]
            includeMaxValue = alarm_data[8]
            subscription_frequency_ms = request.subscription_frequency_ms
            subscription_timeout_s = request.subscription_timeout_s
            end_timestamp = real_start_time + subscription_timeout_s
            self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange,
                                            includeMinValue, includeMaxValue, subscription_frequency_ms,
                                            subscription_timeout_s)
                while not alarm_queue.empty():
                    alarm_response = AlarmResponse()
                    list = alarm_queue.get_nowait()
                    size = len(list)
                    for item in list:
                        kpi = Kpi()
                        kpi.kpi_id.kpi_id.uuid = str(item[0])
                        kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                        kpi.kpi_value.floatVal = item[2]  # This must be improved
                        alarm_response.kpi_list.kpi.append(kpi)
                    alarm_response.alarm_id.alarm_id.uuid = alarm_id
                    yield alarm_response
        else:
            LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
            alarm_response = AlarmResponse()
            alarm_response.alarm_id.alarm_id.uuid = "NoID"
            return alarm_response
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty:
        alarm_id = int(request.alarm_id.uuid)
        alarm = self.management_db.get_alarm(alarm_id)
        response = Empty()
        if alarm:
            self.alarm_manager.delete_alarm(alarm_id)
            self.management_db.delete_alarm(alarm_id)
        else:
            LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
        return response
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]:
        kpi_id = request.kpi_id.uuid
        kpi_db = self.management_db.get_KPI(int(kpi_id))
        response = Kpi()
        if kpi_db is None:
            LOGGER.info('GetStreamKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
            response.kpi_id.kpi_id.uuid = "NoID"
            return response
        else:
            yield response

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Kpi:
        kpi_id = request.kpi_id.uuid
        response = Kpi()
        if kpi_id == "":
            LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
            response.kpi_id.kpi_id.uuid = "NoID"
        else:
            query = f"SELECT kpi_id, timestamp, kpi_value FROM {METRICSDB_TABLE_MONITORING_KPIS} " \
                    f"WHERE kpi_id = '{kpi_id}' LATEST ON timestamp PARTITION BY kpi_id"
            data = self.metrics_db.run_query(query)
            LOGGER.debug(data)
            if len(data) == 0:
                response.kpi_id.kpi_id.uuid = request.kpi_id.uuid
                _data = data[0]
                response.kpi_id.kpi_id.uuid = str(_data[0])
                response.timestamp.timestamp = timestamp_string_to_float(_data[1])
                response.kpi_value.floatVal = _data[2]
        return response