Loading src/monitoring/service/monitoring_server.pydeleted 100644 → 0 +0 −162 Original line number Diff line number Diff line #!/usr/bin/python import os from concurrent import futures from monitoring.proto import context_pb2 import grpc from monitoring.service import sqlite_tools # from monitoring.service import influx_tools import time from monitoring.proto import monitoring_pb2 from monitoring.proto import monitoring_pb2_grpc from grpc_health.v1 import health from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc from common.logger import getJSONLogger LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') from prometheus_client import start_http_server, Summary from prometheus_client import Counter, Gauge SERVER_ADDRESS = SERVER_ADDRESS = '127.0.0.1' PORT = 7070 MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): def __init__(self): LOGGER.info('Init monitoringService') # Init sqlite monitoring db self.sql_db = sqlite_tools.SQLite('monitoring.db') # Create influx_db client # self.influx_db = influx_tools.Influx("host",port,"user","pass","database") # CreateKpi (CreateKpiRequest) returns (KpiId) {} def CreateKpi(self, request : monitoring_pb2.CreateKpiRequest, context) -> monitoring_pb2.KpiId : LOGGER.info('CreateKpi') # Here the code to create a sqlite query to crete a KPI and return a KpiID kpi_id = monitoring_pb2.KpiId() kpi_description = request.kpiDescription kpi_device_id = request.device_id.device_id.uuid kpi_sample_type = request.kpi_sample_type data = self.sql_db.insert_KPI(kpi_description, kpi_device_id, kpi_sample_type) kpi_id.kpi_id.uuid = str(data) return kpi_id # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, context) -> context_pb2.Empty: LOGGER.info('MonitorKpi') # Creates the request to send to the device service monitor_device_request = monitoring_pb2.MonitorDeviceKpiRequest() kpi = self.get_Kpi(request.kpi_id) monitor_device_request.kpi.kpi_id.kpi_id.uuid = kpi.kpi_id.kpi_id.uuid monitor_device_request.connexion_time_s = request.connexion_time_s monitor_device_request.sample_rate_ms = request.sample_rate_ms self.MonitorDeviceKpi(monitor_device_request,context) return context_pb2.Empty() # rpc MonitorDeviceKpi(MonitorDeviceKpiRequest) returns(context.Empty) {} def MonitorDeviceKpi ( self, request : monitoring_pb2.MonitorDeviceKpiRequest, context) -> context_pb2.Empty: # Some code device to perform its actions LOGGER.info('MonitorDeviceKpi') # Notify device about monitoring (device client to add) return context_pb2.Empty() # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {} def IncludeKpi(self, request : monitoring_pb2.IncludeKpiRequest, context) -> context_pb2.Empty: LOGGER.info('IncludeKpi') kpi = self.get_Kpi(request.kpi_id) time_stamp = request.time_stamp kpi_value = request.kpi_value.intVal # Build the structure to be included as point in the influxDB # self.influx_db.write_KPI(time_stamp,kpi.kpi_id.kpi_id.uuid,kpi.device_id.device_id.uuid,kpi.kpi_sample_type,kpi_value) # self.influx_db.read_KPI_points() return context_pb2.Empty() def GetStreamKpi ( self, request, context): # receives monitoring.KpiId returns stream monitoring.Kpi LOGGER.info('GetStreamKpi') yield monitoring_pb2.Kpi() @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() def GetInstantKpi ( self, request, context): # receives monitoring.KpiId returns monitoring.Kpi LOGGER.info('GetInstantKpi') return monitoring_pb2.Kpi() def get_Kpi(self, kpiId): LOGGER.info('getting Kpi by KpyID') kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid)) kpi = monitoring_pb2.Kpi() kpi.kpi_id.kpi_id.uuid = str(kpi_db[0]) kpi.kpiDescription = kpi_db[1] kpi.device_id.device_id.uuid = kpi_db[2] kpi.kpi_sample_type = kpi_db[3] return kpi def start_server(address='[::]', port=PORT, max_workers=10): # create gRPC server serverGRPC = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) # ,interceptors=(tracer_interceptor,)) # add monitoring servicer class to gRPC server monitoring_servicer = MonitoringServiceServicerImpl() monitoring_pb2_grpc.add_MonitoringServiceServicer_to_server(monitoring_servicer, serverGRPC) # add gRPC health checker servicer class to gRPC server health_servicer = health.HealthServicer( experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) health_pb2_grpc.add_HealthServicer_to_server(health_servicer, serverGRPC) # start server endpoint = '{}:{}'.format(address, port) LOGGER.info('Listening on {}'.format(endpoint)) serverGRPC.add_insecure_port(endpoint) serverGRPC.start() health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member return(serverGRPC) def stop_server(serverGRPC, grace_period=0): serverGRPC.stop(grace_period) if __name__ == '__main__': LOGGER.info('initializing monitoringService') port = os.environ.get('PORT', str(PORT)) serverGRPC = start_server(port=port) # keep alive try: while True: time.sleep(0.1) except KeyboardInterrupt: stop_server(serverGRPC) Loading
src/monitoring/service/monitoring_server.pydeleted 100644 → 0 +0 −162 Original line number Diff line number Diff line #!/usr/bin/python import os from concurrent import futures from monitoring.proto import context_pb2 import grpc from monitoring.service import sqlite_tools # from monitoring.service import influx_tools import time from monitoring.proto import monitoring_pb2 from monitoring.proto import monitoring_pb2_grpc from grpc_health.v1 import health from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc from common.logger import getJSONLogger LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') from prometheus_client import start_http_server, Summary from prometheus_client import Counter, Gauge SERVER_ADDRESS = SERVER_ADDRESS = '127.0.0.1' PORT = 7070 MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): def __init__(self): LOGGER.info('Init monitoringService') # Init sqlite monitoring db self.sql_db = sqlite_tools.SQLite('monitoring.db') # Create influx_db client # self.influx_db = influx_tools.Influx("host",port,"user","pass","database") # CreateKpi (CreateKpiRequest) returns (KpiId) {} def CreateKpi(self, request : monitoring_pb2.CreateKpiRequest, context) -> monitoring_pb2.KpiId : LOGGER.info('CreateKpi') # Here the code to create a sqlite query to crete a KPI and return a KpiID kpi_id = monitoring_pb2.KpiId() kpi_description = request.kpiDescription kpi_device_id = request.device_id.device_id.uuid kpi_sample_type = request.kpi_sample_type data = self.sql_db.insert_KPI(kpi_description, kpi_device_id, kpi_sample_type) kpi_id.kpi_id.uuid = str(data) return kpi_id # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, context) -> context_pb2.Empty: LOGGER.info('MonitorKpi') # Creates the request to send to the device service monitor_device_request = monitoring_pb2.MonitorDeviceKpiRequest() kpi = self.get_Kpi(request.kpi_id) monitor_device_request.kpi.kpi_id.kpi_id.uuid = kpi.kpi_id.kpi_id.uuid monitor_device_request.connexion_time_s = request.connexion_time_s monitor_device_request.sample_rate_ms = request.sample_rate_ms self.MonitorDeviceKpi(monitor_device_request,context) return context_pb2.Empty() # rpc MonitorDeviceKpi(MonitorDeviceKpiRequest) returns(context.Empty) {} def MonitorDeviceKpi ( self, request : monitoring_pb2.MonitorDeviceKpiRequest, context) -> context_pb2.Empty: # Some code device to perform its actions LOGGER.info('MonitorDeviceKpi') # Notify device about monitoring (device client to add) return context_pb2.Empty() # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {} def IncludeKpi(self, request : monitoring_pb2.IncludeKpiRequest, context) -> context_pb2.Empty: LOGGER.info('IncludeKpi') kpi = self.get_Kpi(request.kpi_id) time_stamp = request.time_stamp kpi_value = request.kpi_value.intVal # Build the structure to be included as point in the influxDB # self.influx_db.write_KPI(time_stamp,kpi.kpi_id.kpi_id.uuid,kpi.device_id.device_id.uuid,kpi.kpi_sample_type,kpi_value) # self.influx_db.read_KPI_points() return context_pb2.Empty() def GetStreamKpi ( self, request, context): # receives monitoring.KpiId returns stream monitoring.Kpi LOGGER.info('GetStreamKpi') yield monitoring_pb2.Kpi() @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() def GetInstantKpi ( self, request, context): # receives monitoring.KpiId returns monitoring.Kpi LOGGER.info('GetInstantKpi') return monitoring_pb2.Kpi() def get_Kpi(self, kpiId): LOGGER.info('getting Kpi by KpyID') kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid)) kpi = monitoring_pb2.Kpi() kpi.kpi_id.kpi_id.uuid = str(kpi_db[0]) kpi.kpiDescription = kpi_db[1] kpi.device_id.device_id.uuid = kpi_db[2] kpi.kpi_sample_type = kpi_db[3] return kpi def start_server(address='[::]', port=PORT, max_workers=10): # create gRPC server serverGRPC = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) # ,interceptors=(tracer_interceptor,)) # add monitoring servicer class to gRPC server monitoring_servicer = MonitoringServiceServicerImpl() monitoring_pb2_grpc.add_MonitoringServiceServicer_to_server(monitoring_servicer, serverGRPC) # add gRPC health checker servicer class to gRPC server health_servicer = health.HealthServicer( experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) health_pb2_grpc.add_HealthServicer_to_server(health_servicer, serverGRPC) # start server endpoint = '{}:{}'.format(address, port) LOGGER.info('Listening on {}'.format(endpoint)) serverGRPC.add_insecure_port(endpoint) serverGRPC.start() health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member return(serverGRPC) def stop_server(serverGRPC, grace_period=0): serverGRPC.stop(grace_period) if __name__ == '__main__': LOGGER.info('initializing monitoringService') port = os.environ.get('PORT', str(PORT)) serverGRPC = start_server(port=port) # keep alive try: while True: time.sleep(0.1) except KeyboardInterrupt: stop_server(serverGRPC)