Commit 450fa8dc authored by Javi Moreno's avatar Javi Moreno
Browse files

Restructuring monitoring server

parent 255b32a0
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
import logging

# General settings
LOG_LEVEL = logging.WARNING

# gRPC settings
GRPC_SERVICE_PORT = 7070
GRPC_MAX_WORKERS  = 10
GRPC_GRACE_PERIOD = 60

# Prometheus settings
METRICS_PORT = 9192
+1 −1
Original line number Diff line number Diff line
@@ -36,4 +36,4 @@ COPY common/logger.py common
ENV PORT=7070
EXPOSE 7070

ENTRYPOINT ["python", "-m", "monitoring.service.monitoring_server"]
ENTRYPOINT ["python", "-m", "monitoring.service"]
+60 −0
Original line number Diff line number Diff line
from concurrent import futures

import grpc

from monitoring.proto import monitoring_pb2_grpc
from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl
from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD

from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from monitoring.proto.monitoring_pb2_grpc import  add_MonitoringServiceServicer_to_server

from common.logger import getJSONLogger
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')

BIND_ADDRESS = '0.0.0.0'

class MonitoringService:
    def __init__(self, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
                 grace_period=GRPC_GRACE_PERIOD):
        self.address = address
        self.port = port
        self.endpoint = None
        self.max_workers = max_workers
        self.grace_period = grace_period
        self.monitoring_servicer = None
        self.health_servicer = None
        self.pool = None
        self.server = None

    def start(self):
        # create gRPC server
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers)) # ,interceptors=(tracer_interceptor,))

        # add monitoring servicer class to gRPC server
        self.monitoring_servicer = MonitoringServiceServicerImpl()
        add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server)

        # add gRPC health checker servicer class to gRPC server
        self.health_servicer = health.HealthServicer(
            experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
        add_HealthServicer_to_server(self.health_servicer, self.server)

        # start server
        endpoint = '{}:{}'.format(self.address, self.port)
        LOGGER.info('Listening on {}'.format(endpoint))
        self.server.add_insecure_port(endpoint)
        self.server.start()
        self.health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member

        LOGGER.debug('Service started')

    def stop(self):
        LOGGER.debug('Stopping service (grace period {} seconds)...'.format(self.grace_period))
        self.health_servicer.enter_graceful_shutdown()
        self.server.stop(self.grace_period)
        LOGGER.debug('Service stopped')
+109 −0
Original line number Diff line number Diff line
from monitoring.proto import context_pb2
from monitoring.service import sqlite_tools

from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc

from common.logger import getJSONLogger
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')

from prometheus_client import start_http_server, Summary
from prometheus_client import Counter, Gauge

MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')

class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
    def __init__(self):
        LOGGER.info('Init monitoringService')
        # Init sqlite monitoring db
        self.sql_db = sqlite_tools.SQLite('monitoring.db')

        # Create influx_db client
        # self.influx_db = influx_tools.Influx("host",port,"user","pass","database")

    # CreateKpi (CreateKpiRequest) returns (KpiId) {}
    def CreateKpi(self, request : monitoring_pb2.CreateKpiRequest, context) -> monitoring_pb2.KpiId :
        LOGGER.info('CreateKpi')

        # Here the code to create a sqlite query to crete a KPI and return a KpiID
        kpi_id = monitoring_pb2.KpiId()

        kpi_description = request.kpiDescription
        kpi_device_id = request.device_id.device_id.uuid
        kpi_sample_type = request.kpi_sample_type

        data = self.sql_db.insert_KPI(kpi_description, kpi_device_id, kpi_sample_type)
        kpi_id.kpi_id.uuid = str(data)

        return kpi_id

    # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
    def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, context) -> context_pb2.Empty:

        LOGGER.info('MonitorKpi')

        # Creates the request to send to the device service
        monitor_device_request = monitoring_pb2.MonitorDeviceKpiRequest()
        kpi = self.get_Kpi(request.kpi_id)

        monitor_device_request.kpi.kpi_id.kpi_id.uuid  = kpi.kpi_id.kpi_id.uuid
        monitor_device_request.connexion_time_s = request.connexion_time_s
        monitor_device_request.sample_rate_ms = request.sample_rate_ms

        self.MonitorDeviceKpi(monitor_device_request,context)

        return context_pb2.Empty()

    # rpc MonitorDeviceKpi(MonitorDeviceKpiRequest) returns(context.Empty) {}
    def MonitorDeviceKpi ( self, request : monitoring_pb2.MonitorDeviceKpiRequest, context) -> context_pb2.Empty:

        # Some code device to perform its actions

        LOGGER.info('MonitorDeviceKpi')

        # Notify device about monitoring (device client to add)

        return context_pb2.Empty()

    # rpc IncludeKpi(IncludeKpiRequest)  returns(context.Empty)    {}
    def IncludeKpi(self, request : monitoring_pb2.IncludeKpiRequest, context) -> context_pb2.Empty:

        LOGGER.info('IncludeKpi')

        kpi = self.get_Kpi(request.kpi_id)
        time_stamp = request.time_stamp
        kpi_value = request.kpi_value.intVal

        # Build the structure to be included as point in the influxDB
        # self.influx_db.write_KPI(time_stamp,kpi.kpi_id.kpi_id.uuid,kpi.device_id.device_id.uuid,kpi.kpi_sample_type,kpi_value)

        # self.influx_db.read_KPI_points()

        return context_pb2.Empty()


    def GetStreamKpi ( self, request, context):
        # receives monitoring.KpiId returns stream monitoring.Kpi
        LOGGER.info('GetStreamKpi')
        yield monitoring_pb2.Kpi()

    @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
    def GetInstantKpi ( self, request, context):
        # receives monitoring.KpiId returns monitoring.Kpi
        LOGGER.info('GetInstantKpi')
        return monitoring_pb2.Kpi()

    def get_Kpi(self, kpiId):
        LOGGER.info('getting Kpi by KpyID')

        kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid))

        kpi = monitoring_pb2.Kpi()
        kpi.kpi_id.kpi_id.uuid = str(kpi_db[0])
        kpi.kpiDescription = kpi_db[1]
        kpi.device_id.device_id.uuid = kpi_db[2]
        kpi.kpi_sample_type = kpi_db[3]

        return kpi
 No newline at end of file
