# MonitoringServiceServicerImpl.py - gRPC servicer implementation for the monitoring service.
import os

import device
from device.Config import GRPC_SERVICE_PORT
from device.client.DeviceClient import DeviceClient
from device.proto import device_pb2
from monitoring.proto import context_pb2
# Storage helper modules (sqlite_tools and influx_tools) used by the servicer.
from monitoring.service import sqlite_tools, influx_tools

from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc

from common.logger import getJSONLogger
# Module-wide logger obtained from common.logger.getJSONLogger; every RPC
# handler in this file logs through it. The level is fixed to DEBUG here.
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')

# Prometheus metric types used for the metrics declared below.
from prometheus_client import Summary
from prometheus_client import Counter

# Summary metric applied as a `.time()` decorator on GetInstantKpi.
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
# Counter metric described as counting IncludeKpi requests.
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')

# InfluxDB connection settings, read from the process environment once at
# import time. os.environ.get returns None for any variable that is not set,
# so a missing setting is passed on as None to the Influx client constructor.
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
    def __init__(self):
        """Create the two storage handles used by the RPC handlers.

        ``self.sql_db`` is a ``sqlite_tools.SQLite`` built with
        ``'monitoring.db'``; the handlers use it to insert and look up KPI
        descriptors. ``self.influx_db`` is an ``influx_tools.Influx`` client
        built from the module-level ``INFLUXDB_*`` settings; ``IncludeKpi``
        writes KPI samples through it.
        """
        LOGGER.info('Init monitoringService')

        # Init sqlite monitoring db
        self.sql_db = sqlite_tools.SQLite('monitoring.db')

        # Create influx_db client. The port is hard-coded as the string "8086";
        # the remaining connection settings come from the environment and may
        # be None when the corresponding variable is unset.
        self.influx_db = influx_tools.Influx(
            INFLUXDB_HOSTNAME,
            "8086",
            INFLUXDB_USER,
            INFLUXDB_PASSWORD,
            INFLUXDB_DATABASE,
        )

    # CreateKpi (CreateKpiRequest) returns (KpiId) {}
    def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, context) -> monitoring_pb2.KpiId :
        """Store a KPI descriptor and return the identifier assigned to it.

        Args:
            request: Descriptor whose description, sample type and
                device/endpoint/service uuids are handed to
                ``self.sql_db.insert_KPI``.
            context: gRPC call context (not used here).

        Returns:
            A ``KpiId`` whose ``kpi_id.uuid`` is the string form of the value
            returned by ``insert_KPI``.
        """
        LOGGER.info('CreateKpi')

        response = monitoring_pb2.KpiId()

        # Persist the descriptor fields; whatever insert_KPI returns becomes
        # the KPI identifier (presumably the new row id -- confirm in sqlite_tools).
        stored_id = self.sql_db.insert_KPI(
            request.kpi_description,
            request.kpi_sample_type,
            request.device_id.device_uuid.uuid,
            request.endpoint_id.endpoint_uuid.uuid,
            request.service_id.service_uuid.uuid,
        )
        response.kpi_id.uuid = str(stored_id)

        return response

    # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
    def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, context) -> context_pb2.Empty:
        """Prepare the device-service monitoring request for a stored KPI.

        Looks up the descriptor for ``request.kpi_id`` and copies it, together
        with the requested sampling duration and interval, into a
        ``device_pb2.MonitoringSettings`` message. The call that would send
        this message to the device service is currently commented out, so the
        message is built but not sent.

        Args:
            request: Carries the KPI id and the sampling duration/interval.
            context: gRPC call context (not used here).

        Returns:
            An empty ``context_pb2.Empty`` message.
        """
        LOGGER.info('MonitorKpi')

        # Creates the request to send to the device service
        monitor_device_request = device_pb2.MonitoringSettings()

        stored_descriptor = self.get_KpiDescriptor(request.kpi_id)

        monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid

        # Copy the stored descriptor field by field into the outgoing message.
        outgoing_descriptor = monitor_device_request.kpi_descriptor
        outgoing_descriptor.kpi_description = stored_descriptor.kpi_description
        outgoing_descriptor.kpi_sample_type = stored_descriptor.kpi_sample_type
        outgoing_descriptor.device_id.device_uuid.uuid = stored_descriptor.device_id.device_uuid.uuid
        outgoing_descriptor.endpoint_id.endpoint_uuid.uuid = stored_descriptor.endpoint_id.endpoint_uuid.uuid
        outgoing_descriptor.service_id.service_uuid.uuid = stored_descriptor.service_id.service_uuid.uuid

        monitor_device_request.sampling_duration_s = request.sampling_duration_s
        monitor_device_request.sampling_interval_s = request.sampling_interval_s

        # NOTE(review): the client is instantiated on every call but never used
        # while the line below stays disabled; construction is kept as in the
        # original because DeviceClient's side effects are not visible here.
        deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT)  # instantiate the client
        # TODO: enable once the device service exposes MonitorDeviceKpi:
        # deviceClient.MonitorDeviceKpi(monitor_device_request)

        return context_pb2.Empty()


    # rpc IncludeKpi(IncludeKpiRequest)  returns(context.Empty)    {}
    def IncludeKpi(self, request : monitoring_pb2.Kpi, context) -> context_pb2.Empty:
        """Write one KPI sample to InfluxDB.

        The descriptor stored for ``request.kpi_id`` supplies the sample type
        and the device/endpoint/service uuids; the timestamp and value come
        from the request. Only the ``intVal`` member of ``request.kpi_value``
        is read.

        Args:
            request: KPI sample carrying the KPI id, timestamp and value.
            context: gRPC call context (not used here).

        Returns:
            An empty ``context_pb2.Empty`` message.
        """
        LOGGER.info('IncludeKpi')

        # Count every IncludeKpi request; the counter is declared at module
        # level for this RPC.
        MONITORING_INCLUDEKPI_COUNTER.inc()

        descriptor = self.get_KpiDescriptor(request.kpi_id)

        sample_type = descriptor.kpi_sample_type
        kpi_uuid    = request.kpi_id.kpi_id.uuid
        device_uuid = descriptor.device_id.device_uuid.uuid
        endpoint_uuid = descriptor.endpoint_id.endpoint_uuid.uuid
        service_uuid  = descriptor.service_id.service_uuid.uuid
        time_stamp  = request.timestamp
        kpi_value   = request.kpi_value.intVal

        # Build the structure to be included as point in the influxDB
        self.influx_db.write_KPI(
            time_stamp, kpi_uuid, sample_type, device_uuid, endpoint_uuid, service_uuid, kpi_value
        )

        # NOTE(review): the result is discarded; this looks like a debugging
        # aid. Kept because read_KPI_points may log or print -- confirm.
        self.influx_db.read_KPI_points()

        return context_pb2.Empty()


    def GetStreamKpi ( self, request, context):
        """Stream KPI samples for a KPI id (placeholder implementation).

        Per the RPC definition this receives a ``monitoring.KpiId`` and returns
        a stream of ``monitoring.Kpi``. The current body ignores ``request``
        and yields a single default-constructed ``monitoring_pb2.Kpi``.
        """
        LOGGER.info('GetStreamKpi')
        placeholder_sample = monitoring_pb2.Kpi()
        yield placeholder_sample

    @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
    def GetInstantKpi ( self, request, context):
        """Return the instant value of a KPI (placeholder implementation).

        Per the RPC definition this receives a ``monitoring.KpiId`` and returns
        a ``monitoring.Kpi``. The current body ignores ``request`` and returns
        a default-constructed ``monitoring_pb2.Kpi``. The decorator times each
        call with the module-level Summary metric.
        """
        LOGGER.info('GetInstantKpi')
        placeholder_sample = monitoring_pb2.Kpi()
        return placeholder_sample

    def get_KpiDescriptor(self, kpiId):
        LOGGER.info('getting Kpi by KpiID')

        kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid))

        kpiDescriptor = monitoring_pb2.KpiDescriptor()

        kpiDescriptor.kpi_description                   = kpi_db[1]
        kpiDescriptor.kpi_sample_type                   = kpi_db[2]
        kpiDescriptor.device_id.device_uuid.uuid        = str(kpi_db[3])
        kpiDescriptor.endpoint_id.endpoint_uuid.uuid    = str(kpi_db[4])
        kpiDescriptor.service_id.service_uuid.uuid      = str(kpi_db[5])