# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, os, pytest
import threading
import time
from queue import Queue
from random import random
from time import sleep
from typing import Tuple

from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from grpc._channel import _MultiThreadedRendezvous

from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.logger import getJSONLogger
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto import monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \
    AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse
from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float

from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty

from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.MetricsDBTools import MetricsDB
from monitoring.service.SubscriptionManager import SubscriptionManager

# Must be set before the drivers package is imported (hence the late import below).
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS  # pylint: disable=wrong-import-position

from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service import ManagementDBTools, MetricsDBTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request, \
    create_kpi_request_b, create_kpi_request_c, kpi_query, subs_descriptor, alarm_descriptor, \
    alarm_subscription
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID

from monitoring.service.MonitoringServiceServicerImpl import LOGGER


###########################
# Tests Setup
###########################

LOCAL_HOST = '127.0.0.1'

CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT)

DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)

MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)

# Connection settings for the metrics database; each is None when its variable is not set.
METRICSDB_HOSTNAME  = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT  = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE     = os.environ.get("METRICSDB_TABLE")


@pytest.fixture(scope='session')
def context_db_mb() -> Tuple[Database, MessageBroker]:
    """Yield an in-memory (Database, MessageBroker) pair; the broker is terminated on teardown."""
    _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
    _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
    yield _database, _message_broker
    _message_broker.terminate()


@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Start a ContextService on a cleared database and stop it on teardown."""
    database, message_broker = context_db_mb
    database.clear_all()
    _service = ContextService(database, message_broker)
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Yield a ContextClient and close it on teardown."""
    _client = ContextClient()
    yield _client
    _client.close()


@pytest.fixture(scope='session')
def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Start a DeviceService backed by the imported DRIVERS and stop it on teardown."""
    LOGGER.info('Initializing DeviceService...')
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    _service = DeviceService(driver_instance_cache)
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding DeviceService...')
    yield _service

    LOGGER.info('Terminating DeviceService...')
    _service.stop()


@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    """Yield a DeviceClient and close it on teardown."""
    _client = DeviceClient()
    yield _client
    _client.close()


# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service(
        context_service : ContextService,  # pylint: disable=redefined-outer-name
        device_service : DeviceService     # pylint: disable=redefined-outer-name
    ):
    """Start a MonitoringService and stop it on teardown.

    The context and device service fixtures are not used directly; requesting
    them makes pytest set them up before this fixture runs.
    """
    LOGGER.info('Initializing MonitoringService...')
    _service = MonitoringService()
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding MonitoringService...')
    yield _service

    LOGGER.info('Terminating MonitoringService...')
    _service.stop()


# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
    """Yield a MonitoringClient and close it on teardown."""
    LOGGER.info('Initializing MonitoringClient...')
    _client = MonitoringClient()

    # yield the client, when test finishes, execution will resume to close it
    LOGGER.info('Yielding MonitoringClient...')
    yield _client

    LOGGER.info('Closing MonitoringClient...')
    _client.close()


@pytest.fixture(scope='session')
def management_db():
    """Return a ManagementDB constructed with the name 'monitoring.db'."""
    _management_db = ManagementDBTools.ManagementDB('monitoring.db')
    return _management_db


@pytest.fixture(scope='session')
def metrics_db():
    """Return a MetricsDB configured from the METRICSDB_* environment variables."""
    _metrics_db = MetricsDBTools.MetricsDB(
        METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
    return _metrics_db


@pytest.fixture(scope='session')
def subs_scheduler():
    """Yield a started BackgroundScheduler and shut it down at the end of the session."""
    _scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
    _scheduler.start()
    yield _scheduler
    # Release the scheduler thread and executors; do not block teardown on
    # ingestion jobs that may still be running.
    _scheduler.shutdown(wait=False)


def ingestion_data(num_samples=200, interval_s=0.1):
    """Write random KPI samples for KPI "3" into a local MetricsDB.

    Args:
        num_samples: number of samples to write (default 200).
        interval_s: pause in seconds after each write (default 0.1).
    """
    # Local name differs from the `metrics_db` fixture to avoid shadowing it.
    _metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")

    # Loop-invariant: the sample type name without its 'KPISAMPLETYPE_' prefix.
    kpi_sample_type = KpiSampleType.Name(
        KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')
    kpi_id        = "3"
    device_id     = 'DEV3'
    endpoint_id   = 'END3'
    service_id    = 'SERV3'
    slice_id      = 'SLC3'
    connection_id = 'CON3'

    for _ in range(num_samples):
        time_stamp = timestamp_utcnow_to_float()
        kpi_value  = 500*random()
        _metrics_db.write_KPI(time_stamp, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, slice_id,
                              connection_id, kpi_value)
        sleep(interval_s)


def _assert_context_db_empty(context_database):
    """Log a dump of the context database and assert that it has no entries."""
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0


def _add_device_dev1(device_client):
    """Add DEVICE_DEV1 (with its connect rules) through the client and check the returned UUID."""
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    # Deep copy so the shared DEVICE_DEV1 definition is not mutated by extend().
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID


def _insert_kpi_from_request(management_db):
    """Insert the KPI described by create_kpi_request() into the management DB; return insert_KPI's result."""
    _create_kpi_request = create_kpi_request()
    kpi_description   = _create_kpi_request.kpi_description                      # pylint: disable=maybe-no-member
    kpi_sample_type   = _create_kpi_request.kpi_sample_type                      # pylint: disable=maybe-no-member
    kpi_device_id     = _create_kpi_request.device_id.device_uuid.uuid           # pylint: disable=maybe-no-member
    kpi_endpoint_id   = _create_kpi_request.endpoint_id.endpoint_uuid.uuid       # pylint: disable=maybe-no-member
    kpi_service_id    = _create_kpi_request.service_id.service_uuid.uuid         # pylint: disable=maybe-no-member
    kpi_slice_id      = _create_kpi_request.slice_id.slice_uuid.uuid             # pylint: disable=maybe-no-member
    kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid   # pylint: disable=maybe-no-member
    return management_db.insert_KPI(
        kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id,
        kpi_connection_id)


