# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os, grpc, logging
from queue import Queue
from typing import Iterator

from common.Constants import ServiceNameEnum
from common.Settings import get_setting, get_service_port_grpc, get_service_host
from common.logger import getJSONLogger
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
    KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_string_to_float

from monitoring.service import ManagementDBTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient

from prometheus_client import Counter, Summary

from monitoring.service.SubscriptionManager import SubscriptionManager

# Module-level JSON logger used by every RPC handler in this file.
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('INFO')

# prometheus_client instruments declared by this module.
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
    'monitoring_getinstantkpi_processing_seconds',
    'Time spent processing monitoring instant kpi request',
)
MONITORING_INCLUDEKPI_COUNTER = Counter(
    'monitoring_includekpi_counter',
    'Monitoring include kpi request counter',
)

# Metrics database parameters read from the environment; each one is None
# when the corresponding variable is not set.
METRICSDB_HOSTNAME = os.getenv("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.getenv("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.getenv("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.getenv("METRICSDB_TABLE")

# Device service endpoint: explicit setting first, service-derived default otherwise.
DEVICESERVICE_SERVICE_HOST = get_setting(
    'DEVICESERVICE_SERVICE_HOST',
    default=get_service_host(ServiceNameEnum.DEVICE),
)
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting(
    'DEVICESERVICE_SERVICE_PORT_GRPC',
    default=get_service_port_grpc(ServiceNameEnum.DEVICE),
)
class MonitoringServiceServicerImpl(MonitoringServiceServicer):
    def __init__(self):
        """Build the collaborators used by the RPC handlers.

        Creates, in this order: the management database ('monitoring.db'),
        the device service client, the metrics database and the subscription
        manager that is handed the metrics database.
        """
        LOGGER.info('Init monitoringService')

        # Management database (the original comment describes it as sqlite).
        self.management_db = ManagementDBTools.ManagementDB('monitoring.db')

        # Client towards the device service, using the module-level endpoint settings.
        device_endpoint = dict(
            host=DEVICESERVICE_SERVICE_HOST,
            port=DEVICESERVICE_SERVICE_PORT_GRPC,
        )
        self.deviceClient = DeviceClient(**device_endpoint)

        # Metrics database, configured from the METRICSDB_* module constants.
        metrics_db_args = (
            METRICSDB_HOSTNAME,
            METRICSDB_ILP_PORT,
            METRICSDB_REST_PORT,
            METRICSDB_TABLE,
        )
        self.metrics_db = MetricsDBTools.MetricsDB(*metrics_db_args)

        self.subs_manager = SubscriptionManager(self.metrics_db)
        LOGGER.info('MetricsDB initialized')
    # SetKpi (SetKpiRequest) returns (KpiId) {}
    def SetKpi(
        self, request : KpiDescriptor, grpc_context : grpc.ServicerContext
    ) -> KpiId:
        """Insert a KPI descriptor into the management database.

        Args:
            request: descriptor whose description, sample type and
                device/endpoint/service uuids are stored.
            grpc_context: context used to abort the RPC on failure.

        Returns:
            A KpiId whose uuid is the string form of the value returned by
            ``management_db.insert_KPI``.

        On ServiceException the RPC is aborted with the exception's code and
        details; on any other exception it is aborted with INTERNAL.
        """
        # CREATEKPI_COUNTER_STARTED.inc()
        try:
            # The reply message has to exist before its uuid can be filled in;
            # without this line the assignment below raised NameError.
            kpi_id = KpiId()

            kpi_description = request.kpi_description
            kpi_sample_type = request.kpi_sample_type
            kpi_device_id   = request.device_id.device_uuid.uuid
            kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
            kpi_service_id  = request.service_id.service_uuid.uuid

            data = self.management_db.insert_KPI(
                kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)

            kpi_id.kpi_id.uuid = str(data)
            # CREATEKPI_COUNTER_COMPLETED.inc()
            return kpi_id
        except ServiceException as e:
            LOGGER.exception('SetKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SetKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    def DeleteKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Empty:
        """Delete a KPI (placeholder).

        NOTE(review): as found, this method had no body and its ``except``
        clauses had no ``try`` (a syntax error). The placeholder below mirrors
        the other unimplemented delete RPCs of this class (log, then return
        an empty message) and deletes nothing -- confirm against the intended
        implementation.
        """
        LOGGER.info('DeleteKpi')
        try:
            # TBC
            return Empty()
        except ServiceException as e:
            LOGGER.exception('DeleteKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteKpi exception')

    def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
        """Fetch a KPI row from the management database as a KpiDescriptor.

        The uuid carried by ``request`` is converted to ``int`` and passed to
        ``management_db.get_KPI``. Columns 1..5 of the returned row populate
        the description, sample type and device/endpoint/service uuids.

        Returns None when the lookup yields None, and also (implicitly) when a
        non-ServiceException error is logged. A ServiceException aborts the
        RPC with its own code and details.
        """
        LOGGER.info('getting Kpi by KpiID')
        try:
            row = self.management_db.get_KPI(int(request.kpi_id.uuid))
            if row is None:
                return None

            descriptor = KpiDescriptor()
            descriptor.kpi_description = row[1]
            descriptor.kpi_sample_type = row[2]
            # The three identifiers are stored as strings in the message.
            descriptor.device_id.device_uuid.uuid = str(row[3])
            descriptor.endpoint_id.endpoint_uuid.uuid = str(row[4])
            descriptor.service_id.service_uuid.uuid = str(row[5])
            return descriptor
        except ServiceException as svc_error:
            LOGGER.exception('GetKpiDescriptor exception')
            grpc_context.abort(svc_error.code, svc_error.details)
        except Exception:  # pragma: no cover
            LOGGER.exception('GetKpiDescriptor exception')

    def GetKpiDescriptorList ( self, request : Empty, grpc_context : grpc.ServicerContext) -> KpiDescriptorList:
        """Return the list of KPI descriptors (placeholder).

        Currently replies with an empty KpiDescriptorList; nothing is read
        from the management database.

        NOTE(review): the ``try:`` line was missing, leaving the ``except``
        clauses orphaned; it is restored here around the existing return.
        """
        LOGGER.info('GetKpiDescriptorList')
        try:
            # TBC
            return KpiDescriptorList()
        except ServiceException as e:
            LOGGER.exception('GetKpiDescriptorList exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetKpiDescriptorList exception')



    # rpc IncludeKpi(IncludeKpiRequest)  returns(context.Empty)    {}
    def IncludeKpi(self, request : Kpi, grpc_context : grpc.ServicerContext) -> Empty:
        """Write one KPI sample into the metrics database.

        The descriptor of ``request.kpi_id`` is looked up through
        ``GetKpiDescriptor``; its sample type name (upper-cased, with the
        'KPISAMPLETYPE_' prefix removed) and its device/endpoint/service uuids
        are written together with the request's timestamp and value via
        ``metrics_db.write_KPI``.

        Args:
            request: the sample to store (kpi id, timestamp, value).
            grpc_context: context used to abort the RPC on failure.

        Returns:
            Empty on success.

        A ServiceException aborts the RPC with its own code and details. Any
        other failure, including an unknown KPI id, is logged and aborts the
        RPC with INTERNAL, matching SetKpi and MonitorKpi.
        """
        LOGGER.info('IncludeKpi')
        try:
            kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
            if kpiDescriptor is None:
                # Raise with a real message; the generic handler below logs it
                # together with the traceback.
                raise Exception(
                    'IncludeKpi exception: Sample with KPIId({:s}): not found in database'.format(
                        str(request.kpi_id)))

            kpiSampleType   = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
            kpiId           = request.kpi_id.kpi_id.uuid
            deviceId        = kpiDescriptor.device_id.device_uuid.uuid
            endpointId      = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
            serviceId       = kpiDescriptor.service_id.service_uuid.uuid
            time_stamp      = request.timestamp.timestamp
            # The value is a oneof; read whichever member is currently set.
            kpi_value       = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))

            # Build the structure to be included as point in the MetricsDB
            self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
        except ServiceException as e:
            LOGGER.exception('IncludeKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('IncludeKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

        return Empty()
    # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
    def MonitorKpi ( self, request : MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> Empty:
        """Ask the device service to monitor the KPI named in the request.

        A MonitoringSettings message is filled with the KPI descriptor, the
        KPI uuid, ``monitoring_window_s`` (as sampling duration) and
        ``sampling_rate_s`` (as sampling interval), then sent through
        ``MonitorDeviceKpi`` on the device client created in ``__init__``.

        Args:
            request: KPI id plus monitoring window and sampling rate.
            grpc_context: context used to abort the RPC on failure.

        Returns:
            Empty on success.

        A ServiceException aborts the RPC with its own code and details; any
        other failure, including an unknown KPI id, aborts with INTERNAL.
        """
        try:
            kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
            if kpiDescriptor is None:
                # Fail with an explicit message instead of the TypeError that
                # CopyFrom(None) would raise.
                raise Exception(
                    'MonitorKpi exception: KPIId({:s}): not found in database'.format(
                        str(request.kpi_id)))

            # Sets the request to send to the device service
            monitor_device_request = MonitoringSettings()
            monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
            monitor_device_request.kpi_id.kpi_id.uuid  = request.kpi_id.kpi_id.uuid
            monitor_device_request.sampling_duration_s = request.monitoring_window_s
            monitor_device_request.sampling_interval_s = request.sampling_rate_s

            # Reuse the client configured in __init__ rather than creating a
            # new, never-closed DeviceClient on every call.
            self.deviceClient.MonitorDeviceKpi(monitor_device_request)
        except ServiceException as e:
            LOGGER.exception('MonitorKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('MonitorKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

        return Empty()

    def QueryKpiData ( self, request : KpiQuery, grpc_context : grpc.ServicerContext) -> KpiList:
        """Query stored KPI data (placeholder).

        NOTE(review): as found, this method had no body and its ``except``
        clauses had no ``try`` (a syntax error). The placeholder below follows
        the other unimplemented RPCs of this class and returns an empty
        message of the annotated return type -- confirm against the intended
        implementation.
        """
        LOGGER.info('QueryKpiData')
        try:
            # TBC
            return KpiList()
        except ServiceException as e:
            LOGGER.exception('QueryKpiData exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('QueryKpiData exception')

    def SetKpiSubscription ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> Iterator[SubsResponse]:
        """Register a KPI subscription and yield one response with queued samples.

        The subscription is stored through ``management_db.insert_subscription``
        and started through ``subs_manager.create_subscription``, which is
        handed a fresh queue. Whatever the queue holds right after that call
        is drained into the response's KPI list.

        Args:
            request: KPI id, sampling duration/interval and start/end timestamps.
            grpc_context: context used to abort the RPC on a ServiceException.

        Yields:
            A single SubsResponse carrying the subscription id and the KPIs
            drained from the queue.

        Any non-ServiceException error is logged and ends the stream without
        yielding.
        """
        LOGGER.info('SubscribeKpi')
        try:
            subs_queue = Queue()
            subs_response = SubsResponse()

            kpi_id              = request.kpi_id.kpi_id.uuid
            sampling_duration_s = request.sampling_duration_s
            sampling_interval_s = request.sampling_interval_s
            start_timestamp     = request.start_timestamp.timestamp
            end_timestamp       = request.end_timestamp.timestamp

            subscriber = "localhost" # Investigate how to get info from the requester

            subs_id = self.management_db.insert_subscription(
                kpi_id, subscriber, sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp)
            self.subs_manager.create_subscription(
                subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s, start_timestamp, end_timestamp)

            # parse queue to append kpis into the list
            while not subs_queue.empty():
                kpi_rows = subs_queue.get_nowait()
                for item in kpi_rows:
                    # One Kpi message per row; presumably rows are
                    # (kpi uuid, timestamp string, value) -- TODO confirm.
                    kpi = Kpi()
                    kpi.kpi_id.kpi_id.uuid  = str(item[0])
                    kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                    kpi.kpi_value.floatVal  = item[2]    # This must be improved
                    subs_response.kpi_list.kpi.append(kpi)

            # str() because uuid is assigned as a string elsewhere in this
            # class (see SetKpi), while the database id may not be one.
            subs_response.subs_id.subs_id.uuid = str(subs_id)

            yield subs_response
        except ServiceException as e:
            LOGGER.exception('SubscribeKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SubscribeKpi exception')


    def GetSubsDescriptor ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> SubsDescriptor:
        """Return the descriptor of a subscription (placeholder).

        Currently replies with an empty SubsDescriptor; the request is not
        inspected.

        NOTE(review): the ``try:`` line was missing, leaving the ``except``
        clauses orphaned; it is restored here around the existing return.
        """
        LOGGER.info('GetSubsDescriptor')
        try:
            # TBC
            return SubsDescriptor()
        except ServiceException as e:
            LOGGER.exception('GetSubsDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetSubsDescriptor exception')

    def GetSubscriptions ( self, request : Empty, grpc_context : grpc.ServicerContext) -> SubsList:
        """List the registered subscriptions (placeholder).

        NOTE(review): as found, this method had no body and its ``except``
        clauses had no ``try`` (a syntax error). The placeholder below follows
        the other unimplemented RPCs of this class and returns an empty
        message of the annotated return type -- confirm against the intended
        implementation.
        """
        LOGGER.info('GetSubscriptions')
        try:
            # TBC
            return SubsList()
        except ServiceException as e:
            LOGGER.exception('GetSubscriptions exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetSubscriptions exception')

    def DeleteSubscription ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> Empty:
        """Placeholder RPC: logs the call and replies with an empty message.

        Nothing is removed yet; the request is not inspected. A
        ServiceException would abort the RPC with its own code and details,
        any other exception is only logged.
        """
        LOGGER.info('DeleteSubscription')
        try:
            # Not implemented yet (TBC).
            reply = Empty()
            return reply
        except ServiceException as svc_error:
            LOGGER.exception('DeleteSubscription exception')
            grpc_context.abort(svc_error.code, svc_error.details)
        except Exception:  # pragma: no cover
            LOGGER.exception('DeleteSubscription exception')

    def SetKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> AlarmResponse:
        """Create an alarm on a KPI (placeholder).

        Currently replies with an empty AlarmResponse; the request is not
        inspected.

        NOTE(review): the ``try:`` and ``except ServiceException`` lines were
        missing, leaving dead statements after the return and an orphaned
        ``except``; both are restored here in the shape used by the other
        handlers of this class.
        """
        LOGGER.info('SetKpiAlarm')
        try:
            # TBC
            return AlarmResponse()
        except ServiceException as e:
            LOGGER.exception('SetKpiAlarm exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SetKpiAlarm exception')
    def GetAlarms ( self, request : Empty, grpc_context : grpc.ServicerContext) -> AlarmList:
        """List the configured alarms (placeholder).

        NOTE(review): as found, this method had no body and its ``except``
        clauses had no ``try`` (a syntax error). The placeholder below follows
        the other unimplemented RPCs of this class and returns an empty
        message of the annotated return type -- confirm against the intended
        implementation.
        """
        LOGGER.info('GetAlarms')
        try:
            # TBC
            return AlarmList()
        except ServiceException as e:
            LOGGER.exception('GetAlarms exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarms exception')

    def GetAlarmDescriptor ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> AlarmDescriptor:
        """Return the descriptor of an alarm (placeholder).

        Currently replies with an empty AlarmDescriptor; the request is not
        inspected.

        NOTE(review): the ``try:`` line was missing, leaving the ``except``
        clauses orphaned; it is restored here around the existing return.
        """
        LOGGER.info('GetAlarmDescriptor')
        try:
            # TBC
            return AlarmDescriptor()
        except ServiceException as e:
            LOGGER.exception('GetAlarmDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarmDescriptor exception')

    def GetAlarmResponseStream(self, request : AlarmSubscription, grpc_context : grpc.ServicerContext) -> Iterator[AlarmResponse]:
        """Placeholder stream: logs the call and yields one empty AlarmResponse.

        The request is not inspected. A ServiceException would abort the RPC
        with its own code and details, any other exception is only logged.
        """
        LOGGER.info('GetAlarmResponseStream')
        try:
            # Not implemented yet (TBC).
            placeholder = AlarmResponse()
            yield placeholder
        except ServiceException as svc_error:
            LOGGER.exception('GetAlarmResponseStream exception')
            grpc_context.abort(svc_error.code, svc_error.details)
        except Exception:  # pragma: no cover
            LOGGER.exception('GetAlarmResponseStream exception')

    def DeleteAlarm ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> Empty:
        """Placeholder RPC: logs the call and replies with an empty message.

        Nothing is removed yet; the request is not inspected. A
        ServiceException would abort the RPC with its own code and details,
        any other exception is only logged.
        """
        LOGGER.info('DeleteAlarm')
        try:
            # Not implemented yet (TBC).
            reply = Empty()
            return reply
        except ServiceException as svc_error:
            LOGGER.exception('DeleteAlarm exception')
            grpc_context.abort(svc_error.code, svc_error.details)
        except Exception:  # pragma: no cover
            LOGGER.exception('DeleteAlarm exception')

    def GetStreamKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Iterator[Kpi]:
        """Placeholder stream: logs the call and yields a single default Kpi.

        Neither ``request`` nor ``grpc_context`` is used.
        """
        LOGGER.info('GetStreamKpi')
        placeholder = Kpi()
        yield placeholder

    @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
    def GetInstantKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> KpiList:
        """Placeholder RPC: logs the call and returns an empty KpiList.

        Neither ``request`` nor ``grpc_context`` is used. The call is wrapped
        by the module's MONITORING_GETINSTANTKPI_REQUEST_TIME summary timer.
        """
        LOGGER.info('GetInstantKpi')
        empty_list = KpiList()
        return empty_list