Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Select Git revision
  • 8f3cfea7938be179e7174de84f38c7bda436d5eb
  • master default
  • feat/320-cttc-ietf-simap-basic-support-with-kafka-yang-push
  • cnit_tapi
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • feat/314-tid-new-service-for-ipowdm-configuration-fron-orchestrator-to-ipowdm-controller
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/307-update-python-version-service
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • openroadm-flex-grid
  • feat/310-cttc-implement-nbi-connector-to-interface-with-osm-client
  • develop protected
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • feat/321-add-support-for-gnmi-configuration-via-proto
  • feat/322-add-read-support-for-ipinfusion-devices-via-netconf
  • feat/323-add-support-for-restconf-protocol-in-devices
  • feat/policy-refactor
  • feat/192-cttc-implement-telemetry-backend-collector-gnmi-openconfig
  • feat/307-update-python-version
  • feat/telemetry-collector-int
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

EndPointModel.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    MonitoringServiceServicerImpl.py 4.34 KiB
    import os
    
    from monitoring.proto import context_pb2
    from monitoring.service import sqlite_tools, influx_tools
    
    from monitoring.proto import monitoring_pb2
    from monitoring.proto import monitoring_pb2_grpc
    
    from common.logger import getJSONLogger
    LOGGER = getJSONLogger('monitoringservice-server')
    LOGGER.setLevel('DEBUG')
    
    from prometheus_client import start_http_server, Summary
    from prometheus_client import Counter, Gauge
    
    # Prometheus metrics published by the monitoring service.
    MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
        'monitoring_getinstantkpi_processing_seconds',
        'Time spent processing monitoring instant kpi request',
    )
    MONITORING_INCLUDEKPI_COUNTER = Counter(
        'monitoring_includekpi_counter',
        'Monitoring include kpi request counter',
    )

    # InfluxDB connection settings taken from the environment (None when unset).
    INFLUXDB_USER = os.getenv("INFLUXDB_USER")
    INFLUXDB_PASS = os.getenv("INFLUXDB_PASSWORD")
    INFLUXDB_DB = os.getenv("INFLUXDB_DATABASE")
    
    class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
        """gRPC servicer implementing the MonitoringService RPCs.

        KPI descriptors are stored through ``sqlite_tools.SQLite`` and KPI
        samples are written through ``influx_tools.Influx``.
        """

        def __init__(self):
            """Open the SQLite KPI store and create the InfluxDB client.

            The InfluxDB user, password and database name are the module-level
            ``INFLUXDB_USER``, ``INFLUXDB_PASS`` and ``INFLUXDB_DB`` values read
            from the environment.
            """
            LOGGER.info('Init monitoringService')
            # Init sqlite monitoring db
            self.sql_db = sqlite_tools.SQLite('monitoring.db')
            # Log which user/database is used; the password is deliberately
            # NOT logged so that the credential never ends up in log files.
            LOGGER.info(INFLUXDB_USER)
            LOGGER.info(INFLUXDB_DB)
            # Create influx_db client
            # NOTE(review): host and port are hard-coded to localhost:8086.
            self.influx_db = influx_tools.Influx("localhost", "8086", INFLUXDB_USER, INFLUXDB_PASS, INFLUXDB_DB)

        # CreateKpi (CreateKpiRequest) returns (KpiId) {}
        def CreateKpi(self, request : monitoring_pb2.CreateKpiRequest, context) -> monitoring_pb2.KpiId :
            """Insert a new KPI descriptor into SQLite and return its identifier.

            Args:
                request: carries ``kpiDescription``, ``device_id`` and
                    ``kpi_sample_type`` of the KPI to create.
                context: gRPC call context (unused).

            Returns:
                A ``KpiId`` whose uuid is the string form of the value returned
                by ``sql_db.insert_KPI``.
            """
            LOGGER.info('CreateKpi')

            # Build the sqlite insert for the KPI and wrap the resulting id in a KpiId
            kpi_id = monitoring_pb2.KpiId()

            kpi_description = request.kpiDescription
            kpi_device_id = request.device_id.device_id.uuid
            kpi_sample_type = request.kpi_sample_type

            data = self.sql_db.insert_KPI(kpi_description, kpi_device_id, kpi_sample_type)
            kpi_id.kpi_id.uuid = str(data)

            return kpi_id

        # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
        def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, context) -> context_pb2.Empty:
            """Look up the requested KPI and forward a device monitoring request.

            Args:
                request: carries ``kpi_id``, ``connexion_time_s`` and
                    ``sample_rate_ms``.
                context: gRPC call context, passed on to ``MonitorDeviceKpi``.

            Returns:
                An empty message.
            """

            LOGGER.info('MonitorKpi')

            # Creates the request to send to the device service
            monitor_device_request = monitoring_pb2.MonitorDeviceKpiRequest()
            kpi = self.get_Kpi(request.kpi_id)

            monitor_device_request.kpi.kpi_id.kpi_id.uuid  = kpi.kpi_id.kpi_id.uuid
            monitor_device_request.connexion_time_s = request.connexion_time_s
            monitor_device_request.sample_rate_ms = request.sample_rate_ms

            self.MonitorDeviceKpi(monitor_device_request,context)

            return context_pb2.Empty()

        # rpc MonitorDeviceKpi(MonitorDeviceKpiRequest) returns(context.Empty) {}
        def MonitorDeviceKpi ( self, request : monitoring_pb2.MonitorDeviceKpiRequest, context) -> context_pb2.Empty:
            """Log the call and return an empty message.

            The device notification itself is not implemented yet: neither
            ``request`` nor ``context`` is used.
            """

            # Some code device to perform its actions

            LOGGER.info('MonitorDeviceKpi')

            # Notify device about monitoring (device client to add)

            return context_pb2.Empty()

        # rpc IncludeKpi(IncludeKpiRequest)  returns(context.Empty)    {}
        def IncludeKpi(self, request : monitoring_pb2.IncludeKpiRequest, context) -> context_pb2.Empty:
            """Write one KPI sample to InfluxDB.

            Args:
                request: carries ``kpi_id``, ``time_stamp`` and ``kpi_value``;
                    only ``kpi_value.intVal`` is forwarded.
                context: gRPC call context (unused).

            Returns:
                An empty message.
            """

            LOGGER.info('IncludeKpi')
            # Count every IncludeKpi request; without this the exported
            # 'monitoring_includekpi_counter' metric would always stay at 0.
            MONITORING_INCLUDEKPI_COUNTER.inc()

            kpi = self.get_Kpi(request.kpi_id)
            time_stamp = request.time_stamp
            kpi_value = request.kpi_value.intVal

            # Build the structure to be included as point in the influxDB
            self.influx_db.write_KPI(time_stamp,kpi.kpi_id.kpi_id.uuid,kpi.device_id.device_id.uuid,kpi.kpi_sample_type,kpi_value)

            # NOTE(review): the result is discarded; this looks like a debugging
            # aid -- confirm whether the read-back is still wanted on every write.
            self.influx_db.read_KPI_points()

            return context_pb2.Empty()


        def GetStreamKpi ( self, request, context):
            """Yield a single default-constructed ``Kpi``; ``request`` is not inspected."""
            # receives monitoring.KpiId returns stream monitoring.Kpi
            LOGGER.info('GetStreamKpi')
            yield monitoring_pb2.Kpi()

        @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
        def GetInstantKpi ( self, request, context):
            """Return a default-constructed ``Kpi``; ``request`` is not inspected.

            The call duration is recorded in ``MONITORING_GETINSTANTKPI_REQUEST_TIME``.
            """
            # receives monitoring.KpiId returns monitoring.Kpi
            LOGGER.info('GetInstantKpi')
            return monitoring_pb2.Kpi()

        def get_Kpi(self, kpiId):
            """Load a KPI descriptor from SQLite and return it as a ``Kpi`` message.

            Args:
                kpiId: a ``KpiId`` whose uuid is converted with ``int()`` before
                    the lookup, so a non-numeric uuid raises ``ValueError``.

            Returns:
                A ``Kpi`` filled from the row returned by ``sql_db.get_KPI``:
                index 0 is the id, 1 the description, 2 the device uuid and
                3 the sample type.
            """
            LOGGER.info('getting Kpi by KpyID')

            kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid))

            # NOTE(review): presumably get_KPI returns None for an unknown id, in
            # which case the indexing below raises TypeError -- confirm.
            kpi = monitoring_pb2.Kpi()
            kpi.kpi_id.kpi_id.uuid = str(kpi_db[0])
            kpi.kpiDescription = kpi_db[1]
            kpi.device_id.device_id.uuid = kpi_db[2]
            kpi.kpi_sample_type = kpi_db[3]

            return kpi