###########################
# Tests Implementation
###########################

# Test case that makes use of client fixture to test server's SetKpi method
def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """SetKpi returns a KpiId for each of the two KPI requests."""
    # make call to server
    LOGGER.warning('test_set_kpi requesting')
    for build_request in (create_kpi_request, create_kpi_request_b):
        response = monitoring_client.SetKpi(build_request())
        LOGGER.debug(str(response))
        assert isinstance(response, KpiId)


# Test case that makes use of client fixture to test server's DeleteKpi method
def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """DeleteKpi on a freshly set KPI returns Empty."""
    # make call to server
    LOGGER.warning('delete_kpi requesting')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_b())
    response = monitoring_client.DeleteKpi(kpi_id)
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)


# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name
    """GetKpiDescriptor on a freshly set KPI returns a KpiDescriptor."""
    LOGGER.warning('test_getkpidescritor_kpi begin')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    response = monitoring_client.GetKpiDescriptor(kpi_id)
    LOGGER.debug(str(response))
    assert isinstance(response, KpiDescriptor)


# Test case that makes use of client fixture to test server's GetKpiDescriptorList method
def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined-outer-name
    """GetKpiDescriptorList returns a KpiDescriptorList."""
    LOGGER.warning('test_get_kpi_descriptor_list begin')
    response = monitoring_client.GetKpiDescriptorList(Empty())
    LOGGER.debug(str(response))
    assert isinstance(response, KpiDescriptorList)


# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """IncludeKpi on a freshly set KPI returns Empty."""
    # make call to server
    LOGGER.warning('test_include_kpi requesting')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)


