Loading src/monitoring/service/monitoring_server.py +33 −21 Original line number Diff line number Diff line Loading @@ -3,10 +3,11 @@ import os from concurrent import futures from ..client import monitoring_client from ..client.monitoring_client import MonitoringClient from ..proto import context_pb2 import grpc from . import sqlite_tools as sqltools from . import influx_tools as influxtools #import numpy import time Loading Loading @@ -38,21 +39,25 @@ MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monito class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): def __init__(self): LOGGER.info('Init monitoringService') # Init sqlite monitoring db self.sql_db = sqltools.SQLite('monitoring.db') # Create influx_db client # self.influx_db = influxtools.Influx() # CreateKpi (CreateKpiRequest) returns (KpiId) {} def CreateKpi(self, request : monitoring_pb2.CreateKpiRequest, context) -> monitoring_pb2.KpiId : LOGGER.info('CreateKpi') # Here the code to create a sqlite query to crete a KPI and return a KpiID # Change these lines by the KpiID assigned by the DB # Here the code to create a sqlite query to crete a KPI and return a KpiID kpi_id = monitoring_pb2.KpiId() kpi_id.kpi_id.uuid = 'KPIID0000' # Create KPI object with the request info and the KpiID kpi = monitoring_pb2.Kpi() kpi.device_id.device_id.uuid = request.device_id.device_id.uuid kpi.kpi_sample_type = request.kpi_sample_type kpi.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid kpi_description = request.kpiDescription kpi_device_id = request.device_id.device_id.uuid kpi_sample_type = request.kpi_sample_type data = self.sql_db.insert_KPI(kpi_description, kpi_device_id, kpi_sample_type) kpi_id.kpi_id.uuid = str(data) return kpi_id Loading @@ -63,7 +68,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService # Creates the request to send to the device service monitor_device_request = monitoring_pb2.MonitorDeviceKpiRequest() kpi = get_Kpi(request.kpi_id) kpi = self.get_Kpi(request.kpi_id) monitor_device_request.kpi.kpi_id.kpi_id.uuid = kpi.kpi_id.kpi_id.uuid monitor_device_request.connexion_time_s = request.connexion_time_s Loading @@ -81,7 +86,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.info('MonitorDeviceKpi') # Notify device about monitoring # Notify device about monitoring (device client to add) return context_pb2.Empty() Loading @@ -90,14 +95,12 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.info('IncludeKpi') kpi = get_Kpi(request.kpi_id) kpi = self.get_Kpi(request.kpi_id) time_stamp = request.time_stamp kpi_value = request.kpi_value # Build the structure to be included as point in the influxDB # Send the Kpi point to the influxDB # self.influx_db.write_KPI(time_stamp,kpi.device_id.device_id.uuid,kpi.kpi_sample_type,kpi_value) return context_pb2.Empty() Loading @@ -113,6 +116,21 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.info('GetInstantKpi') return monitoring_pb2.Kpi() def get_Kpi(self, kpiId): LOGGER.info('getting Kpi by KpyID') kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid)) kpi = monitoring_pb2.Kpi() kpi.kpi_id.kpi_id.uuid = str(kpi_db[0]) kpi.kpiDescription = kpi_db[1] kpi.device_id.device_id.uuid = kpi_db[2] kpi.kpi_sample_type = kpi_db[3] print(kpi) return kpi def start_server(address='[::]', port=PORT, max_workers=10): # create gRPC server serverGRPC = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) # ,interceptors=(tracer_interceptor,)) Loading @@ -137,12 +155,6 @@ def start_server(address='[::]', port=PORT, max_workers=10): def stop_server(serverGRPC, grace_period=0): serverGRPC.stop(grace_period) def get_Kpi(kpiId): LOGGER.info('getting Kpi by KpyID') # Change these lines with the correct ones after DB query kpi = monitoring_pb2.Kpi() kpi.kpi_id.kpi_id.uuid = kpiId.kpi_id.uuid return kpi if __name__ == '__main__': LOGGER.info('initializing monitoringService') Loading src/monitoring/service/sqlite_tools.py +1 −1 Original line number Diff line number Diff line Loading @@ -2,7 +2,7 @@ import sqlite3 as sl class SQLite(): def __init__(self, database): self.client = sl.connect(database) self.client = sl.connect(database, check_same_thread=False) self.client.execute(""" CREATE TABLE IF NOT EXISTS KPI( kpi_id INTEGER PRIMARY KEY AUTOINCREMENT, Loading src/monitoring/tests/test_monitoring.py +2 −3 Original line number Diff line number Diff line Loading @@ -78,7 +78,7 @@ def monitor_kpi_request(): LOGGER.warning('test_monitor_kpi begin') monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() monitor_kpi_request.kpi_id.kpi_id.uuid = 'KPIID0000' monitor_kpi_request.kpi_id.kpi_id.uuid = str(1) monitor_kpi_request.connexion_time_s = 120 monitor_kpi_request.sample_rate_ms = 5 Loading @@ -99,7 +99,7 @@ def include_kpi_request(): LOGGER.warning('test_include_kpi begin') include_kpi_request = monitoring_pb2.IncludeKpiRequest() include_kpi_request.kpi_id.kpi_id.uuid = 'KPIID0000' include_kpi_request.kpi_id.kpi_id.uuid = str(1) include_kpi_request.time_stamp = "2021-10-12T13:14:42Z" include_kpi_request.kpi_value.intVal = 500 Loading Loading @@ -139,7 +139,6 @@ def test_include_kpi(monitoring_client,include_kpi_request): LOGGER.debug(str(response)) assert isinstance(response, context_pb2.Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_getstream_kpi(monitoring_client,kpi): LOGGER.warning('test_getstream_kpi begin') Loading Loading
src/monitoring/service/monitoring_server.py +33 −21 Original line number Diff line number Diff line Loading @@ -3,10 +3,11 @@ import os from concurrent import futures from ..client import monitoring_client from ..client.monitoring_client import MonitoringClient from ..proto import context_pb2 import grpc from . import sqlite_tools as sqltools from . import influx_tools as influxtools #import numpy import time Loading Loading @@ -38,21 +39,25 @@ MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monito class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): def __init__(self): LOGGER.info('Init monitoringService') # Init sqlite monitoring db self.sql_db = sqltools.SQLite('monitoring.db') # Create influx_db client # self.influx_db = influxtools.Influx() # CreateKpi (CreateKpiRequest) returns (KpiId) {} def CreateKpi(self, request : monitoring_pb2.CreateKpiRequest, context) -> monitoring_pb2.KpiId : LOGGER.info('CreateKpi') # Here the code to create a sqlite query to crete a KPI and return a KpiID # Change these lines by the KpiID assigned by the DB # Here the code to create a sqlite query to crete a KPI and return a KpiID kpi_id = monitoring_pb2.KpiId() kpi_id.kpi_id.uuid = 'KPIID0000' # Create KPI object with the request info and the KpiID kpi = monitoring_pb2.Kpi() kpi.device_id.device_id.uuid = request.device_id.device_id.uuid kpi.kpi_sample_type = request.kpi_sample_type kpi.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid kpi_description = request.kpiDescription kpi_device_id = request.device_id.device_id.uuid kpi_sample_type = request.kpi_sample_type data = self.sql_db.insert_KPI(kpi_description, kpi_device_id, kpi_sample_type) kpi_id.kpi_id.uuid = str(data) return kpi_id Loading @@ -63,7 +68,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService # Creates the request to send to the device service monitor_device_request = monitoring_pb2.MonitorDeviceKpiRequest() kpi = get_Kpi(request.kpi_id) kpi = self.get_Kpi(request.kpi_id) monitor_device_request.kpi.kpi_id.kpi_id.uuid = kpi.kpi_id.kpi_id.uuid monitor_device_request.connexion_time_s = request.connexion_time_s Loading @@ -81,7 +86,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.info('MonitorDeviceKpi') # Notify device about monitoring # Notify device about monitoring (device client to add) return context_pb2.Empty() Loading @@ -90,14 +95,12 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.info('IncludeKpi') kpi = get_Kpi(request.kpi_id) kpi = self.get_Kpi(request.kpi_id) time_stamp = request.time_stamp kpi_value = request.kpi_value # Build the structure to be included as point in the influxDB # Send the Kpi point to the influxDB # self.influx_db.write_KPI(time_stamp,kpi.device_id.device_id.uuid,kpi.kpi_sample_type,kpi_value) return context_pb2.Empty() Loading @@ -113,6 +116,21 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.info('GetInstantKpi') return monitoring_pb2.Kpi() def get_Kpi(self, kpiId): LOGGER.info('getting Kpi by KpyID') kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid)) kpi = monitoring_pb2.Kpi() kpi.kpi_id.kpi_id.uuid = str(kpi_db[0]) kpi.kpiDescription = kpi_db[1] kpi.device_id.device_id.uuid = kpi_db[2] kpi.kpi_sample_type = kpi_db[3] print(kpi) return kpi def start_server(address='[::]', port=PORT, max_workers=10): # create gRPC server serverGRPC = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) # ,interceptors=(tracer_interceptor,)) Loading @@ -137,12 +155,6 @@ def start_server(address='[::]', port=PORT, max_workers=10): def stop_server(serverGRPC, grace_period=0): serverGRPC.stop(grace_period) def get_Kpi(kpiId): LOGGER.info('getting Kpi by KpyID') # Change these lines with the correct ones after DB query kpi = monitoring_pb2.Kpi() kpi.kpi_id.kpi_id.uuid = kpiId.kpi_id.uuid return kpi if __name__ == '__main__': LOGGER.info('initializing monitoringService') Loading
src/monitoring/service/sqlite_tools.py +1 −1 Original line number Diff line number Diff line Loading @@ -2,7 +2,7 @@ import sqlite3 as sl class SQLite(): def __init__(self, database): self.client = sl.connect(database) self.client = sl.connect(database, check_same_thread=False) self.client.execute(""" CREATE TABLE IF NOT EXISTS KPI( kpi_id INTEGER PRIMARY KEY AUTOINCREMENT, Loading
src/monitoring/tests/test_monitoring.py +2 −3 Original line number Diff line number Diff line Loading @@ -78,7 +78,7 @@ def monitor_kpi_request(): LOGGER.warning('test_monitor_kpi begin') monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() monitor_kpi_request.kpi_id.kpi_id.uuid = 'KPIID0000' monitor_kpi_request.kpi_id.kpi_id.uuid = str(1) monitor_kpi_request.connexion_time_s = 120 monitor_kpi_request.sample_rate_ms = 5 Loading @@ -99,7 +99,7 @@ def include_kpi_request(): LOGGER.warning('test_include_kpi begin') include_kpi_request = monitoring_pb2.IncludeKpiRequest() include_kpi_request.kpi_id.kpi_id.uuid = 'KPIID0000' include_kpi_request.kpi_id.kpi_id.uuid = str(1) include_kpi_request.time_stamp = "2021-10-12T13:14:42Z" include_kpi_request.kpi_value.intVal = 500 Loading Loading @@ -139,7 +139,6 @@ def test_include_kpi(monitoring_client,include_kpi_request): LOGGER.debug(str(response)) assert isinstance(response, context_pb2.Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_getstream_kpi(monitoring_client,kpi): LOGGER.warning('test_getstream_kpi begin') Loading