Loading src/monitoring/service/EventTools.py +35 −25 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ from common.method_wrappers.ServiceExceptions import ServiceException from common.proto import monitoring_pb2 from common.proto.context_pb2 import ConfigActionEnum, DeviceOperationalStatusEnum, Empty, EventTypeEnum from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient from monitoring.client.MonitoringClient import MonitoringClient from monitoring.service.MonitoringServiceServicerImpl import LOGGER Loading @@ -43,7 +44,8 @@ class EventsDeviceCollector: self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False) self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict() #self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict() self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict() self._name_mapping = name_mapping def grpc_server_on(self): Loading Loading @@ -79,33 +81,26 @@ class EventsDeviceCollector: kpi_id_list = [] while True: # LOGGER.info('getting Kpi by KpiID') try: event = self.get_event(block=True, timeout=0.5) event_type = event.event.event_type device_uuid = event.device_id.device_uuid.uuid if event_type in {EventTypeEnum.EVENTTYPE_REMOVE}: self._device_to_state.pop(device_uuid, None) LOGGER.debug('Ignoring REMOVE event: {:s}'.format(grpc_message_to_json_string(event))) self._device_endpoint_monitored.pop(device_uuid, None) continue if event_type not in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: # Unknown event type LOGGER.debug('Ignoring UNKNOWN event type: {:s}'.format(grpc_message_to_json_string(event))) continue device = self._context_client.GetDevice(event.device_id) self._name_mapping.set_device_name(device_uuid, device.name) old_operational_status = self._device_to_state.get(device_uuid, DEVICE_OP_STATUS_UNDEFINED) device_was_not_enabled = (old_operational_status in DEVICE_OP_STATUS_NOT_ENABLED) new_operational_status = device.device_operational_status device_is_enabled = (new_operational_status == DEVICE_OP_STATUS_ENABLED) self._device_to_state[device_uuid] = new_operational_status activate_monitoring = device_was_not_enabled and device_is_enabled if not activate_monitoring: # device is not ready for monitoring device_op_status = device.device_operational_status if device_op_status != DEVICE_OP_STATUS_ENABLED: LOGGER.debug('Ignoring Device not enabled: {:s}'.format(grpc_message_to_json_string(device))) continue enabled_endpoint_names = set() Loading @@ -120,15 +115,20 @@ class EventsDeviceCollector: if not json_resource_value['enabled']: continue enabled_endpoint_names.add(json_resource_value['name']) endpoints_monitored = self._device_endpoint_monitored.setdefault(device_uuid, dict()) for endpoint in device.device_endpoints: endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid endpoint_name_or_uuid = endpoint.name if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0: endpoint_name_or_uuid = endpoint_uuid if endpoint_name_or_uuid not in enabled_endpoint_names: continue self._name_mapping.set_endpoint_name(endpoint_uuid, endpoint.name) endpoint_was_monitored = endpoints_monitored.get(endpoint_uuid, False) endpoint_is_enabled = (endpoint_name_or_uuid in enabled_endpoint_names) if not endpoint_was_monitored and endpoint_is_enabled: # activate for value in endpoint.kpi_sample_types: if value == KPISAMPLETYPE_UNKNOWN: continue Loading @@ -140,6 +140,16 @@ class EventsDeviceCollector: kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) kpi_id_list.append(kpi_id) endpoints_monitored[endpoint_uuid] = True else: MSG = 'Not implemented condition: event={:s} device={:s} endpoint={:s}' + \ ' endpoint_was_monitored={:s} endpoint_is_enabled={:s}' LOGGER.warning(MSG.format( grpc_message_to_json_string(event), grpc_message_to_json_string(device), grpc_message_to_json_string(endpoint), str(endpoint_was_monitored), str(endpoint_is_enabled) )) except queue.Empty: break Loading Loading
src/monitoring/service/EventTools.py +35 −25 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ from common.method_wrappers.ServiceExceptions import ServiceException from common.proto import monitoring_pb2 from common.proto.context_pb2 import ConfigActionEnum, DeviceOperationalStatusEnum, Empty, EventTypeEnum from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient from monitoring.client.MonitoringClient import MonitoringClient from monitoring.service.MonitoringServiceServicerImpl import LOGGER Loading @@ -43,7 +44,8 @@ class EventsDeviceCollector: self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False) self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict() #self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict() self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict() self._name_mapping = name_mapping def grpc_server_on(self): Loading Loading @@ -79,33 +81,26 @@ class EventsDeviceCollector: kpi_id_list = [] while True: # LOGGER.info('getting Kpi by KpiID') try: event = self.get_event(block=True, timeout=0.5) event_type = event.event.event_type device_uuid = event.device_id.device_uuid.uuid if event_type in {EventTypeEnum.EVENTTYPE_REMOVE}: self._device_to_state.pop(device_uuid, None) LOGGER.debug('Ignoring REMOVE event: {:s}'.format(grpc_message_to_json_string(event))) self._device_endpoint_monitored.pop(device_uuid, None) continue if event_type not in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: # Unknown event type LOGGER.debug('Ignoring UNKNOWN event type: {:s}'.format(grpc_message_to_json_string(event))) continue device = self._context_client.GetDevice(event.device_id) self._name_mapping.set_device_name(device_uuid, device.name) old_operational_status = self._device_to_state.get(device_uuid, DEVICE_OP_STATUS_UNDEFINED) device_was_not_enabled = (old_operational_status in DEVICE_OP_STATUS_NOT_ENABLED) new_operational_status = device.device_operational_status device_is_enabled = (new_operational_status == DEVICE_OP_STATUS_ENABLED) self._device_to_state[device_uuid] = new_operational_status activate_monitoring = device_was_not_enabled and device_is_enabled if not activate_monitoring: # device is not ready for monitoring device_op_status = device.device_operational_status if device_op_status != DEVICE_OP_STATUS_ENABLED: LOGGER.debug('Ignoring Device not enabled: {:s}'.format(grpc_message_to_json_string(device))) continue enabled_endpoint_names = set() Loading @@ -120,15 +115,20 @@ class EventsDeviceCollector: if not json_resource_value['enabled']: continue enabled_endpoint_names.add(json_resource_value['name']) endpoints_monitored = self._device_endpoint_monitored.setdefault(device_uuid, dict()) for endpoint in device.device_endpoints: endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid endpoint_name_or_uuid = endpoint.name if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0: endpoint_name_or_uuid = endpoint_uuid if endpoint_name_or_uuid not in enabled_endpoint_names: continue self._name_mapping.set_endpoint_name(endpoint_uuid, endpoint.name) endpoint_was_monitored = endpoints_monitored.get(endpoint_uuid, False) endpoint_is_enabled = (endpoint_name_or_uuid in enabled_endpoint_names) if not endpoint_was_monitored and endpoint_is_enabled: # activate for value in endpoint.kpi_sample_types: if value == KPISAMPLETYPE_UNKNOWN: continue Loading @@ -140,6 +140,16 @@ class EventsDeviceCollector: kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) kpi_id_list.append(kpi_id) endpoints_monitored[endpoint_uuid] = True else: MSG = 'Not implemented condition: event={:s} device={:s} endpoint={:s}' + \ ' endpoint_was_monitored={:s} endpoint_is_enabled={:s}' LOGGER.warning(MSG.format( grpc_message_to_json_string(event), grpc_message_to_json_string(device), grpc_message_to_json_string(endpoint), str(endpoint_was_monitored), str(endpoint_is_enabled) )) except queue.Empty: break Loading