#!/usr/bin/python import os from concurrent import futures from src.monitoring import context_pb2 import grpc #import numpy import time from . import monitoring_pb2 from . import monitoring_pb2_grpc from grpc_health.v1 import health from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc from .logger import getJSONLogger LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') import threading from prometheus_client import start_http_server, Summary from prometheus_client import Counter, Gauge PORT = 7070 MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): def __init__(self): LOGGER.info('Init monitoringService') def IncludeKpi(self, request, context): # receives monitoring.KPI returns context.Empty LOGGER.info('IncludeKpi') MONITORING_INCLUDEKPI_COUNTER.inc() return context_pb2.Empty() def MonitorKpi ( self, request, context): # receives monitoring.KpiDevice returns context.Empty LOGGER.info('IncludeKpi') return context_pb2.Empty() def GetStream_kpi ( self, request, context): # receives monitoring.KpiId returns stream monitoring.Kpi LOGGER.info('IncludeKpi') return @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() def GetInstantKpi ( self, request, context): # receives monitoring.KpiId returns monitoring.Kpi LOGGER.info('IncludeKpi') return monitoring_pb2.Kpi() def start_server(address='[::]', port=PORT, max_workers=10): # create gRPC server serverGRPC = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) # ,interceptors=(tracer_interceptor,)) # add monitoring servicer class to gRPC server monitoring_servicer = MonitoringServiceServicerImpl() monitoring_pb2_grpc.add_MonitoringServiceServicer_to_server(monitoring_servicer, serverGRPC) # add gRPC health checker servicer class to gRPC server health_servicer = health.HealthServicer( experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) health_pb2_grpc.add_HealthServicer_to_server(health_servicer, serverGRPC) # start server endpoint = '{}:{}'.format(address, port) LOGGER.info('Listening on {}'.format(endpoint)) serverGRPC.add_insecure_port(endpoint) serverGRPC.start() health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member return(serverGRPC) def stop_server(serverGRPC, grace_period=0): serverGRPC.stop(grace_period) if __name__ == '__main__': LOGGER.info('initializing monitoringService') port = os.environ.get('PORT', str(PORT)) serverGRPC = start_server(port=port) # keep alive try: while True: time.sleep(0.1) except KeyboardInterrupt: stop_server(serverGRPC)