Commit b6fa5fea authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Monitoring component:

- Fixed EventTools to inherit from context EventsCollector
- Fixed shutdown of events collector
- Added unitary test
parent 2f5fa12f
Loading
Loading
Loading
Loading
+90 −107
Original line number Diff line number Diff line
@@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging, queue, threading
import grpc, json, logging
from typing import Dict
from common.method_wrappers.ServiceExceptions import ServiceException
from common.proto import monitoring_pb2
from common.proto.context_pb2 import ConfigActionEnum, DeviceOperationalStatusEnum, Empty, EventTypeEnum
from common.proto.context_pb2 import ConfigActionEnum, DeviceOperationalStatusEnum, EventTypeEnum
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
from monitoring.service.NameMapping import NameMapping
@@ -32,18 +33,23 @@ DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTAT
DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
KPISAMPLETYPE_UNKNOWN        = KpiSampleType.KPISAMPLETYPE_UNKNOWN

class EventsDeviceCollector:
class EventsDeviceCollector(EventsCollector):
    def __init__(self, name_mapping : NameMapping) -> None: # pylint: disable=redefined-outer-name
        self._events_queue = queue.Queue()

        self._context_client_grpc = ContextClient()
        self._device_stream     = self._context_client_grpc.GetDeviceEvents(Empty())
        self._context_client = self._context_client_grpc
        self._channel = self._context_client_grpc.channel
        super().__init__(
            self._context_client,
            activate_context_collector=False,
            activate_topology_collector=False,
            activate_device_collector=True,
            activate_link_collector=False,
            activate_service_collector=False,
            activate_slice_collector=False,
            activate_connection_collector=False,
        )
        self._monitoring_client = MonitoringClient(host='127.0.0.1')

        self._device_thread   = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False)

        #self._device_to_state : Dict[str, DeviceOperationalStatusEnum] = dict()
        self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict()
        self._name_mapping = name_mapping
@@ -55,34 +61,14 @@ class EventsDeviceCollector:
        except grpc.FutureTimeoutError:
            return False

    def _collect(self, events_stream):
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self):
        try:
            self._device_thread.start()
        except RuntimeError:
            LOGGER.exception('Start EventTools exception')

    def get_event(self, block : bool = True, timeout : float = 0.1):
        return self._events_queue.get(block=block, timeout=timeout)

    def stop(self):
        self._device_stream.cancel()
        self._device_thread.join()

    def listen_events(self):
        try:
            kpi_id_list = []

            while True:
                try:
                event = self.get_event(block=True, timeout=0.5)
                if event is None:
                    break

                event_type = event.event.event_type
                device_uuid = event.device_id.device_uuid.uuid
@@ -156,9 +142,6 @@ class EventsDeviceCollector:
                            str(endpoint_is_enabled)
                        ))

                except queue.Empty:
                    break

            return kpi_id_list
        except ServiceException:
            LOGGER.exception('ListenEvents exception')
+1 −1
Original line number Diff line number Diff line
@@ -57,7 +57,7 @@ def start_monitoring(name_mapping : NameMapping):
        # Terminate is set, looping terminates
        LOGGER.warning("Stopping execution...")

    events_collector.start()
    events_collector.stop()

def main():
    global LOGGER # pylint: disable=global-statement
+92 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc
import common.tools.grpc.BaseEventCollector as base_event_collector_module
from common.proto.context_pb2 import DeviceEvent, EventTypeEnum
from monitoring.service import EventTools as event_tools_module
from monitoring.service.NameMapping import NameMapping


class _FakeRpcError(grpc.RpcError):
    def __init__(self, status_code):
        self._status_code = status_code

    def code(self):
        return self._status_code


class _FakeStream:
    def __init__(self, events=None, error=None):
        self._events = iter(events or [])
        self._error = error

    def __iter__(self):
        return self

    def __next__(self):
        if self._error is not None:
            error = self._error
            self._error = None
            raise error
        return next(self._events)

    def cancel(self):
        pass


class _FakeContextClient:
    def __init__(self):
        self.channel = object()
        self.calls = 0

    def GetDeviceEvents(self, _request):
        self.calls += 1
        if self.calls == 1:
            return _FakeStream(error=_FakeRpcError(grpc.StatusCode.UNAVAILABLE))
        if self.calls == 2:
            return _FakeStream(events=[_create_device_event()])
        return _FakeStream(error=_FakeRpcError(grpc.StatusCode.CANCELLED))


class _FakeMonitoringClient:
    def __init__(self, host='127.0.0.1'):
        self.host = host


def _create_device_event():
    event = DeviceEvent()
    event.event.event_type = EventTypeEnum.EVENTTYPE_CREATE
    event.event.timestamp.timestamp = 1.0
    event.device_id.device_uuid.uuid = 'dev1'
    return event


def test_events_device_collector_reconnects_on_unavailable(monkeypatch):
    fake_context_client = _FakeContextClient()

    monkeypatch.setattr(event_tools_module, 'ContextClient', lambda: fake_context_client)
    monkeypatch.setattr(event_tools_module, 'MonitoringClient', _FakeMonitoringClient)
    monkeypatch.setattr(base_event_collector_module.time, 'sleep', lambda _seconds: None)

    collector = event_tools_module.EventsDeviceCollector(NameMapping())
    collector.start()
    try:
        event = collector.get_event(block=True, timeout=1.0)
    finally:
        collector.stop()

    assert fake_context_client.calls >= 2
    assert event.device_id.device_uuid.uuid == 'dev1'