Skip to content
monitoring_server.py 3.08 KiB
Newer Older
Ricard Vilalta's avatar
Ricard Vilalta committed
#!/usr/bin/python

import os
from concurrent import futures
from src.monitoring import context_pb2
Ricard Vilalta's avatar
Ricard Vilalta committed
import grpc
#import numpy
import time
Ricard Vilalta's avatar
Ricard Vilalta committed

from . import monitoring_pb2
from . import monitoring_pb2_grpc
Ricard Vilalta's avatar
Ricard Vilalta committed

from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

Ricard Vilalta's avatar
Ricard Vilalta committed

from .logger import getJSONLogger
# Module-wide logger, created through the package's getJSONLogger helper
# under the name 'monitoringservice-server'.
LOGGER = getJSONLogger('monitoringservice-server')
# The level is fixed to DEBUG here; it is not read from configuration.
LOGGER.setLevel('DEBUG')
Ricard Vilalta's avatar
Ricard Vilalta committed

import threading

from prometheus_client import start_http_server, Summary
from prometheus_client import Counter, Gauge

# Default port for the gRPC endpoint; used as the default of start_server()
# and as the fallback when the PORT environment variable is not set.
PORT = 7070

# Prometheus metrics, created once at import time.
# Summary used as a timing decorator on the GetInstantKpi handler.
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
    'monitoring_getinstantkpi_processing_seconds',
    'Time spent processing monitoring instant kpi request',
)
# Counter incremented on every IncludeKpi call.
MONITORING_INCLUDEKPI_COUNTER = Counter(
    'monitoring_includekpi_counter',
    'Monitoring include kpi request counter',
)

Ricard Vilalta's avatar
Ricard Vilalta committed

class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
    """Servicer for the MonitoringService gRPC API.

    All handlers are placeholders for now: each one logs its own name and
    replies with an empty message of the response type noted in its
    docstring. None of them inspects ``request`` or ``context``.
    """

    def __init__(self):
        LOGGER.info('Init monitoringService')

    def IncludeKpi(self, request, context):
        """Handle IncludeKpi: receives monitoring.KPI, returns context.Empty.

        Also increments the include-kpi Prometheus counter.
        """
        LOGGER.info('IncludeKpi')
        MONITORING_INCLUDEKPI_COUNTER.inc()
        return context_pb2.Empty()

    def MonitorKpi(self, request, context):
        """Handle MonitorKpi: receives monitoring.KpiDevice, returns context.Empty."""
        # Log the handler's own name (this used to log 'IncludeKpi').
        LOGGER.info('MonitorKpi')
        return context_pb2.Empty()

    def GetStream_kpi(self, request, context):
        """Handle GetStream_kpi: receives monitoring.KpiId, returns a stream of monitoring.Kpi.

        No KPI source is wired in yet, so the stream is empty.
        """
        # Log the handler's own name (this used to log 'IncludeKpi').
        LOGGER.info('GetStream_kpi')
        # A response-streaming handler must hand back an iterable of
        # responses; falling off the end would return None instead.
        return iter(())

    @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
    def GetInstantKpi(self, request, context):
        """Handle GetInstantKpi: receives monitoring.KpiId, returns monitoring.Kpi.

        The call duration is recorded by the Summary decorator above.
        """
        # Log the handler's own name (this used to log 'IncludeKpi').
        LOGGER.info('GetInstantKpi')
        return monitoring_pb2.Kpi()
Ricard Vilalta's avatar
Ricard Vilalta committed

def start_server(address='[::]', port=PORT, max_workers=10):
    """Create, configure and start the monitoring gRPC server.

    Registers the monitoring servicer and the standard gRPC health
    servicer, binds an insecure (non-TLS) port, starts the server and marks
    the overall health status as SERVING.

    Args:
        address: host part of the endpoint to bind.
        port: port part of the endpoint; an int or a numeric string, since
            it is only formatted into the '<address>:<port>' endpoint.
        max_workers: size of the thread pool that serves RPCs.

    Returns:
        The started gRPC server object.

    Raises:
        RuntimeError: if the endpoint could not be bound.
    """
    # create gRPC server
    rpc_pool = futures.ThreadPoolExecutor(max_workers=max_workers)
    serverGRPC = grpc.server(rpc_pool) # ,interceptors=(tracer_interceptor,))

    # add monitoring servicer class to gRPC server
    monitoring_servicer = MonitoringServiceServicerImpl()
    monitoring_pb2_grpc.add_MonitoringServiceServicer_to_server(monitoring_servicer, serverGRPC)

    # add gRPC health checker servicer class to gRPC server
    health_pool = futures.ThreadPoolExecutor(max_workers=1)
    health_servicer = health.HealthServicer(
        experimental_non_blocking=True, experimental_thread_pool=health_pool)
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, serverGRPC)

    # start server
    endpoint = '{}:{}'.format(address, port)
    LOGGER.info('Listening on {}'.format(endpoint))
    bound_port = serverGRPC.add_insecure_port(endpoint)
    if bound_port == 0:
        # A result of 0 means nothing was bound; without this check the
        # server would start and report SERVING while not listening.
        raise RuntimeError('Failed to bind gRPC server to {}'.format(endpoint))
    serverGRPC.start()
    health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
    return serverGRPC

def stop_server(serverGRPC, grace_period=0):
    """Stop a server previously returned by start_server().

    Args:
        serverGRPC: server object exposing ``stop(grace)``.
        grace_period: value forwarded unchanged to ``stop``; defaults to 0.

    Returns:
        None; whatever ``stop`` returns is discarded.
    """
    shutdown = serverGRPC.stop
    shutdown(grace_period)

if __name__ == '__main__':
    LOGGER.info('initializing monitoringService')

    # The PORT environment variable overrides the module default. The value
    # stays a string; start_server only formats it into the endpoint.
    port = os.environ.get('PORT', str(PORT))
    serverGRPC = start_server(port=port)

    # keep alive: start_server returns right after starting, so idle in the
    # main thread until Ctrl-C and then stop the server.
    try:
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        stop_server(serverGRPC)