diff --git a/proto/monitoring.proto b/proto/monitoring.proto index ea7f532bbb3aa6a9e9bcb2223f85619c5ae851f8..5eb30651ee14c9c5f0215008ea517f87f94dd285 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -99,7 +99,7 @@ message KpiValue { message KpiList { - repeated Kpi kpi_list = 1; + repeated Kpi kpi = 1; } message KpiDescriptorList { diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 8cf2928f403b8a15728141787f41905a51c03103..f6c23680b3d096db63781270110a8b70cbfcd9c0 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -28,6 +28,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmLis KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse from common.rpc_method_wrapper.ServiceExceptions import ServiceException +from common.tools.timestamp.Converters import timestamp_string_to_float from monitoring.service import ManagementDBTools, MetricsDBTools from device.client.DeviceClient import DeviceClient @@ -153,23 +154,23 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): try: kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) + if kpiDescriptor is None: - LOGGER.warning('Ignoring sample with KPIId({:s}): not found in database'.format(str(request.kpi_id))) + raise Exception(LOGGER.exception('IncludeKpi exception: Sample with KPIId({:s}): not found in database'.format(str(request.kpi_id)))) + else: + kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') + kpiId = request.kpi_id.kpi_id.uuid + deviceId = kpiDescriptor.device_id.device_uuid.uuid + endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid + serviceId = kpiDescriptor.service_id.service_uuid.uuid + time_stamp = request.timestamp.timestamp + kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) + + # Build the structure to be included as point in the MetricsDB + self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) + + #self.influx_db.read_KPI_points() return Empty() - - kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') - kpiId = request.kpi_id.kpi_id.uuid - deviceId = kpiDescriptor.device_id.device_uuid.uuid - endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid - serviceId = kpiDescriptor.service_id.service_uuid.uuid - time_stamp = request.timestamp.timestamp - kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) - - # Build the structure to be included as point in the MetricsDB - self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) - - #self.influx_db.read_KPI_points() - except ServiceException as e: LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() @@ -177,7 +178,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): except Exception: # pragma: no cover LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() - return Empty() + # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} def MonitorKpi ( self, request : MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> Empty: @@ -235,14 +236,22 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): start_timestamp = request.start_timestamp.timestamp end_timestamp = request.end_timestamp.timestamp - subscriber = "localhost" + subscriber = "localhost" # Investigate how to get info from the requester subs_id = self.management_db.insert_subscription(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp,end_timestamp) - self.subs_manager.create_subscription(subs_id,kpi_id,sampling_duration_s,sampling_interval_s,start_timestamp,end_timestamp) + self.subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp) # parse queue to append kpis into the list + while not subs_queue.empty(): + list = subs_queue.get_nowait() + for item in list: + kpi = kpi.kpi_id.kpi_id.uuid = item[0] + kpi.timestamp.timestamp = timestamp_string_to_float(item[1]) + kpi.kpi_value.floatVal = item[2] # This must be improved + subs_response.kpi_list.kpi.append(kpi) + + subs_response.subs_id.subs_id.uuid = subs_id - # TBC yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception')