Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Select Git revision
  • ccdd5bd89282cb199465f70b8a4331a1743ef164
  • master default
  • feat/320-cttc-ietf-simap-basic-support-with-kafka-yang-push
  • cnit_tapi
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • feat/314-tid-new-service-for-ipowdm-configuration-fron-orchestrator-to-ipowdm-controller
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/307-update-python-version-service
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • openroadm-flex-grid
  • feat/310-cttc-implement-nbi-connector-to-interface-with-osm-client
  • develop protected
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • feat/321-add-support-for-gnmi-configuration-via-proto
  • feat/322-add-read-support-for-ipinfusion-devices-via-netconf
  • feat/323-add-support-for-restconf-protocol-in-devices
  • feat/policy-refactor
  • feat/192-cttc-implement-telemetry-backend-collector-gnmi-openconfig
  • feat/307-update-python-version
  • feat/telemetry-collector-int
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

ContextServiceServicerImpl.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    MonitoringServiceServicerImpl.py 13.90 KiB
    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import os, grpc, logging
    
    from typing import Iterator
    
    from common.Constants import ServiceNameEnum
    from common.Settings import get_setting, get_service_port_grpc, get_service_host
    from common.logger import getJSONLogger
    from common.proto.context_pb2 import Empty
    from common.proto.device_pb2 import MonitoringSettings
    from common.proto.kpi_sample_types_pb2 import KpiSampleType
    from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
    from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
        KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
        MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
    from common.rpc_method_wrapper.ServiceExceptions import ServiceException
    
    from monitoring.service import ManagementDBTools, MetricsDBTools
    from device.client.DeviceClient import DeviceClient
    
    from prometheus_client import Counter, Summary
    
    LOGGER = getJSONLogger('monitoringservice-server')
    LOGGER.setLevel('DEBUG')
    
    MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
        'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
    MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
    
    METRICSDB_HOSTNAME  = os.environ.get("METRICSDB_HOSTNAME")
    METRICSDB_ILP_PORT  = os.environ.get("METRICSDB_ILP_PORT")
    METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
    METRICSDB_TABLE     = os.environ.get("METRICSDB_TABLE")
    
    
    DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST',      default=get_service_host(ServiceNameEnum.DEVICE)     )
    DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE))
    
    
    class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        def __init__(self):
            LOGGER.info('Init monitoringService')
    
            # Init sqlite monitoring db
            self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
            self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC)  # instantiate the client
    
            self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE)
            LOGGER.info('MetricsDB initialized')
    
        # SetKpi (SetKpiRequest) returns (KpiId) {}
        def SetKpi(
            self, request : KpiDescriptor, grpc_context : grpc.ServicerContext
        ) -> KpiId:
            # CREATEKPI_COUNTER_STARTED.inc()
            LOGGER.info('SetKpi')
            try:
                # Here the code to create a sqlite query to crete a KPI and return a KpiID
                kpi_id = KpiId()
    
                kpi_description = request.kpi_description
                kpi_sample_type = request.kpi_sample_type
                kpi_device_id   = request.device_id.device_uuid.uuid
                kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
                kpi_service_id  = request.service_id.service_uuid.uuid
    
                data = self.management_db.insert_KPI(
                    kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    
                kpi_id.kpi_id.uuid = str(data)
                # CREATEKPI_COUNTER_COMPLETED.inc()
                return kpi_id
            except ServiceException as e:
                LOGGER.exception('SetKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('SetKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
        def DeleteKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Empty:
    
            LOGGER.info('DeleteKpi')
            try:
                 # TBC
                return Empty()
            except ServiceException as e:
                LOGGER.exception('DeleteKpi exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('DeleteKpi exception')
    
        def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
            LOGGER.info('getting Kpi by KpiID')
            try:
                kpi_db = self.management_db.get_KPI(int(request.kpi_id.uuid))
                # LOGGER.info('sql_db.get_KPIS={:s}'.format(str(self.sql_db.get_KPIS())))
                # LOGGER.info('kpi_db={:s}'.format(str(kpi_db)))
                if kpi_db is None: return None
    
                kpiDescriptor = KpiDescriptor()
    
                kpiDescriptor.kpi_description = kpi_db[1]
                kpiDescriptor.kpi_sample_type = kpi_db[2]
                kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
                kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
                kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
    
                return kpiDescriptor
            except ServiceException as e:
                LOGGER.exception('GetKpiDescriptor exception')
                grpc_context.abort(e.code, e.details)
    
            except Exception:  # pragma: no cover
                LOGGER.exception('GetKpiDescriptor exception')
    
        def GetKpiDescriptorList ( self, request : Empty, grpc_context : grpc.ServicerContext) -> KpiDescriptorList:
    
            LOGGER.info('GetKpiDescriptorList')
            try:
                 # TBC
                return KpiDescriptorList()
            except ServiceException as e:
                LOGGER.exception('GetKpiDescriptorList exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetKpiDescriptorList exception')
    
    
    
        # rpc IncludeKpi(IncludeKpiRequest)  returns(context.Empty)    {}
        def IncludeKpi(self, request : Kpi, grpc_context : grpc.ServicerContext) -> Empty:
    
            LOGGER.info('IncludeKpi')
    
            try:
                kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
                if kpiDescriptor is None:
                    LOGGER.warning('Ignoring sample with KPIId({:s}): not found in database'.format(str(request.kpi_id)))
                    return Empty()
    
                kpiSampleType   = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
                kpiId           = request.kpi_id.kpi_id.uuid
                deviceId        = kpiDescriptor.device_id.device_uuid.uuid
                endpointId      = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
                serviceId       = kpiDescriptor.service_id.service_uuid.uuid
                time_stamp      = request.timestamp.timestamp
                kpi_value       = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
    
                # Build the structure to be included as point in the MetricsDB
                self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
    
                #self.influx_db.read_KPI_points()
    
            except ServiceException as e:
                LOGGER.exception('IncludeKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(e.code, e.details)
            except Exception:  # pragma: no cover
                LOGGER.exception('IncludeKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
            return Empty()
    
        # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
        def MonitorKpi ( self, request : MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> Empty:
    
            LOGGER.info('MonitorKpi')
            try:
                # Sets the request to send to the device service
                monitor_device_request = MonitoringSettings()
    
                kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
    
                monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
                monitor_device_request.kpi_id.kpi_id.uuid  = request.kpi_id.kpi_id.uuid
                monitor_device_request.sampling_duration_s = request.monitoring_window_s
                monitor_device_request.sampling_interval_s = request.sampling_rate_s
    
                device_client = DeviceClient()
                device_client.MonitorDeviceKpi(monitor_device_request)
    
            except ServiceException as e:
                LOGGER.exception('MonitorKpi exception')
                # CREATEKPI_COUNTER_FAILED.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('MonitorKpi exception')
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
                # CREATEKPI_COUNTER_FAILED.inc()
    
            return Empty()
    
    
        def QueryKpiData ( self, request : KpiQuery, grpc_context : grpc.ServicerContext) -> KpiList:
    
            LOGGER.info('QueryKpiData')
            try:
                 # TBC
                return KpiList()
            except ServiceException as e:
                LOGGER.exception('QueryKpiData exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('QueryKpiData exception')
    
        def SetKpiSubscription ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> SubsResponse:
    
            LOGGER.info('SubscribeKpi')
            try:
                 # TBC
                yield SubsResponse()
            except ServiceException as e:
                LOGGER.exception('SubscribeKpi exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('SubscribeKpi exception')
    
    
        def GetSubsDescriptor ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> SubsDescriptor:
    
            LOGGER.info('GetSubsDescriptor')
            try:
                 # TBC
                return SubsDescriptor()
            except ServiceException as e:
                LOGGER.exception('GetSubsDescriptor exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetSubsDescriptor exception')
    
        def GetSubscriptions ( self, request : Empty, grpc_context : grpc.ServicerContext) -> SubsList:
    
            LOGGER.info('GetSubscriptions')
            try:
                 # TBC
                return SubsList()
            except ServiceException as e:
                LOGGER.exception('GetSubscriptions exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetSubscriptions exception')
    
        def DeleteSubscription ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> Empty:
    
            LOGGER.info('DeleteSubscription')
            try:
                 # TBC
                return Empty()
            except ServiceException as e:
                LOGGER.exception('DeleteSubscription exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('DeleteSubscription exception')
    
        def SetKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> AlarmResponse:
    
            LOGGER.info('SetKpiAlarm')
            try:
                 # TBC
                return AlarmResponse()
            except ServiceException as e:
                LOGGER.exception('SetKpiAlarm exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('SetKpiAlarm exception')
    
    
        def GetAlarms ( self, request : Empty, grpc_context : grpc.ServicerContext) -> AlarmList:
    
            LOGGER.info('GetAlarms')
            try:
                 # TBC
                return AlarmList()
            except ServiceException as e:
                LOGGER.exception('GetAlarms exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetAlarms exception')
    
        def GetAlarmDescriptor ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> AlarmDescriptor:
    
            LOGGER.info('GetAlarmDescriptor')
            try:
                 # TBC
                return AlarmDescriptor()
            except ServiceException as e:
                LOGGER.exception('GetAlarmDescriptor exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetAlarmDescriptor exception')
    
        def GetAlarmResponseStream(self, request : AlarmSubscription, grpc_context : grpc.ServicerContext) -> Iterator[AlarmResponse]:
    
            LOGGER.info('GetAlarmResponseStream')
            try:
                # TBC
                yield AlarmResponse()
            except ServiceException as e:
                LOGGER.exception('GetAlarmResponseStream exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetAlarmResponseStream exception')
    
        def DeleteAlarm ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> Empty:
    
            LOGGER.info('DeleteAlarm')
            try:
                 # TBC
                return Empty()
            except ServiceException as e:
                LOGGER.exception('DeleteAlarm exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('DeleteAlarm exception')
    
        def GetStreamKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Iterator[Kpi]:
    
            LOGGER.info('GetStreamKpi')
            yield Kpi()
    
        @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
        def GetInstantKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> KpiList:
    
            LOGGER.info('GetInstantKpi')
            return KpiList()