# Test case that makes use of client fixture to test server's MonitorKpi method
def test_monitor_kpi(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """After adding DEVICE_DEV1, MonitorKpi on a new KPI returns Empty."""
    LOGGER.info('test_monitor_kpi begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Dump state of database before create the object ------------------------------------------------------------
    _assert_context_db_empty(context_database)

    # ----- Update the object ------------------------------------------------------------------------------------------
    _add_device_dev1(device_client)

    response = monitoring_client.SetKpi(create_kpi_request())
    _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
    response = monitoring_client.MonitorKpi(_monitor_kpi_request)
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)


# Test case that makes use of client fixture to test server's QueryKpiData method
def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name
    """QueryKpiData returns a KpiList."""
    LOGGER.warning('test_query_kpi_data')
    response = monitoring_client.QueryKpiData(kpi_query())
    LOGGER.debug(str(response))
    assert isinstance(response, KpiList)


# Test case that makes use of client fixture to test server's SetKpiSubscription method
def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # pylint: disable=redefined-outer-name
    """SetKpiSubscription returns a response stream whose items are SubsResponse."""
    LOGGER.warning('test_set_kpi_subscription')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    # Feed samples in the background while the subscription is active.
    subs_scheduler.add_job(ingestion_data)
    response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    assert isinstance(response, _MultiThreadedRendezvous)
    for item in response:
        LOGGER.debug(item)
        assert isinstance(item, SubsResponse)


# Test case that makes use of client fixture to test server's GetSubsDescriptor method
def test_get_subs_descriptor(monitoring_client): # pylint: disable=redefined-outer-name
    """GetSubsDescriptor returns a SubsDescriptor for each subscription response."""
    LOGGER.warning('test_get_subs_descriptor')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    subs_stream = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    for item in subs_stream:
        # Separate name so the stream being iterated is not rebound mid-loop.
        descriptor = monitoring_client.GetSubsDescriptor(item.subs_id)
        LOGGER.debug(descriptor)
        assert isinstance(descriptor, SubsDescriptor)


# Test case that makes use of client fixture to test server's GetSubscriptions method
def test_get_subscriptions(monitoring_client): # pylint: disable=redefined-outer-name
    """GetSubscriptions returns a SubsList."""
    LOGGER.warning('test_get_subscriptions')
    response = monitoring_client.GetSubscriptions(Empty())
    LOGGER.debug(response)
    assert isinstance(response, SubsList)


# Test case that makes use of client fixture to test server's DeleteSubscription method
def test_delete_subscription(monitoring_client): # pylint: disable=redefined-outer-name
    """DeleteSubscription returns Empty for each subscription response."""
    LOGGER.warning('test_delete_subscription')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    subs = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    for item in subs:
        response = monitoring_client.DeleteSubscription(item.subs_id)
        assert isinstance(response, Empty)


# Test case that makes use of client fixture to test server's SetKpiAlarm method
def test_set_kpi_alarm(monitoring_client): # pylint: disable=redefined-outer-name
    """SetKpiAlarm returns an AlarmID."""
    LOGGER.warning('test_set_kpi_alarm')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id))
    LOGGER.debug(str(response))
    assert isinstance(response, AlarmID)


# Test case that makes use of client fixture to test server's GetAlarms method
def test_get_alarms(monitoring_client): # pylint: disable=redefined-outer-name
    """GetAlarms returns an AlarmList."""
    LOGGER.warning('test_get_alarms')
    response = monitoring_client.GetAlarms(Empty())
    LOGGER.debug(response)
    assert isinstance(response, AlarmList)


# Test case that makes use of client fixture to test server's GetAlarmDescriptor method
def test_get_alarm_descriptor(monitoring_client): # pylint: disable=redefined-outer-name
    """GetAlarmDescriptor on a freshly set alarm returns an AlarmDescriptor."""
    LOGGER.warning('test_get_alarm_descriptor')
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    _response = monitoring_client.GetAlarmDescriptor(_alarm_id)
    LOGGER.debug(_response)
    assert isinstance(_response, AlarmDescriptor)


# Test case that makes use of client fixture to test server's GetAlarmResponseStream method
def test_get_alarm_response_stream(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name
    """GetAlarmResponseStream returns a response stream whose items are AlarmResponse."""
    LOGGER.warning('test_get_alarm_response_stream')
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    # Feed samples in the background while the alarm stream is consumed.
    subs_scheduler.add_job(ingestion_data)
    _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id))
    assert isinstance(_response, _MultiThreadedRendezvous)
    for item in _response:
        LOGGER.debug(item)
        assert isinstance(item, AlarmResponse)


# Test case that makes use of client fixture to test server's DeleteAlarm method
def test_delete_alarm(monitoring_client): # pylint: disable=redefined-outer-name
    """DeleteAlarm on a freshly set alarm returns Empty."""
    LOGGER.warning('test_delete_alarm')
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    _response = monitoring_client.DeleteAlarm(_alarm_id)
    LOGGER.debug(type(_response))
    assert isinstance(_response, Empty)


# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """GetStreamKpi returns a response stream object."""
    LOGGER.warning('test_getstream_kpi begin')
    response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi())
    LOGGER.debug(str(response))
    assert isinstance(response, _MultiThreadedRendezvous)


# Test case that makes use of client fixture to test server's GetInstantKpi method
# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_getinstant_kpi begin')
#     kpi_id = monitoring_client.SetKpi(KpiId())
#     monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
#     sleep(0.3)
#     response = monitoring_client.GetInstantKpi(kpi_id)
#     LOGGER.debug(response)
#     assert isinstance(response, Kpi)


def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name
    """insert_KPI returns an int."""
    LOGGER.warning('test_managementdb_tools_insert_kpi begin')
    response = _insert_kpi_from_request(management_db)
    assert isinstance(response, int)


def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name
    """get_KPI on a freshly inserted KPI returns a tuple."""
    LOGGER.warning('test_managementdb_tools_get_kpi begin')
    _kpi_id = _insert_kpi_from_request(management_db)
    response = management_db.get_KPI(_kpi_id)
    assert isinstance(response, tuple)


