Skip to content
Snippets Groups Projects
EventTools.py 8.04 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import grpc, json, logging, queue, threading
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from typing import Dict
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.method_wrappers.ServiceExceptions import ServiceException
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.proto import monitoring_pb2
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.proto.context_pb2 import ConfigActionEnum, DeviceOperationalStatusEnum, Empty, EventTypeEnum
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.proto.kpi_sample_types_pb2 import KpiSampleType
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.tools.grpc.Tools import grpc_message_to_json_string
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.client.ContextClient import ContextClient
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from monitoring.client.MonitoringClient import MonitoringClient
    
    from monitoring.service.MonitoringServiceServicerImpl import LOGGER
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from monitoring.service.NameMapping import NameMapping
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Module-level logger named after this module. NOTE: this assignment rebinds
    # the LOGGER name that is also imported from
    # monitoring.service.MonitoringServiceServicerImpl earlier in the file.
    LOGGER = logging.getLogger(__name__)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Short aliases for the DeviceOperationalStatusEnum members used in this module.
    DEVICE_OP_STATUS_UNDEFINED   = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
    DEVICE_OP_STATUS_DISABLED    = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
    DEVICE_OP_STATUS_ENABLED     = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
    # Set grouping the two statuses above that are not ENABLED (undefined, disabled).
    DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
    # Short alias for the "unknown" KPI sample type.
    KPISAMPLETYPE_UNKNOWN        = KpiSampleType.KPISAMPLETYPE_UNKNOWN
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def __init__(self, name_mapping : NameMapping) -> None: # pylint: disable=redefined-outer-name
            """Open the device-event stream and prepare the collector state.

            Creates a ContextClient, subscribes to its device events, creates a
            MonitoringClient pointed at 127.0.0.1, and builds (without starting)
            the thread that will run ``_collect`` over the device-event stream.

            Args:
                name_mapping: NameMapping instance kept as ``self._name_mapping``.
            """
            # Buffer between the stream-reading thread and whoever consumes events.
            self._events_queue = queue.Queue()

            self._context_client_grpc = ContextClient()
            self._device_stream     = self._context_client_grpc.GetDeviceEvents(Empty())
            # Same client object, exposed under a second attribute name.
            self._context_client    = self._context_client_grpc
            self._channel           = self._context_client_grpc.channel
            self._monitoring_client = MonitoringClient(host='127.0.0.1')

            # The thread is only created here; it is not started by the constructor.
            self._device_thread = threading.Thread(
                target=self._collect, args=(self._device_stream,), daemon=False)

            # Outer key: device UUID; inner dict: endpoint UUID -> monitored flag.
            self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict()

            self._name_mapping = name_mapping
    
    
        def grpc_server_on(self, timeout : float = 15):
            """Report whether the gRPC channel in ``self._channel`` becomes ready in time.

            Args:
                timeout: seconds to wait for the channel to become ready.
                    Defaults to 15, the value that was previously hard-coded.

            Returns:
                True if the channel became ready within ``timeout``; False if
                the wait timed out.
            """
            try:
                grpc.channel_ready_future(self._channel).result(timeout=timeout)
            except grpc.FutureTimeoutError:
                return False
            return True
    
        def _collect(self, events_stream):
            """Copy every event from ``events_stream`` into the events queue.

            Iterates the stream until it is exhausted. A gRPC error whose status
            code is CANCELLED ends the method quietly; any other gRPC error is
            re-raised.

            Args:
                events_stream: iterable yielding the events to enqueue.
            """
            enqueue = self._events_queue.put_nowait
            try:
                for received in events_stream:
                    enqueue(received)
            except grpc.RpcError as rpc_error:
                was_cancelled = rpc_error.code() == grpc.StatusCode.CANCELLED # pylint: disable=no-member
                if not was_cancelled:
                    raise # pragma: no cover
    
        def start(self):
            """Start the thread that collects device events.

            A RuntimeError raised by ``Thread.start()`` is logged with its
            traceback and not propagated.
            """
            try:
                self._device_thread.start()
            except RuntimeError:
                LOGGER.exception('Start EventTools exception')
    
    
        def get_event(self, block : bool = True, timeout : float = 0.1):
            """Take the next event off the events queue.

            Args:
                block: whether to wait for an event when the queue is empty.
                timeout: maximum seconds to wait when ``block`` is true.

            Returns:
                The oldest queued event.

            Raises:
                queue.Empty: if no event is available (immediately when not
                    blocking, or after ``timeout`` seconds when blocking).
            """
            pending = self._events_queue
            return pending.get(block, timeout)
    
        def stop(self):
            """Cancel the device-event stream and wait for the collector thread.

            The stream is always cancelled. The thread is joined only if it is
            alive: ``Thread.join()`` raises RuntimeError on a thread that was
            never started, which would otherwise make ``stop()`` fail when it
            is called without a successful ``start()``.
            """
            self._device_stream.cancel()
            if self._device_thread.is_alive():
                self._device_thread.join()
    
        def listen_events(self):
            try:
                kpi_id_list = []
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                while True:
                    try:
                        event = self.get_event(block=True, timeout=0.5)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                        event_type = event.event.event_type
                        device_uuid = event.device_id.device_uuid.uuid
                        if event_type in {EventTypeEnum.EVENTTYPE_REMOVE}:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            LOGGER.debug('Ignoring REMOVE event: {:s}'.format(grpc_message_to_json_string(event)))
                            self._device_endpoint_monitored.pop(device_uuid, None)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            continue
    
                        if event_type not in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            LOGGER.debug('Ignoring UNKNOWN event type: {:s}'.format(grpc_message_to_json_string(event)))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            continue
    
                        device = self._context_client.GetDevice(event.device_id)
                        self._name_mapping.set_device_name(device_uuid, device.name)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                        device_op_status = device.device_operational_status
                        if device_op_status != DEVICE_OP_STATUS_ENABLED:
                            LOGGER.debug('Ignoring Device not enabled: {:s}'.format(grpc_message_to_json_string(device)))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            continue
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                        enabled_endpoint_names = set()
                        for config_rule in device.device_config.config_rules:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
                            if config_rule.WhichOneof('config_rule') != 'custom': continue
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            str_resource_key = str(config_rule.custom.resource_key)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            if not str_resource_key.startswith('/interface['): continue
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            json_resource_value = json.loads(config_rule.custom.resource_value)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            if 'name' not in json_resource_value: continue
                            if 'enabled' not in json_resource_value: continue
                            if not json_resource_value['enabled']: continue
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            enabled_endpoint_names.add(json_resource_value['name'])
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                        endpoints_monitored = self._device_endpoint_monitored.setdefault(device_uuid, dict())
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                        for endpoint in device.device_endpoints:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
                            endpoint_name_or_uuid = endpoint.name
                            if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0:
                                endpoint_name_or_uuid = endpoint_uuid
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            self._name_mapping.set_endpoint_name(endpoint_uuid, endpoint.name)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                            endpoint_was_monitored = endpoints_monitored.get(endpoint_uuid, False)
                            endpoint_is_enabled = (endpoint_name_or_uuid in enabled_endpoint_names)
    
                            if not endpoint_was_monitored and endpoint_is_enabled:
                                # activate
                                for value in endpoint.kpi_sample_types:
                                    if value == KPISAMPLETYPE_UNKNOWN: continue
    
                                    kpi_descriptor = monitoring_pb2.KpiDescriptor()
                                    kpi_descriptor.kpi_description = device.device_type
                                    kpi_descriptor.kpi_sample_type = value
                                    kpi_descriptor.device_id.CopyFrom(device.device_id)         # pylint: disable=no-member
                                    kpi_descriptor.endpoint_id.CopyFrom(endpoint.endpoint_id)   # pylint: disable=no-member
    
                                    kpi_id = self._monitoring_client.SetKpi(kpi_descriptor)
                                    kpi_id_list.append(kpi_id)
                                endpoints_monitored[endpoint_uuid] = True
                            else:
                                MSG = 'Not implemented condition: event={:s} device={:s} endpoint={:s}' + \
                                      ' endpoint_was_monitored={:s} endpoint_is_enabled={:s}'
                                LOGGER.warning(MSG.format(
                                    grpc_message_to_json_string(event), grpc_message_to_json_string(device),
                                    grpc_message_to_json_string(endpoint), str(endpoint_was_monitored),
                                    str(endpoint_is_enabled)
                                ))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                    except queue.Empty:
                        break
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            except ServiceException:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            except Exception:  # pragma: no cover # pylint: disable=broad-except