import os import grpc from common.rpc_method_wrapper.ServiceExceptions import ServiceException from device.Config import GRPC_SERVICE_PORT from device.client.DeviceClient import DeviceClient from device.proto import device_pb2 from monitoring.proto import context_pb2 from monitoring.service import sqlite_tools, influx_tools from monitoring.proto import monitoring_pb2 from monitoring.proto import monitoring_pb2_grpc from common.logger import getJSONLogger LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') from prometheus_client import Summary, Histogram from prometheus_client import Counter MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') # CREATEKPI_COUNTER_STARTED = Counter ('monitoring_createkpi_counter_started', # 'Monitoring:CreateKpi counter of requests started' ) # CREATEKPI_COUNTER_COMPLETED = Counter ('monitoring_createkpi counter_completed', # 'Monitoring:CreateKpi counter of requests completed') # CREATEKPI_COUNTER_FAILED = Counter ('monitoring_createkpi_counter_failed', # 'Monitoring:CreateKpi counter of requests failed' ) # CREATEKPI_HISTOGRAM_DURATION = Histogram('monitoring_createkpi_histogram_duration', # 'Monitoring:CreateKpi histogram of request duration') # # MONITORKPI_COUNTER_STARTED = Counter ('monitoring_monitorkpi_counter_started', # 'Monitoring:MonitorKpi counter of requests started' ) # MONITORKPI_COUNTER_COMPLETED = Counter ('monitoring_monitorkpi counter_completed', # 'Monitoring:MonitorKpi counter of requests completed') # MONITORKPI_COUNTER_FAILED = Counter ('monitoring_monitorkpi_counter_failed', # 'Monitoring:MonitorKpi counter of requests failed' ) # MONITORKPI_HISTOGRAM_DURATION = Histogram('monitoring_monitorkpi_histogram_duration', # 'Monitoring:MonitorKpi histogram of request duration') # # INCLUDEKPI_COUNTER_STARTED = Counter ('monitoring_includekpi_counter_started', # 'Monitoring:IncludeKpi counter of requests started' ) # INCLUDEKPI_COUNTER_COMPLETED = Counter ('monitoring_includekpi counter_completed', # 'Monitoring:IncludeKpi counter of requests completed') # INCLUDEKPI_COUNTER_FAILED = Counter ('monitoring_includekpi_counter_failed', # 'Monitoring:IncludeKpi counter of requests failed' ) # INCLUDEKPI_HISTOGRAM_DURATION = Histogram('monitoring_includekpi_histogram_duration', # 'Monitoring:IncludeKpi histogram of request duration') INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") INFLUXDB_USER = os.environ.get("INFLUXDB_USER") INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): def __init__(self): LOGGER.info('Init monitoringService') # Init sqlite monitoring db self.sql_db = sqlite_tools.SQLite('monitoring.db') # Create influx_db client self.influx_db = influx_tools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) # CreateKpi (CreateKpiRequest) returns (KpiId) {} def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId : # CREATEKPI_COUNTER_STARTED.inc() LOGGER.info('CreateKpi') try: # Here the code to create a sqlite query to crete a KPI and return a KpiID kpi_id = monitoring_pb2.KpiId() kpi_description = request.kpi_description kpi_sample_type = request.kpi_sample_type kpi_device_id = request.device_id.device_uuid.uuid kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid kpi_service_id = request.service_id.service_uuid.uuid data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) kpi_id.kpi_id.uuid = str(data) # CREATEKPI_COUNTER_COMPLETED.inc() return kpi_id except ServiceException as e: LOGGER.exception('CreateKpi exception') # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('CreateKpi exception') # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: LOGGER.info('MonitorKpi') try: # Creates the request to send to the device service monitor_device_request = device_pb2.MonitoringSettings() kpiDescriptor = self.get_KpiDescriptor(request.kpi_id) monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid monitor_device_request.kpi_descriptor.kpi_description = kpiDescriptor.kpi_description monitor_device_request.kpi_descriptor.kpi_sample_type = kpiDescriptor.kpi_sample_type monitor_device_request.kpi_descriptor.device_id.device_uuid.uuid = kpiDescriptor.device_id.device_uuid.uuid monitor_device_request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid = kpiDescriptor.endpoint_id.endpoint_uuid.uuid monitor_device_request.kpi_descriptor.service_id.service_uuid.uuid = kpiDescriptor.service_id.service_uuid.uuid monitor_device_request.sampling_duration_s = request.sampling_duration_s monitor_device_request.sampling_interval_s = request.sampling_interval_s deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT ) # instantiate the client # deviceClient.MonitorDeviceKpi(monitor_device_request) return context_pb2.Empty() except ServiceException as e: LOGGER.exception('MonitorKpi exception') # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('MonitorKpi exception') # CREATEKPI_COUNTER_FAILED.inc() # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {} def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: LOGGER.info('IncludeKpi') try: kpiDescriptor = self.get_KpiDescriptor(request.kpi_id) kpiSampleType = kpiDescriptor.kpi_sample_type kpiId = request.kpi_id.kpi_id.uuid deviceId = kpiDescriptor.device_id.device_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid time_stamp = request.timestamp kpi_value = request.kpi_value.intVal # Build the structure to be included as point in the influxDB self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) self.influx_db.read_KPI_points() return context_pb2.Empty() except ServiceException as e: LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext): # receives monitoring.KpiId returns stream monitoring.Kpi LOGGER.info('GetStreamKpi') yield monitoring_pb2.Kpi() @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext): # receives monitoring.KpiId returns monitoring.Kpi LOGGER.info('GetInstantKpi') return monitoring_pb2.Kpi() def get_KpiDescriptor(self, kpiId): LOGGER.info('getting Kpi by KpiID') kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid)) kpiDescriptor = monitoring_pb2.KpiDescriptor() kpiDescriptor.kpi_description = kpi_db[1] kpiDescriptor.kpi_sample_type = kpi_db[2] kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3]) kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4]) kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5]) return kpiDescriptor