# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy, os, pytest import threading import time from time import sleep from typing import Tuple from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler from grpc._channel import _MultiThreadedRendezvous from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) from common.logger import getJSONLogger from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \ AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position from monitoring.client.MonitoringClient import MonitoringClient from monitoring.service import ManagementDBTools, MetricsDBTools from monitoring.service.MonitoringService import MonitoringService from monitoring.service.EventTools import EventsDeviceCollector from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request, \ create_kpi_request_b, create_kpi_request_c, kpi_query, subs_descriptor, alarm_descriptor, \ alarm_subscription from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID from monitoring.service.MonitoringServiceServicerImpl import LOGGER ########################### # Tests Setup ########################### LOCAL_HOST = '127.0.0.1' CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT) DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT) MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT) METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") @pytest.fixture(scope='session') def context_db_mb() -> Tuple[Database, MessageBroker]: _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY)) _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY)) yield _database, _message_broker _message_broker.terminate() @pytest.fixture(scope='session') def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name database, message_broker = context_db_mb database.clear_all() _service = ContextService(database, message_broker) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name _client = ContextClient() yield _client _client.close() @pytest.fixture(scope='session') def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name LOGGER.info('Initializing DeviceService...') driver_factory = DriverFactory(DRIVERS) driver_instance_cache = DriverInstanceCache(driver_factory) _service = DeviceService(driver_instance_cache) _service.start() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding DeviceService...') yield _service LOGGER.info('Terminating DeviceService...') _service.stop() @pytest.fixture(scope='session') def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name _client = DeviceClient() yield _client _client.close() # This fixture will be requested by test cases and last during testing session @pytest.fixture(scope='session') def monitoring_service( context_service : ContextService, # pylint: disable=redefined-outer-name device_service : DeviceService # pylint: disable=redefined-outer-name ): LOGGER.info('Initializing MonitoringService...') _service = MonitoringService() _service.start() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding MonitoringService...') yield _service LOGGER.info('Terminating MonitoringService...') _service.stop() # This fixture will be requested by test cases and last during testing session. # The client requires the server, so client fixture has the server as dependency. @pytest.fixture(scope='session') def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name LOGGER.info('Initializing MonitoringClient...') _client = MonitoringClient() # yield the server, when test finishes, execution will resume to stop it LOGGER.info('Yielding MonitoringClient...') yield _client LOGGER.info('Closing MonitoringClient...') _client.close() @pytest.fixture(scope='session') def management_db(): _management_db = ManagementDBTools.ManagementDB('monitoring.db') return _management_db @pytest.fixture(scope='session') def metrics_db(): _metrics_db = MetricsDBTools.MetricsDB( METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE) return _metrics_db @pytest.fixture(scope='session') def subs_scheduler(): _scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)}) _scheduler.start() return _scheduler def ingestion_data(monitoring_client): _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) _include_kpi_request = include_kpi_request(_kpi_id) for i in range(200): _include_kpi_request = include_kpi_request(_kpi_id) monitoring_client.IncludeKpi(_include_kpi_request) time.sleep(0.01) ########################### # Tests Implementation ########################### # Test case that makes use of client fixture to test server's CreateKpi method def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_create_kpi requesting') response = monitoring_client.SetKpi(create_kpi_request()) LOGGER.debug(str(response)) response = monitoring_client.SetKpi(create_kpi_request_b()) LOGGER.debug(str(response)) assert isinstance(response, KpiId) # Test case that makes use of client fixture to test server's DeleteKpi method def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('delete_kpi requesting') response = monitoring_client.SetKpi(create_kpi_request_b()) response = monitoring_client.DeleteKpi(response) LOGGER.debug(str(response)) assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's GetKpiDescriptor method def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getkpidescritor_kpi begin') response = monitoring_client.SetKpi(create_kpi_request_c()) response = monitoring_client.GetKpiDescriptor(response) LOGGER.debug(str(response)) assert isinstance(response, KpiDescriptor) # Test case that makes use of client fixture to test server's GetKpiDescriptor method def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getkpidescritor_kpi begin') response = monitoring_client.GetKpiDescriptorList(Empty()) LOGGER.debug(str(response)) assert isinstance(response, KpiDescriptorList) # Test case that makes use of client fixture to test server's IncludeKpi method def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_include_kpi requesting') kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) LOGGER.debug(str(response)) assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's MonitorKpi method def test_monitor_kpi( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name ): LOGGER.info('test_monitor_kpi begin') context_database = context_db_mb[0] # ----- Clean the database ----------------------------------------------------------------------------------------- context_database.clear_all() # ----- Dump state of database before create the object ------------------------------------------------------------ db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 # ----- Update the object ------------------------------------------------------------------------------------------ LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID)) device_with_connect_rules = copy.deepcopy(DEVICE_DEV1) device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES) response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID response = monitoring_client.SetKpi(create_kpi_request()) _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member response = monitoring_client.MonitorKpi(_monitor_kpi_request) LOGGER.debug(str(response)) assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's QueryKpiData method def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_query_kpi_data') response = monitoring_client.QueryKpiData(kpi_query()) LOGGER.debug(str(response)) assert isinstance(response, KpiList) def test_ingestion_data(monitoring_client): _kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) _include_kpi_request = include_kpi_request(_kpi_id) for i in range(100): _include_kpi_request = include_kpi_request(_kpi_id) monitoring_client.IncludeKpi(_include_kpi_request) time.sleep(0.01) # def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler): # subs_scheduler.add_job(ingestion_data(monitoring_client),id="1") # Test case that makes use of client fixture to test server's SetKpiSubscription method def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db)) # thread.start() monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) assert isinstance(response, _MultiThreadedRendezvous) LOGGER.debug(response) for item in response: LOGGER.debug(item) assert isinstance(item, SubsResponse) # Test case that makes use of client fixture to test server's GetSubsDescriptor method def test_get_subs_descriptor(monitoring_client): LOGGER.warning('test_get_subs_descriptor') kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) for item in response: response = monitoring_client.GetSubsDescriptor(item.subs_id) LOGGER.debug(response) assert isinstance(response, SubsDescriptor) # Test case that makes use of client fixture to test server's GetSubscriptions method def test_get_subscriptions(monitoring_client): LOGGER.warning('test_get_subscriptions') response = monitoring_client.GetSubscriptions(Empty()) LOGGER.debug(response) assert isinstance(response, SubsList) # Test case that makes use of client fixture to test server's DeleteSubscription method def test_delete_subscription(monitoring_client): LOGGER.warning('test_delete_subscription') kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) subs = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) for item in subs: response = monitoring_client.DeleteSubscription(item.subs_id) assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's SetKpiAlarm method def test_set_kpi_alarm(monitoring_client): LOGGER.warning('test_set_kpi_alarm') response = monitoring_client.SetKpiAlarm(alarm_descriptor()) LOGGER.debug(str(response)) assert isinstance(response, AlarmID) # Test case that makes use of client fixture to test server's GetAlarms method def test_get_alarms(monitoring_client): LOGGER.warning('test_get_alarms') response = monitoring_client.GetAlarms(Empty()) LOGGER.debug(response) assert isinstance(response, AlarmList) # Test case that makes use of client fixture to test server's GetAlarmDescriptor method def test_get_alarm_descriptor(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) response = monitoring_client.GetAlarmDescriptor(alarm_id) LOGGER.debug(response) assert isinstance(response, AlarmDescriptor) # Test case that makes use of client fixture to test server's GetAlarmResponseStream method def test_get_alarm_response_stream(monitoring_client): LOGGER.warning('test_get_alarm_descriptor') alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id)) assert isinstance(response, _MultiThreadedRendezvous) for item in response: LOGGER.debug(response) assert isinstance(item,AlarmResponse) # Test case that makes use of client fixture to test server's DeleteAlarm method def test_delete_alarm(monitoring_client): LOGGER.warning('test_delete_alarm') alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor()) response = monitoring_client.DeleteAlarm(alarm_id) LOGGER.debug(type(response)) assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getstream_kpi begin') response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi()) LOGGER.debug(str(response)) assert isinstance(response, _MultiThreadedRendezvous) # Test case that makes use of client fixture to test server's GetInstantKpi method # def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name # LOGGER.warning('test_getinstant_kpi begin') # kpi_id = monitoring_client.SetKpi(KpiId()) # monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) # sleep(0.3) # response = monitoring_client.GetInstantKpi(kpi_id) # LOGGER.debug(response) # assert isinstance(response, Kpi) # response = monitoring_client.GetInstantKpi(KpiId()) # LOGGER.debug(type(response)) # assert response.kpi_id.kpi_id.uuid == "NoID" def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_insert_kpi begin') _create_kpi_request = create_kpi_request() kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_connection_id) assert isinstance(response, int) def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_get_kpi begin') _create_kpi_request = create_kpi_request() kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_connection_id) response = management_db.get_KPI(_kpi_id) assert isinstance(response, tuple) def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_get_kpis begin') response = management_db.get_KPIS() assert isinstance(response, list) def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_managementdb_tools_get_kpi begin') _create_kpi_request = create_kpi_request() kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member kpi_connection_id = _create_kpi_request.connection_id.connection_id.uuid _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_connection_id) response = management_db.delete_KPI(_kpi_id) assert response def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metric_sdb_tools_write_kpi begin') def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') def test_events_tools( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name ): LOGGER.warning('test_get_device_events begin') context_database = context_db_mb[0] # ----- Clean the database ----------------------------------------------------------------------------------------- context_database.clear_all() # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- events_collector = EventsDeviceCollector() events_collector.start() # ----- Dump state of database before create the object ------------------------------------------------------------ db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 # ----- Update the object ------------------------------------------------------------------------------------------ LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID)) device_with_connect_rules = copy.deepcopy(DEVICE_DEV1) device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES) response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID events_collector.stop() def test_get_device_events( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name ): LOGGER.warning('test_get_device_events begin') context_database = context_db_mb[0] # ----- Clean the database ----------------------------------------------------------------------------------------- context_database.clear_all() # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- events_collector = EventsDeviceCollector() events_collector.start() # ----- Dump state of database before create the object ------------------------------------------------------------ db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 # ----- Check create event ----------------------------------------------------------------------------------------- LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID)) device_with_connect_rules = copy.deepcopy(DEVICE_DEV1) device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES) response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID event = events_collector.get_event(block=True) assert isinstance(event, DeviceEvent) assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID events_collector.stop() def test_listen_events( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name ): LOGGER.warning('test_listen_events begin') context_database = context_db_mb[0] # ----- Clean the database ----------------------------------------------------------------------------------------- context_database.clear_all() # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- events_collector = EventsDeviceCollector() events_collector.start() # ----- Dump state of database before create the object ------------------------------------------------------------ db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID)) device_with_connect_rules = copy.deepcopy(DEVICE_DEV1) device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES) response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID sleep(0.1) kpi_id_list = events_collector.listen_events() assert len(kpi_id_list) > 0 events_collector.stop()