# test_unitary.py
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, os, pytest
from time import sleep
from typing import Tuple

from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.logger import getJSONLogger
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto import monitoring_pb2
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor

from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty

from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS

from monitoring.client.MonitoringClient import MonitoringClient
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from monitoring.service import SqliteTools, MetricsDBTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID

from monitoring.service.MonitoringServiceServicerImpl import LOGGER
# LOGGER = getJSONLogger('monitoringservice-server')
# LOGGER.setLevel('DEBUG')
Javi Moreno's avatar
Javi Moreno committed
###########################
# Tests Setup
###########################
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# All services under test are bound to the loopback interface.
LOCAL_HOST = '127.0.0.1'

# For each service, take its default gRPC port, shift it by 10000 and store the resulting
# host/port pair in the environment variables named by get_env_var_name().
CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT)

DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)

MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)

# Metrics database connection settings come from the environment; each is None when unset.
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")

Javi Moreno's avatar
Javi Moreno committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
def context_db_mb() -> Tuple[Database, MessageBroker]:
    """Session fixture yielding an in-memory (Database, MessageBroker) pair.

    The message broker is terminated when the fixture is torn down.
    """
    db_instance = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
    broker_instance = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
    db_and_broker = (db_instance, broker_instance)
    yield db_and_broker
    # Teardown: only the broker exposes an explicit shutdown call here.
    broker_instance.terminate()

@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture: run a ContextService on top of the shared database and message broker.

    The service is started before being yielded and stopped at teardown.
    """
    database, message_broker = context_db_mb
    _service = ContextService(database, message_broker)
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a ContextClient; requesting context_service ensures the server exists first.

    The client is yielded to the tests and closed at teardown, the same pattern
    used by the device_client fixture in this file.
    """
    _client = ContextClient()
    yield _client
    _client.close()

@pytest.fixture(scope='session')
def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture: start a DeviceService backed by a driver instance cache, stop it at teardown.

    Must be registered as a fixture because device_client and monitoring_service
    request it by name.
    """
    LOGGER.info('Initializing DeviceService...')
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    _service = DeviceService(driver_instance_cache)
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding DeviceService...')
    yield _service

    LOGGER.info('Terminating DeviceService...')
    _service.stop()

@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a DeviceClient; the client is closed at teardown."""
    grpc_client = DeviceClient()
    yield grpc_client
    # Teardown: release the client once the session ends.
    grpc_client.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service(
        context_service : ContextService,  # pylint: disable=redefined-outer-name
        device_service : DeviceService     # pylint: disable=redefined-outer-name
    ):
    """Session fixture: run a MonitoringService while the Context and Device services are up.

    The service is started, yielded to the tests, and stopped at teardown.
    """
    LOGGER.info('Initializing MonitoringService...')
    _service = MonitoringService()
    # NOTE(review): start()/stop() mirror how ContextService and DeviceService are driven in
    # this file; confirm MonitoringService exposes the same lifecycle methods.
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding MonitoringService...')
    yield _service

    LOGGER.info('Terminating MonitoringService...')
    _service.stop()

# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a MonitoringClient; the client is closed at teardown.

    Requesting monitoring_service makes pytest set up the server before the client.
    """
    LOGGER.info('Initializing MonitoringClient...')
    _client = MonitoringClient()

    # yield the client, when test finishes, execution will resume to close it
    LOGGER.info('Yielding MonitoringClient...')
    yield _client

    LOGGER.info('Closing MonitoringClient...')
    _client.close()
Javi Moreno's avatar
Javi Moreno committed
@pytest.fixture(scope='session')
def sql_db():
    """Session fixture returning a SqliteTools.SQLite handle constructed with 'monitoring.db'."""
    _sql_db = SqliteTools.SQLite('monitoring.db')
    return _sql_db
Javi Moreno's avatar
Javi Moreno committed

@pytest.fixture(scope='session')
def metrics_db():
    """Session fixture returning a MetricsDB built from the METRICSDB_* module settings."""
    connection_args = (METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
    return MetricsDBTools.MetricsDB(*connection_args)

Javi Moreno's avatar
Javi Moreno committed
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """SetKpi must answer with a KpiId message."""
    LOGGER.warning('test_create_kpi requesting')
    kpi_request = create_kpi_request()
    kpi_id = monitoring_client.SetKpi(kpi_request)
    assert isinstance(kpi_id, KpiId)
# Test case that makes use of client fixture to test server's MonitorKpi method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_monitor_kpi(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Add a device, create a KPI for it and check that MonitorKpi answers with Empty.

    context_client is requested only so that its fixture chain is set up; it is not
    used directly in the body.
    """
    LOGGER.info('test_monitor_kpi begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Update the object ------------------------------------------------------------------------------------------
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    # Deep-copy so that extending the config rules does not mutate the shared DEVICE_DEV1 object.
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID

    response = monitoring_client.SetKpi(create_kpi_request())
    _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
    response = monitoring_client.MonitorKpi(_monitor_kpi_request)
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)


# Test case that makes use of client fixture to test server's IncludeKpi method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """IncludeKpi on a freshly created KPI must answer with Empty."""
    LOGGER.warning('test_include_kpi requesting')
    # Create the KPI first, then build the include request from the id the server returned.
    created_kpi_id = monitoring_client.SetKpi(create_kpi_request())
    include_request = include_kpi_request(created_kpi_id)
    reply = monitoring_client.IncludeKpi(include_request)
    assert isinstance(reply, Empty)
