Skip to content
Snippets Groups Projects
EventTools.py 8.04 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging, queue, threading
from typing import Dict
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import ServiceException
from common.proto import monitoring_pb2
from common.proto.context_pb2 import ConfigActionEnum, DeviceOperationalStatusEnum, Empty, EventTypeEnum
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
from monitoring.service.NameMapping import NameMapping
LOGGER = logging.getLogger(__name__)

DEVICE_OP_STATUS_UNDEFINED   = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DEVICE_OP_STATUS_DISABLED    = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
DEVICE_OP_STATUS_ENABLED     = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
KPISAMPLETYPE_UNKNOWN        = KpiSampleType.KPISAMPLETYPE_UNKNOWN

    def __init__(self, name_mapping : NameMapping) -> None: # pylint: disable=redefined-outer-name
        self._events_queue = queue.Queue()
        self._context_client_grpc = ContextClient()
        self._device_stream     = self._context_client_grpc.GetDeviceEvents(Empty())
        self._context_client    = self._context_client_grpc
        self._channel           = self._context_client_grpc.channel
        self._monitoring_client = MonitoringClient(host='127.0.0.1')
        self._device_thread   = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False)
        #self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict()
        self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict()
        self._name_mapping = name_mapping

    def grpc_server_on(self):
        try:
            grpc.channel_ready_future(self._channel).result(timeout=15)
            return True
        except grpc.FutureTimeoutError:
            return False

    def _collect(self, events_stream):
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self):
        try:
            self._device_thread.start()
        except RuntimeError:
            LOGGER.exception('Start EventTools exception')

    def get_event(self, block : bool = True, timeout : float = 0.1):
        return self._events_queue.get(block=block, timeout=timeout)

    def stop(self):
        self._device_stream.cancel()
        self._device_thread.join()

    def listen_events(self):
        try:
            kpi_id_list = []
            while True:
                try:
                    event = self.get_event(block=True, timeout=0.5)

                    event_type = event.event.event_type
                    device_uuid = event.device_id.device_uuid.uuid
                    if event_type in {EventTypeEnum.EVENTTYPE_REMOVE}:
                        LOGGER.debug('Ignoring REMOVE event: {:s}'.format(grpc_message_to_json_string(event)))
                        self._device_endpoint_monitored.pop(device_uuid, None)
                        continue

                    if event_type not in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                        LOGGER.debug('Ignoring UNKNOWN event type: {:s}'.format(grpc_message_to_json_string(event)))
                        continue

                    device = self._context_client.GetDevice(event.device_id)
                    self._name_mapping.set_device_name(device_uuid, device.name)

                    device_op_status = device.device_operational_status
                    if device_op_status != DEVICE_OP_STATUS_ENABLED:
                        LOGGER.debug('Ignoring Device not enabled: {:s}'.format(grpc_message_to_json_string(device)))
                        continue
                    enabled_endpoint_names = set()
                    for config_rule in device.device_config.config_rules:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
                        if config_rule.WhichOneof('config_rule') != 'custom': continue
                        str_resource_key = str(config_rule.custom.resource_key)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        if not str_resource_key.startswith('/interface['): continue
                        json_resource_value = json.loads(config_rule.custom.resource_value)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        if 'name' not in json_resource_value: continue
                        if 'enabled' not in json_resource_value: continue
                        if not json_resource_value['enabled']: continue
                        enabled_endpoint_names.add(json_resource_value['name'])

                    endpoints_monitored = self._device_endpoint_monitored.setdefault(device_uuid, dict())
                    for endpoint in device.device_endpoints:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
                        endpoint_name_or_uuid = endpoint.name
                        if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0:
                            endpoint_name_or_uuid = endpoint_uuid
                        self._name_mapping.set_endpoint_name(endpoint_uuid, endpoint.name)

                        endpoint_was_monitored = endpoints_monitored.get(endpoint_uuid, False)
                        endpoint_is_enabled = (endpoint_name_or_uuid in enabled_endpoint_names)

                        if not endpoint_was_monitored and endpoint_is_enabled:
                            # activate
                            for value in endpoint.kpi_sample_types:
                                if value == KPISAMPLETYPE_UNKNOWN: continue

                                kpi_descriptor = monitoring_pb2.KpiDescriptor()
                                kpi_descriptor.kpi_description = device.device_type
                                kpi_descriptor.kpi_sample_type = value
                                kpi_descriptor.device_id.CopyFrom(device.device_id)         # pylint: disable=no-member
                                kpi_descriptor.endpoint_id.CopyFrom(endpoint.endpoint_id)   # pylint: disable=no-member

                                kpi_id = self._monitoring_client.SetKpi(kpi_descriptor)
                                kpi_id_list.append(kpi_id)
                            endpoints_monitored[endpoint_uuid] = True
                        else:
                            MSG = 'Not implemented condition: event={:s} device={:s} endpoint={:s}' + \
                                  ' endpoint_was_monitored={:s} endpoint_is_enabled={:s}'
                            LOGGER.warning(MSG.format(
                                grpc_message_to_json_string(event), grpc_message_to_json_string(device),
                                grpc_message_to_json_string(endpoint), str(endpoint_was_monitored),
                                str(endpoint_is_enabled)
                            ))
                except queue.Empty:
                    break
        except ServiceException:
        except Exception:  # pragma: no cover # pylint: disable=broad-except