Skip to content
Snippets Groups Projects
monitoring_server.py 5.43 KiB
Newer Older
  • Learn to ignore specific revisions
  • Ricard Vilalta's avatar
    Ricard Vilalta committed
    #!/usr/bin/python
    
    import os
    from concurrent import futures
    
    
    from src.monitoring.client import monitoring_client
    from src.monitoring.client.monitoring_client import MonitoringClient
    
    from src.monitoring.proto import context_pb2
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    import grpc
    
    #import numpy
    
    import time
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
    from src.monitoring.proto import monitoring_pb2
    from src.monitoring.proto import monitoring_pb2_grpc
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
    from grpc_health.v1 import health
    from grpc_health.v1 import health_pb2
    from grpc_health.v1 import health_pb2_grpc
    
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
    from src.monitoring.logger import getJSONLogger
    
    LOGGER = getJSONLogger('monitoringservice-server')
    LOGGER.setLevel('DEBUG')
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    import threading
    
    
    from prometheus_client import start_http_server, Summary
    from prometheus_client import Counter, Gauge
    
    
    SERVER_ADDRESS = SERVER_ADDRESS = '127.0.0.1'
    
    PORT = 7070
    
    
    MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
    MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
    
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
    class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
        def __init__(self):
    
            LOGGER.info('Init monitoringService')
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
        # CreateKpi (CreateKpiRequest) returns (KpiId) {}
        def CreateKpi(self, request : monitoring_pb2.CreateKpiRequest, context) -> monitoring_pb2.KpiId :
            LOGGER.info('CreateKpi')
            # Here the code to create a sqlite query to crete a KPI and return a KpiID
    
            # Change these lines by the KpiID assigned by the DB
            kpi_id = monitoring_pb2.KpiId()
            kpi_id.kpi_id.uuid = 'KPIID0000'
    
            # Create KPI object with the request info and the KpiID
            kpi = monitoring_pb2.Kpi()
            kpi.device_id.device_id.uuid = request.device_id.device_id.uuid
            kpi.kpi_sample_type = request.kpi_sample_type
            kpi.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid
    
            return kpi_id
    
        # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
        def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, context) -> context_pb2.Empty:
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
    Javi Moreno's avatar
    Javi Moreno committed
            LOGGER.info('MonitorKpi')
    
    
            # Creates the request to send to the device service
            monitor_device_request = monitoring_pb2.MonitorDeviceKpiRequest()
            kpi = get_Kpi(request.kpi_id)
    
            monitor_device_request.kpi.kpi_id.kpi_id.uuid  = kpi.kpi_id.kpi_id.uuid
            monitor_device_request.connexion_time_s = request.connexion_time_s
            monitor_device_request.sample_rate_ms = request.sample_rate_ms
    
            client = MonitoringClient(server=SERVER_ADDRESS, port=PORT)
            client.MonitorDeviceKpi(monitor_device_request)
    
            return context_pb2.Empty()
    
        # rpc MonitorDeviceKpi(MonitorDeviceKpiRequest) returns(context.Empty) {}
        def MonitorDeviceKpi ( self, request : monitoring_pb2.MonitorDeviceKpiRequest, context) -> context_pb2.Empty:
    
            # Some code device to perform its actions
    
            LOGGER.info('MonitorDeviceKpi')
    
    Javi Moreno's avatar
    Javi Moreno committed
    
            # Notify device about monitoring
    
    
            return context_pb2.Empty()
    
        # rpc IncludeKpi(IncludeKpiRequest)  returns(context.Empty)    {}
        def IncludeKpi(self, request : monitoring_pb2.IncludeKpiRequest, context) -> context_pb2.Empty:
    
            LOGGER.info('IncludeKpi')
    
    Javi Moreno's avatar
    Javi Moreno committed
    
            kpi = get_Kpi(request.kpi_id)
            time_stamp = request.time_stamp
            kpi_value = request.kpi_value
    
            # Build the structure to be included as point in the influxDB
    
    
            # Send the Kpi point to the influxDB
    
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
        def GetStreamKpi ( self, request, context):
    
            # receives monitoring.KpiId returns stream monitoring.Kpi
    
            LOGGER.info('GetStreamKpi')
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
        @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
        def GetInstantKpi ( self, request, context):
    
            # receives monitoring.KpiId returns monitoring.Kpi
    
    Javi Moreno's avatar
    Javi Moreno committed
            LOGGER.info('GetInstantKpi')
    
            return monitoring_pb2.Kpi()
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
    def start_server(address='[::]', port=PORT, max_workers=10):
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
        # create gRPC server
    
        serverGRPC = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) # ,interceptors=(tracer_interceptor,))
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
        # add monitoring servicer class to gRPC server
        monitoring_servicer = MonitoringServiceServicerImpl()
        monitoring_pb2_grpc.add_MonitoringServiceServicer_to_server(monitoring_servicer, serverGRPC)
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
    
        # add gRPC health checker servicer class to gRPC server
        health_servicer = health.HealthServicer(
            experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
    
        health_pb2_grpc.add_HealthServicer_to_server(health_servicer, serverGRPC)
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
        # start server
    
        endpoint = '{}:{}'.format(address, port)
        LOGGER.info('Listening on {}'.format(endpoint))
        serverGRPC.add_insecure_port(endpoint)
    
        serverGRPC.start()
    
        health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
        return(serverGRPC)
    
    def stop_server(serverGRPC, grace_period=0):
        serverGRPC.stop(grace_period)
    
    
    def get_Kpi(kpiId):
        LOGGER.info('getting Kpi by KpyID')
        # Change these lines with the correct ones after DB query
        kpi = monitoring_pb2.Kpi()
        kpi.kpi_id.kpi_id.uuid = kpiId.kpi_id.uuid
        return kpi
    
    
    if __name__ == '__main__':
        LOGGER.info('initializing monitoringService')
        port = os.environ.get('PORT', str(PORT))
        serverGRPC = start_server(port=port)
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
    
        # keep alive
        try:
    
            while True: time.sleep(0.1)
    
    Ricard Vilalta's avatar
    Ricard Vilalta committed
        except KeyboardInterrupt:
    
            stop_server(serverGRPC)