def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name
    """get_KPIS returns a list."""
    LOGGER.warning('test_managementdb_tools_get_kpis begin')
    response = management_db.get_KPIS()
    assert isinstance(response, list)


def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name
    """delete_KPI on a freshly inserted KPI returns a truthy value."""
    LOGGER.warning('test_managementdb_tools_delete_kpi begin')
    _kpi_id = _insert_kpi_from_request(management_db)
    response = management_db.delete_KPI(_kpi_id)
    assert response


def test_managementdb_tools_insert_alarm(management_db): # pylint: disable=redefined-outer-name
    """insert_alarm returns an int."""
    LOGGER.warning('test_managementdb_tools_insert_alarm begin')

    _alarm_description  = "Alarm Description"
    _alarm_name         = "Alarm Name"
    _kpi_id             = "3"
    _kpi_min_value      = 0.0
    _kpi_max_value      = 250.0
    _in_range           = True
    _include_min_value  = False
    _include_max_value  = True

    _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value,
                                           _kpi_max_value,
                                           _in_range, _include_min_value, _include_max_value)
    LOGGER.debug(_alarm_id)
    assert isinstance(_alarm_id, int)

#
# def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_metric_sdb_tools_write_kpi begin')
#     _kpiId = "6"
#
#     for i in range(50):
#         _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')
#         _deviceId = 'DEV4'
#         _endpointId = 'END4'
#         _serviceId = 'SERV4'
#         _sliceId = 'SLC4'
#         _connectionId = 'CON4'
#         _time_stamp = timestamp_utcnow_to_float()
#         _kpi_value = 500*random()
#
#         metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId,
#                                   _kpi_value)
#         sleep(0.05)
#
#     _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'"
#     _data = metrics_db.run_query(_query)
#     assert len(_data) >= 50
#
# def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler):
#     LOGGER.warning('test_subscription_manager_create_subscription begin')
#     subs_queue = Queue()
#
#     subs_manager = SubscriptionManager(metrics_db)
#
#     subs_scheduler.add_job(ingestion_data)
#
#     kpi_id = "3"
#     sampling_duration_s = 20
#     sampling_interval_s = 3
#     real_start_time     = timestamp_utcnow_to_float()
#     start_timestamp     = real_start_time
#     end_timestamp       = start_timestamp + sampling_duration_s
#
#     subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s,
#                                                sampling_interval_s,start_timestamp,end_timestamp)
#     subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s,
#                                      sampling_duration_s,start_timestamp,end_timestamp)
#
#     # This is here to simulate application activity (which keeps the main thread alive).
#     total_points = 0
#     while True:
#         while not subs_queue.empty():
#             list = subs_queue.get_nowait()
#             kpi_list = KpiList()
#             for item in list:
#                 kpi = Kpi()
#                 kpi.kpi_id.kpi_id.uuid = item[0]
#                 kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
#                 kpi.kpi_value.floatVal = item[2]
#                 kpi_list.kpi.append(kpi)
#                 total_points += 1
#             LOGGER.debug(kpi_list)
#         if timestamp_utcnow_to_float() > end_timestamp:
#             break
#
#     assert total_points != 0


def test_events_tools(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """An EventsDeviceCollector can be started and stopped around a device creation."""
    LOGGER.warning('test_events_tools begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    events_collector.start()

    # Always stop the collector, even when an assertion below fails.
    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        _assert_context_db_empty(context_database)

        # ----- Update the object --------------------------------------------------------------------------------------
        _add_device_dev1(device_client)
    finally:
        events_collector.stop()


def test_get_device_events(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Adding DEVICE_DEV1 produces a CREATE DeviceEvent for that device."""
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    events_collector.start()

    # Always stop the collector, even when an assertion below fails.
    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        _assert_context_db_empty(context_database)

        # ----- Check create event -------------------------------------------------------------------------------------
        _add_device_dev1(device_client)

        event = events_collector.get_event(block=True)
        assert isinstance(event, DeviceEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID
    finally:
        events_collector.stop()


def test_listen_events(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """listen_events returns a non-empty result after DEVICE_DEV1 is added."""
    LOGGER.warning('test_listen_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    events_collector.start()

    # Always stop the collector, even when an assertion below fails.
    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        _assert_context_db_empty(context_database)

        _add_device_dev1(device_client)

        # Short pause before polling, kept from the original test.
        sleep(0.1)

        kpi_id_list = events_collector.listen_events()
        assert len(kpi_id_list) > 0
    finally:
        events_collector.stop()