diff --git a/src/monitoring/Dockerfile b/src/monitoring/Dockerfile index e22c31554325b295d88e1d77729d1f61f150a9c8..cbf6bcb6ae99733e855e86440e54ec80454713a5 100644 --- a/src/monitoring/Dockerfile +++ b/src/monitoring/Dockerfile @@ -23,6 +23,7 @@ WORKDIR /var/teraflow RUN mkdir -p /var/teraflow/monitoring RUN mkdir -p /var/teraflow/common RUN mkdir -p /var/teraflow/common/tools +RUN mkdir -p /var/teraflow/common/rpc_method_wrapper RUN mkdir -p /var/teraflow/device RUN mkdir -p /var/teraflow/device/proto RUN mkdir -p /var/teraflow/device/client @@ -39,5 +40,6 @@ COPY device/client/. device/client COPY device/Config.py device COPY common/logger.py common COPY common/tools/. common/tools +COPY common/rpc_method_wrapper/ServiceExceptions.py common/rpc_method_wrapper ENTRYPOINT ["python", "-m", "monitoring.service"] diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index e547eec62d9947f8f333bf2356b2dcdf06b027ee..bb4de08a2a7bc5b3b5ef0452218af7d636a0383d 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -1,6 +1,7 @@ import os +import grpc +from common.rpc_method_wrapper.ServiceExceptions import ServiceException -import device from device.Config import GRPC_SERVICE_PORT from device.client.DeviceClient import DeviceClient from device.proto import device_pb2 @@ -14,12 +15,39 @@ from common.logger import getJSONLogger LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') -from prometheus_client import Summary +from prometheus_client import Summary, Histogram from prometheus_client import Counter MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') +# CREATEKPI_COUNTER_STARTED = Counter ('monitoring_createkpi_counter_started', +# 'Monitoring:CreateKpi counter of requests started' ) +# CREATEKPI_COUNTER_COMPLETED = Counter ('monitoring_createkpi counter_completed', +# 'Monitoring:CreateKpi counter of requests completed') +# CREATEKPI_COUNTER_FAILED = Counter ('monitoring_createkpi_counter_failed', +# 'Monitoring:CreateKpi counter of requests failed' ) +# CREATEKPI_HISTOGRAM_DURATION = Histogram('monitoring_createkpi_histogram_duration', +# 'Monitoring:CreateKpi histogram of request duration') +# +# MONITORKPI_COUNTER_STARTED = Counter ('monitoring_monitorkpi_counter_started', +# 'Monitoring:MonitorKpi counter of requests started' ) +# MONITORKPI_COUNTER_COMPLETED = Counter ('monitoring_monitorkpi counter_completed', +# 'Monitoring:MonitorKpi counter of requests completed') +# MONITORKPI_COUNTER_FAILED = Counter ('monitoring_monitorkpi_counter_failed', +# 'Monitoring:MonitorKpi counter of requests failed' ) +# MONITORKPI_HISTOGRAM_DURATION = Histogram('monitoring_monitorkpi_histogram_duration', +# 'Monitoring:MonitorKpi histogram of request duration') +# +# INCLUDEKPI_COUNTER_STARTED = Counter ('monitoring_includekpi_counter_started', +# 'Monitoring:IncludeKpi counter of requests started' ) +# INCLUDEKPI_COUNTER_COMPLETED = Counter ('monitoring_includekpi counter_completed', +# 'Monitoring:IncludeKpi counter of requests completed') +# INCLUDEKPI_COUNTER_FAILED = Counter ('monitoring_includekpi_counter_failed', +# 'Monitoring:IncludeKpi counter of requests failed' ) +# INCLUDEKPI_HISTOGRAM_DURATION = Histogram('monitoring_includekpi_histogram_duration', +# 'Monitoring:IncludeKpi histogram of request duration') + INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") INFLUXDB_USER = os.environ.get("INFLUXDB_USER") INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") @@ -37,78 +65,102 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService self.influx_db = influx_tools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) # CreateKpi (CreateKpiRequest) returns (KpiId) {} - def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, context) -> monitoring_pb2.KpiId : + def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId : + # CREATEKPI_COUNTER_STARTED.inc() LOGGER.info('CreateKpi') - - # Here the code to create a sqlite query to crete a KPI and return a KpiID - kpi_id = monitoring_pb2.KpiId() - - kpi_description = request.kpi_description - kpi_sample_type = request.kpi_sample_type - kpi_device_id = request.device_id.device_uuid.uuid - kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid - kpi_service_id = request.service_id.service_uuid.uuid - - data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) - kpi_id.kpi_id.uuid = str(data) - - return kpi_id + try: + # Here the code to create a sqlite query to crete a KPI and return a KpiID + kpi_id = monitoring_pb2.KpiId() + + kpi_description = request.kpi_description + kpi_sample_type = request.kpi_sample_type + kpi_device_id = request.device_id.device_uuid.uuid + kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid + kpi_service_id = request.service_id.service_uuid.uuid + + data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + kpi_id.kpi_id.uuid = str(data) + + # CREATEKPI_COUNTER_COMPLETED.inc() + return kpi_id + except ServiceException as e: + LOGGER.exception('CreateKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('CreateKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() + grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} - def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, context) -> context_pb2.Empty: + def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: LOGGER.info('MonitorKpi') - - # Creates the request to send to the device service - monitor_device_request = device_pb2.MonitoringSettings() - - kpiDescriptor = self.get_KpiDescriptor(request.kpi_id) - - monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid - monitor_device_request.kpi_descriptor.kpi_description = kpiDescriptor.kpi_description - monitor_device_request.kpi_descriptor.kpi_sample_type = kpiDescriptor.kpi_sample_type - monitor_device_request.kpi_descriptor.device_id.device_uuid.uuid = kpiDescriptor.device_id.device_uuid.uuid - monitor_device_request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid = kpiDescriptor.endpoint_id.endpoint_uuid.uuid - monitor_device_request.kpi_descriptor.service_id.service_uuid.uuid = kpiDescriptor.service_id.service_uuid.uuid - monitor_device_request.sampling_duration_s = request.sampling_duration_s - monitor_device_request.sampling_interval_s = request.sampling_interval_s - - deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT ) # instantiate the client - # deviceClient.MonitorDeviceKpi(monitor_device_request) - - return context_pb2.Empty() + try: + # Creates the request to send to the device service + monitor_device_request = device_pb2.MonitoringSettings() + + kpiDescriptor = self.get_KpiDescriptor(request.kpi_id) + + monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid + monitor_device_request.kpi_descriptor.kpi_description = kpiDescriptor.kpi_description + monitor_device_request.kpi_descriptor.kpi_sample_type = kpiDescriptor.kpi_sample_type + monitor_device_request.kpi_descriptor.device_id.device_uuid.uuid = kpiDescriptor.device_id.device_uuid.uuid + monitor_device_request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid = kpiDescriptor.endpoint_id.endpoint_uuid.uuid + monitor_device_request.kpi_descriptor.service_id.service_uuid.uuid = kpiDescriptor.service_id.service_uuid.uuid + monitor_device_request.sampling_duration_s = request.sampling_duration_s + monitor_device_request.sampling_interval_s = request.sampling_interval_s + + deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT ) # instantiate the client + # deviceClient.MonitorDeviceKpi(monitor_device_request) + + return context_pb2.Empty() + except ServiceException as e: + LOGGER.exception('MonitorKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('MonitorKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {} - def IncludeKpi(self, request : monitoring_pb2.Kpi, context) -> context_pb2.Empty: + def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: LOGGER.info('IncludeKpi') - kpiDescriptor = self.get_KpiDescriptor(request.kpi_id) - - kpiSampleType = kpiDescriptor.kpi_sample_type - kpiId = request.kpi_id.kpi_id.uuid - deviceId = kpiDescriptor.device_id.device_uuid.uuid - endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid - serviceId = kpiDescriptor.service_id.service_uuid.uuid - time_stamp = request.timestamp - kpi_value = request.kpi_value.intVal + try: + kpiDescriptor = self.get_KpiDescriptor(request.kpi_id) - # Build the structure to be included as point in the influxDB - self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) + kpiSampleType = kpiDescriptor.kpi_sample_type + kpiId = request.kpi_id.kpi_id.uuid + deviceId = kpiDescriptor.device_id.device_uuid.uuid + endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid + serviceId = kpiDescriptor.service_id.service_uuid.uuid + time_stamp = request.timestamp + kpi_value = request.kpi_value.intVal - self.influx_db.read_KPI_points() + # Build the structure to be included as point in the influxDB + self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) - return context_pb2.Empty() + self.influx_db.read_KPI_points() + return context_pb2.Empty() + except ServiceException as e: + LOGGER.exception('IncludeKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('IncludeKpi exception') + # CREATEKPI_COUNTER_FAILED.inc() - def GetStreamKpi ( self, request, context): + def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext): # receives monitoring.KpiId returns stream monitoring.Kpi LOGGER.info('GetStreamKpi') yield monitoring_pb2.Kpi() @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() - def GetInstantKpi ( self, request, context): + def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext): # receives monitoring.KpiId returns monitoring.Kpi LOGGER.info('GetInstantKpi') return monitoring_pb2.Kpi()