+56 −0
Original line number Diff line number Diff line
import logging, os, signal, sys, threading
import time

from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT

from common.logger import getJSONLogger
from monitoring.service.MonitoringService import MonitoringService

LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')

from prometheus_client import start_http_server

terminate = threading.Event()
logger = None

def signal_handler(signal, frame):
    global terminate, logger
    logger.warning('Terminate signal received')
    terminate.set()

def main():
    global terminate, logger

    service_port = os.environ.get('DEVICESERVICE_SERVICE_PORT_GRPC', GRPC_SERVICE_PORT)
    max_workers  = os.environ.get('MAX_WORKERS',                     GRPC_MAX_WORKERS )
    grace_period = os.environ.get('GRACE_PERIOD',                    GRPC_GRACE_PERIOD)
    log_level    = os.environ.get('LOG_LEVEL',                       LOG_LEVEL        )
    metrics_port = os.environ.get('METRICS_PORT',                    METRICS_PORT     )

    logging.basicConfig(level=log_level)
    logger = logging.getLogger(__name__)

    signal.signal(signal.SIGINT,  signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    logger.info('Starting...')

    # Start metrics server
    start_http_server(metrics_port)

    # Starting device service
    grpc_service = MonitoringService(port=service_port, max_workers=max_workers, grace_period=grace_period)
    grpc_service.start()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=0.1): pass

    logger.info('Terminating...')
    grpc_service.stop()

    logger.info('Bye')
    return 0

if __name__ == '__main__':
    sys.exit(main())
 No newline at end of file
Loading