Skip to content
EventTools.py 7.18 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, json, logging, queue, threading
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import ServiceException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto import monitoring_pb2
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import ConfigActionEnum, DeviceOperationalStatusEnum, Empty, EventTypeEnum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.kpi_sample_types_pb2 import KpiSampleType
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from monitoring.service.NameMapping import NameMapping
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Module-level logger named after this module.
# NOTE(review): this assignment rebinds the LOGGER name imported above from
# monitoring.service.MonitoringServiceServicerImpl, so that import is redundant.
LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Short aliases for the DeviceOperationalStatusEnum members used in this module.
DEVICE_OP_STATUS_UNDEFINED   = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DEVICE_OP_STATUS_DISABLED    = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
DEVICE_OP_STATUS_ENABLED     = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
# Statuses counted as "not enabled"; listen_events() tests a device's previous
# status against this set to detect a transition into the enabled state.
DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
# Sample type that listen_events() skips when building KPI descriptors.
KPISAMPLETYPE_UNKNOWN        = KpiSampleType.KPISAMPLETYPE_UNKNOWN

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, name_mapping : NameMapping) -> None: # pylint: disable=redefined-outer-name
        """Create the clients, open the device-event stream and prepare the collector thread.

        The device-event subscription is opened here. The thread that drains it
        is only created here; it is launched by start().

        Args:
            name_mapping: object on which listen_events() records device and
                endpoint names (set_device_name / set_endpoint_name).
        """
        # Events pulled from the stream by _collect() wait here until get_event() reads them.
        self._events_queue = queue.Queue()

        # A single Context client is shared: it provides the event stream, the
        # channel probed by grpc_server_on(), and the GetDevice calls.
        self._context_client_grpc = ContextClient()
        self._device_stream     = self._context_client_grpc.GetDeviceEvents(Empty())
        self._context_client    = self._context_client_grpc
        self._channel           = self._context_client_grpc.channel
        self._monitoring_client = MonitoringClient(host='127.0.0.1')

        # Non-daemon worker; stop() cancels the stream and joins this thread.
        self._device_thread   = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False)

        # Last operational status observed for each device UUID.
        self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict()
        self._name_mapping = name_mapping

    def grpc_server_on(self, timeout : float = 15):
        """Report whether the client's gRPC channel becomes ready in time.

        Args:
            timeout: maximum number of seconds to wait for the channel to be
                ready. Defaults to 15, the value that used to be hard-coded.

        Returns:
            True if the channel became ready within the timeout, False if the
            wait timed out.
        """
        try:
            grpc.channel_ready_future(self._channel).result(timeout=timeout)
            return True
        except grpc.FutureTimeoutError:
            return False

    def _collect(self, events_stream):
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self):
        """Launch the background thread that collects device events.

        A RuntimeError raised by Thread.start() (for instance when the thread
        was already started) is logged with its traceback and not propagated.
        """
        try:
            self._device_thread.start()
        except RuntimeError:
            LOGGER.exception('Start EventTools exception')

    def get_event(self, block : bool = True, timeout : float = 0.1):
        """Take the next collected event off the internal queue.

        Args:
            block: whether to wait for an event when the queue is empty.
            timeout: seconds to wait when block is True.

        Returns:
            The oldest event still in the queue.

        Raises:
            queue.Empty: if no event is available (immediately when block is
                False, otherwise once timeout has elapsed).
        """
        pending = self._events_queue
        return pending.get(block, timeout)

    def stop(self):
        """Cancel the device-event stream and wait for the collector thread to end."""
        # Cancel first so the thread iterating the stream can terminate.
        stream = self._device_stream
        stream.cancel()
        worker = self._device_thread
        worker.join()

    def listen_events(self):
        try:
            kpi_id_list = []
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            while True:
                # LOGGER.info('getting Kpi by KpiID')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                try:
                    event = self.get_event(block=True, timeout=0.5)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    event_type = event.event.event_type
                    device_uuid = event.device_id.device_uuid.uuid
                    if event_type in {EventTypeEnum.EVENTTYPE_REMOVE}:
                        self._device_to_state.pop(device_uuid, None)
                        continue

                    if event_type not in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                        # Unknown event type
                        continue

                    device = self._context_client.GetDevice(event.device_id)
                    self._name_mapping.set_device_name(device_uuid, device.name)

                    old_operational_status = self._device_to_state.get(device_uuid, DEVICE_OP_STATUS_UNDEFINED)
                    device_was_not_enabled = (old_operational_status in DEVICE_OP_STATUS_NOT_ENABLED)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    new_operational_status = device.device_operational_status
                    device_is_enabled = (new_operational_status == DEVICE_OP_STATUS_ENABLED)
                    self._device_to_state[device_uuid] = new_operational_status
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    activate_monitoring = device_was_not_enabled and device_is_enabled
                    if not activate_monitoring:
                        # device is not ready for monitoring
                        continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    enabled_endpoint_names = set()
                    for config_rule in device.device_config.config_rules:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
                        if config_rule.WhichOneof('config_rule') != 'custom': continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        str_resource_key = str(config_rule.custom.resource_key)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        if not str_resource_key.startswith('/interface['): continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        json_resource_value = json.loads(config_rule.custom.resource_value)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        if 'name' not in json_resource_value: continue
                        if 'enabled' not in json_resource_value: continue
                        if not json_resource_value['enabled']: continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        enabled_endpoint_names.add(json_resource_value['name'])

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    for endpoint in device.device_endpoints:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
                        endpoint_name_or_uuid = endpoint.name
                        if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0:
                            endpoint_name_or_uuid = endpoint_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        if endpoint_name_or_uuid not in enabled_endpoint_names: continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                        self._name_mapping.set_endpoint_name(endpoint_uuid, endpoint.name)

                        for value in endpoint.kpi_sample_types:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                            if value == KPISAMPLETYPE_UNKNOWN: continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

                            kpi_descriptor = monitoring_pb2.KpiDescriptor()
                            kpi_descriptor.kpi_description = device.device_type
                            kpi_descriptor.kpi_sample_type = value
                            kpi_descriptor.device_id.CopyFrom(device.device_id)         # pylint: disable=no-member
                            kpi_descriptor.endpoint_id.CopyFrom(endpoint.endpoint_id)   # pylint: disable=no-member

                            kpi_id = self._monitoring_client.SetKpi(kpi_descriptor)
                            kpi_id_list.append(kpi_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                except queue.Empty:
                    break
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        except ServiceException:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        except Exception:  # pragma: no cover # pylint: disable=broad-except