# Test case that makes use of client fixture to test server's GetStreamKpi method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """Call GetStreamKpi with an empty Kpi message and log the response.

    No assertion is made on the response; the type check below is disabled.
    """
    LOGGER.warning('test_getstream_kpi begin')
    response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi())
    LOGGER.debug(str(response))
    #assert isinstance(response, Kpi)

# Test case that makes use of client fixture to test server's GetInstantKpi method
# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_getinstant_kpi begin')
#     response = monitoring_client.GetInstantKpi(kpi_id())
#     LOGGER.debug(str(response))
#     # assert isinstance(response, Kpi)
Javi Moreno's avatar
 
Javi Moreno committed

# Test case that makes use of client fixture to test server's GetKpiDescriptor method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """Create a KPI, then check that GetKpiDescriptor on its id returns a KpiDescriptor."""
    LOGGER.warning('test_getkpidescritor_kpi begin')
    response = monitoring_client.SetKpi(create_kpi_request())
    # LOGGER.debug(str(response))
    # The SetKpi response is passed straight back as the GetKpiDescriptor request.
    response = monitoring_client.GetKpiDescriptor(response)
    # LOGGER.debug(str(response))
    assert isinstance(response, KpiDescriptor)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name
    """insert_KPI with the fields of a sample KPI request must return an int."""
    LOGGER.warning('test_sqlitedb_tools_insert_kpi begin')
    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
    kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
    kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

    response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    assert isinstance(response, int)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name
    """Insert a sample KPI and check that get_KPI on the returned id yields a tuple."""
    LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
    kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
    kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

    _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    response = sql_db.get_KPI(_kpi_id)
    assert isinstance(response, tuple)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_sqlitedb_tools_get_kpis(sql_db): # pylint: disable=redefined-outer-name
    """get_KPIS must return a list."""
    LOGGER.warning('test_sqlitedb_tools_get_kpis begin')
    response = sql_db.get_KPIS()
    assert isinstance(response, list)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_sqlitedb_tools_delete_kpi(sql_db): # pylint: disable=redefined-outer-name
    """delete_KPI must report success, inserting a sample KPI first if nothing could be deleted."""
    LOGGER.warning('test_sqlitedb_tools_delete_kpi begin')

    response = sql_db.delete_KPI("DEV1",KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)

    # A falsy first result is handled by inserting a sample KPI and retrying the delete.
    # NOTE(review): presumably create_kpi_request() targets device "DEV1" with the
    # PACKETS_TRANSMITTED sample type; confirm against monitoring.tests.Messages.
    if not response:
        _create_kpi_request = create_kpi_request()
        kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
        kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
        kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
        kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
        kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

        sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)

    assert response
Javi Moreno's avatar
Javi Moreno committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name
    """delete_kpid_id must report success, inserting a sample KPI first if id 1 could not be deleted."""
    LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin')

    response = sql_db.delete_kpid_id(1)

    # A falsy first result is handled by inserting a sample KPI and deleting it by the returned id.
    if not response:
        _create_kpi_request = create_kpi_request()
        kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
        kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
        kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
        kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
        kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

        _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_kpid_id(_kpi_id)

    assert response
def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
    """Request the metrics_db fixture and log the start of the test; no write is performed here."""
    LOGGER.warning('test_metrics_db_tools_write_kpi begin')


def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name
    """Request the metrics_db fixture and log the start of the test; no read is performed here."""
    start_message = 'test_metrics_db_tools_read_kpi_points begin'
    LOGGER.warning(start_message)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_events_tools(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Start an EventsDeviceCollector, add a device while it runs, then stop the collector.

    context_client and monitoring_client are requested only for their fixture setup.
    """
    LOGGER.warning('test_events_tools begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    events_collector.start()

    # Stop the collector even when an assertion below fails, so it does not outlive the test.
    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0

        # ----- Update the object --------------------------------------------------------------------------------------
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        # Deep-copy so that extending the config rules does not mutate the shared DEVICE_DEV1 object.
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID
    finally:
        events_collector.stop()


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_get_device_events(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Add a device and check that the collector delivers a CREATE DeviceEvent for it.

    context_client and monitoring_client are requested only for their fixture setup.
    """
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    # The collector is started before the blocking get_event() below, as test_events_tools does;
    # it is stopped in the finally clause so it does not outlive the test.
    events_collector.start()

    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0

        # ----- Check create event -------------------------------------------------------------------------------------
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        # Deep-copy so that extending the config rules does not mutate the shared DEVICE_DEV1 object.
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID

        event = events_collector.get_event(block=True)
        assert isinstance(event, DeviceEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID
    finally:
        events_collector.stop()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_listen_events(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Add a device and check that listen_events() returns a non-empty list.

    context_client and monitoring_client are requested only for their fixture setup.
    """

    LOGGER.warning('test_listen_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    # Start the collector before using it, as test_events_tools does; it is stopped in the
    # finally clause so that it is released even when an assertion fails.
    events_collector.start()

    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0

        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        # Deep-copy so that extending the config rules does not mutate the shared DEVICE_DEV1 object.
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID

        # NOTE(review): `sleep` is imported at file level but unused here; if listen_events()
        # does not wait for pending events, a short pause may be needed before this call - confirm.
        kpi_id_list = events_collector.listen_events()

        assert len(kpi_id_list) > 0
    finally:
        events_collector.stop()