Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
MonitoringServiceServicerImpl.py 8.54 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import os,grpc, logging
    import socket
    
    
    from prometheus_client import Summary
    from prometheus_client import Counter
    
    from common.Settings import get_setting
    
    from monitoring.Config import DEVICE_GRPC_SERVICE_PORT, DEVICE_SERVICE_HOST
    from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType
    
    from monitoring.service import SqliteTools, InfluxTools
    from monitoring.proto import monitoring_pb2
    from monitoring.proto import monitoring_pb2_grpc
    
    Javi Moreno's avatar
     
    Javi Moreno committed
    
    
    from common.rpc_method_wrapper.ServiceExceptions import ServiceException
    
    Javi Moreno's avatar
     
    Javi Moreno committed
    
    
    from device.client.DeviceClient import DeviceClient
    from device.proto import device_pb2
    
    Javi Moreno's avatar
     
    Javi Moreno committed
    
    
    # Module-wide logger, named after this module.
    LOGGER = logging.getLogger(__name__)

    # Prometheus metrics published by this module.
    MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
        'monitoring_getinstantkpi_processing_seconds',
        'Time spent processing monitoring instant kpi request',
    )
    MONITORING_INCLUDEKPI_COUNTER = Counter(
        'monitoring_includekpi_counter',
        'Monitoring include kpi request counter',
    )

    # InfluxDB connection parameters, read from the process environment.
    # Each one is None when its variable is not set.
    INFLUXDB_HOSTNAME = os.getenv("INFLUXDB_HOSTNAME")
    INFLUXDB_USER = os.getenv("INFLUXDB_USER")
    INFLUXDB_PASSWORD = os.getenv("INFLUXDB_PASSWORD")
    INFLUXDB_DATABASE = os.getenv("INFLUXDB_DATABASE")

    # Device service endpoint: looked up through get_setting, with the values
    # imported from monitoring.Config acting as defaults.
    DEVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=DEVICE_SERVICE_HOST)
    DEVICE_SERVICE_PORT = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=DEVICE_GRPC_SERVICE_PORT)
    
    
    class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
        def __init__(self):
            """Create the helpers this servicer relies on.

            Sets three attributes:
              * sql_db       -- SqliteTools.SQLite handle for 'monitoring.db'.
              * deviceClient -- DeviceClient pointed at the device service.
              * influx_db    -- InfluxTools.Influx client built from the INFLUXDB_*
                                module constants, with the port given as "8086".
            """
            LOGGER.info('Init monitoringService')

            # Init sqlite monitoring db
            self.sql_db = SqliteTools.SQLite('monitoring.db')

            # Use the port resolved through get_setting (DEVICE_SERVICE_PORT) so that
            # host and port come from the same source; previously the host override
            # was honoured while the port always fell back to the Config default.
            # int() keeps the type of the default port constant.
            # NOTE(review): assumes get_setting yields a numeric value -- confirm.
            self.deviceClient = DeviceClient(address=DEVICE_SERVICE_HOST, port=int(DEVICE_SERVICE_PORT))

            # Create influx_db client
            self.influx_db = InfluxTools.Influx(
                INFLUXDB_HOSTNAME, "8086", INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
    
    
        # CreateKpi (CreateKpiRequest) returns (KpiId) {}
    
        def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId :
            """Store the KPI described by `request` and return its identifier.

            The descriptor fields (description, sample type, device/endpoint/service
            uuids) are handed to self.sql_db.insert_KPI; the string form of the value
            it returns becomes the uuid of the returned KpiId.

            A ServiceException aborts the RPC with the exception's own code and
            details; any other exception aborts it with StatusCode.INTERNAL.
            """
            LOGGER.info('CreateKpi')

            try:
                inserted_key = self.sql_db.insert_KPI(
                    request.kpi_description,
                    request.kpi_sample_type,
                    request.device_id.device_uuid.uuid,
                    request.endpoint_id.endpoint_uuid.uuid,
                    request.service_id.service_uuid.uuid,
                )

                response = monitoring_pb2.KpiId()
                response.kpi_id.uuid = str(inserted_key)
                return response
            except ServiceException as e:
                LOGGER.exception('CreateKpi exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('CreateKpi exception')
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
        # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
    
        def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
            """Ask the device service to start monitoring the KPI named in `request`.

            Builds a device_pb2.MonitoringSettings message from the stored KPI
            descriptor plus the requested sampling duration/interval and, when a TCP
            probe to the device service succeeds, sends it through
            self.deviceClient.MonitorDeviceKpi. If the probe fails, only a warning is
            logged and Empty is still returned.

            Aborts the RPC with:
              * NOT_FOUND when no descriptor exists for the given KPI id;
              * the ServiceException's own code/details when one is raised;
              * INTERNAL for any other exception.
            """
            LOGGER.info('MonitorKpi')

            # The lookup is done outside the try block: GetKpiDescriptor handles its
            # own errors, and an abort raised there (abort terminates the RPC by
            # raising) must not be re-caught by the generic handler below.
            kpi_descriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
            if kpi_descriptor is None:
                # Without this guard CopyFrom(None) would fail further down.
                not_found_msg = 'KpiId({:s}) not found in database'.format(str(request.kpi_id.kpi_id.uuid))
                LOGGER.warning(not_found_msg)
                grpc_context.abort(grpc.StatusCode.NOT_FOUND, not_found_msg)

            try:
                # Creates the request to send to the device service
                monitor_device_request = device_pb2.MonitoringSettings()
                monitor_device_request.kpi_descriptor.CopyFrom(kpi_descriptor)
                monitor_device_request.kpi_id.kpi_id.uuid  = request.kpi_id.kpi_id.uuid
                monitor_device_request.sampling_duration_s = request.sampling_duration_s
                monitor_device_request.sampling_interval_s = request.sampling_interval_s

                # Probe the device service first. The context manager closes the
                # probe socket on every path, so no descriptor is leaked per call.
                # The port is the one resolved through get_setting, matching the host.
                # NOTE(review): assumes get_setting yields a numeric value -- confirm.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                    device_reachable = probe.connect_ex((DEVICE_SERVICE_HOST, int(DEVICE_SERVICE_PORT))) == 0

                if device_reachable:
                    self.deviceClient.MonitorDeviceKpi(monitor_device_request)
                else:
                    LOGGER.warning('Device service is not reachable')

                return context_pb2.Empty()
            except ServiceException as e:
                LOGGER.exception('MonitorKpi exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('MonitorKpi exception')
                # Report the failure instead of silently returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
    
    
        # rpc IncludeKpi(IncludeKpiRequest)  returns(context.Empty)    {}
    
        def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
            """Write one KPI sample to InfluxDB.

            Looks up the descriptor for request.kpi_id. When none is found the sample
            is ignored: a warning is logged and Empty is returned. Otherwise a point
            is written through self.influx_db.write_KPI with the sample timestamp,
            the KPI id, the sample-type name (enum name upper-cased, with the
            'KPISAMPLETYPE_' prefix removed), the device/endpoint/service uuids taken
            from the descriptor, and the value of whichever member of the request's
            'value' oneof is set.

            A ServiceException aborts the RPC with the exception's own code and
            details; any other exception aborts it with StatusCode.INTERNAL.
            """
            LOGGER.info('IncludeKpi')

            # The lookup is done outside the try block: GetKpiDescriptor handles its
            # own errors, and an abort raised there (abort terminates the RPC by
            # raising) must not be re-caught by the generic handler below.
            kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
            if kpiDescriptor is None:
                LOGGER.warning('Ignoring sample with KPIId({:s}): not found in database'.format(str(request.kpi_id)))
                return context_pb2.Empty()

            try:
                kpiSampleType   = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
                kpiId           = request.kpi_id.kpi_id.uuid
                deviceId        = kpiDescriptor.device_id.device_uuid.uuid
                endpointId      = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
                serviceId       = kpiDescriptor.service_id.service_uuid.uuid
                time_stamp      = request.timestamp
                kpi_value       = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))

                # Build the structure to be included as point in the influxDB
                self.influx_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, kpi_value)

                # The RPC is declared to return Empty; falling off the end would
                # hand None back to gRPC.
                return context_pb2.Empty()
            except ServiceException as e:
                LOGGER.exception('IncludeKpi exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('IncludeKpi exception')
                # Report the failure instead of silently returning None.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
    
        def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext):
            """Placeholder stream: log the call, then yield a single default Kpi.

            Receives a monitoring.KpiId and returns a stream of monitoring.Kpi.
            The request is not inspected.
            """
            LOGGER.info('GetStreamKpi')
            placeholder_sample = monitoring_pb2.Kpi()
            yield placeholder_sample
    
        @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
        def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext):
            """Placeholder: log the call and return a default Kpi message.

            Receives a monitoring.KpiId and returns a monitoring.Kpi. The request is
            not inspected. Each call is timed by the
            MONITORING_GETINSTANTKPI_REQUEST_TIME summary through the decorator.
            """
            LOGGER.info('GetInstantKpi')
            placeholder_sample = monitoring_pb2.Kpi()
            return placeholder_sample
    
    
    Javi Moreno's avatar
     
    Javi Moreno committed
    
        def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor:
            """Return the stored descriptor for the KPI identified by `request`.

            The KPI uuid is converted to int and passed to self.sql_db.get_KPI. The
            returned row is read by position: [1] description, [2] sample type,
            [3] device uuid, [4] endpoint uuid, [5] service uuid.

            Returns None when get_KPI yields None, and also when an exception other
            than ServiceException occurs (it is logged, not re-raised). A
            ServiceException aborts the RPC with the exception's own code and details.
            """
            LOGGER.info('getting Kpi by KpiID')

            try:
                row = self.sql_db.get_KPI(int(request.kpi_id.uuid))
                if row is None:
                    return None

                descriptor = monitoring_pb2.KpiDescriptor()
                descriptor.kpi_description = row[1]
                descriptor.kpi_sample_type = row[2]
                descriptor.device_id.device_uuid.uuid = str(row[3])
                descriptor.endpoint_id.endpoint_uuid.uuid = str(row[4])
                descriptor.service_id.service_uuid.uuid = str(row[5])
                return descriptor
            except ServiceException as e:
                LOGGER.exception('GetKpiDescriptor exception')
                grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover
                LOGGER.exception('GetKpiDescriptor exception')
                # Best effort: callers treat None as "descriptor not available".
                return None