diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index d3799689eee4a1a12f4d4d7998d92482c59adb16..0701c5ce8bc56ee34d0a90760b0d6e88fdbbee42 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -12,35 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy, logging, os, pytest +import logging +import os +import socket +import pytest from typing import Tuple -from common.Constants import ServiceNameEnum -from common.Settings import ( - ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) -from common.orm.Database import Database -from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum -from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum -from common.message_broker.MessageBroker import MessageBroker - -from context.client.ContextClient import ContextClient -from context.service.grpc_server.ContextService import ContextService -from context.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device -from device.client.DeviceClient import DeviceClient -from device.service.DeviceService import DeviceService -from device.service.driver_api.DriverFactory import DriverFactory -from device.service.driver_api.DriverInstanceCache import DriverInstanceCache -from device.service.drivers import DRIVERS -from monitoring.client.MonitoringClient import MonitoringClient +from monitoring.client.monitoring_client import MonitoringClient +from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, DEVICE_GRPC_SERVICE_PORT from monitoring.proto import context_pb2, monitoring_pb2 from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType from monitoring.service import SqliteTools, InfluxTools from monitoring.service.MonitoringService import MonitoringService from monitoring.service.EventTools import EventsDeviceCollector -from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request -from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID +from common.orm.Database import Database +from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum +from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum +from common.message_broker.MessageBroker import MessageBroker + +from context.Config import ( + GRPC_SERVICE_PORT as grpc_port_context, + GRPC_MAX_WORKERS as grpc_workers_context, + GRPC_GRACE_PERIOD as grpc_grace_context +) +from context.client.ContextClient import ContextClient +from context.service.grpc_server.ContextService import ContextService +from context.service.Populate import populate +from context.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device +from context.tests.Objects import (DEVICE_R1, DEVICE_R1_UUID) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -49,19 +50,18 @@ LOGGER.setLevel(logging.DEBUG) # Tests Setup ########################### -LOCAL_HOST = '127.0.0.1' +SERVER_ADDRESS = '127.0.0.1' +LISTEN_ADDRESS = '[::]' +GRPC_PORT_MONITORING = 7070 -CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports -os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) -os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT) +GRPC_PORT_CONTEXT = 10000 + grpc_port_context # avoid privileged ports +DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports +MONITORING_GRPC_SERVICE_PORT = GRPC_PORT_MONITORING # avoid privileged ports -DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports -os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) -os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT) -MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports -os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) -os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT) +SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit + ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ), +] INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") INFLUXDB_PORT = os.environ.get("INFLUXDB_PORT") @@ -69,61 +69,49 @@ INFLUXDB_USER = os.environ.get("INFLUXDB_USER") INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") -@pytest.fixture(scope='session') -def context_db_mb() -> Tuple[Database, MessageBroker]: - _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY)) - _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY)) +@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) +def context_db_mb(request) -> Tuple[Database, MessageBroker]: + name,db_backend,db_settings,mb_backend,mb_settings = request.param + msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...' + LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings))) + _database = Database(get_database_backend(backend=db_backend, **db_settings)) + _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) yield _database, _message_broker _message_broker.terminate() @pytest.fixture(scope='session') -def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - database, message_broker = context_db_mb +def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name + database = context_db_mb[0] database.clear_all() - _service = ContextService(database, message_broker) + _service = ContextService( + database, context_db_mb[1], port=GRPC_PORT_CONTEXT, max_workers=grpc_workers_context, + grace_period=grpc_grace_context) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') -def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name - _client = ContextClient() +def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name + _client = ContextClient(address='localhost', port=GRPC_PORT_CONTEXT) yield _client _client.close() -@pytest.fixture(scope='session') -def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name - LOGGER.info('Initializing DeviceService...') - driver_factory = DriverFactory(DRIVERS) - driver_instance_cache = DriverInstanceCache(driver_factory) - _service = DeviceService(driver_instance_cache) - _service.start() - - # yield the server, when test finishes, execution will resume to stop it - LOGGER.info('Yielding DeviceService...') - yield _service - - LOGGER.info('Terminating DeviceService...') - _service.stop() - -@pytest.fixture(scope='session') -def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name - _client = DeviceClient() - yield _client - _client.close() # This fixture will be requested by test cases and last during testing session @pytest.fixture(scope='session') -def monitoring_service( - context_service : ContextService, # pylint: disable=redefined-outer-name - device_service : DeviceService # pylint: disable=redefined-outer-name - ): +def monitoring_service(): + LOGGER.warning('monitoring_service begin') + + service_port = GRPC_SERVICE_PORT + max_workers = GRPC_MAX_WORKERS + grace_period = GRPC_GRACE_PERIOD + LOGGER.info('Initializing MonitoringService...') - _service = MonitoringService() + _service = MonitoringService(port=service_port, max_workers=max_workers, grace_period=grace_period) _service.start() # yield the server, when test finishes, execution will resume to stop it - LOGGER.info('Yielding MonitoringService...') + LOGGER.warning('monitoring_service yielding') yield _service LOGGER.info('Terminating MonitoringService...') @@ -132,187 +120,224 @@ def monitoring_service( # This fixture will be requested by test cases and last during testing session. # The client requires the server, so client fixture has the server as dependency. @pytest.fixture(scope='session') -def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name - LOGGER.info('Initializing MonitoringClient...') - _client = MonitoringClient() +def monitoring_client(monitoring_service): + LOGGER.warning('monitoring_client begin') + client = MonitoringClient(server=SERVER_ADDRESS, port=GRPC_PORT_MONITORING) # instantiate the client + LOGGER.warning('monitoring_client returning') + return client - # yield the server, when test finishes, execution will resume to stop it - LOGGER.info('Yielding MonitoringClient...') - yield _client +# This fixture will be requested by test cases and last during testing session. +@pytest.fixture(scope='session') +def kpi(): + LOGGER.warning('test_include_kpi begin') + # form request + kpi = monitoring_pb2.Kpi() + kpi.kpi_id.kpi_id.uuid = 'KPIID0000' + kpi.kpiDescription = 'KPI Desc' + return kpi - LOGGER.info('Closing MonitoringClient...') - _client.close() +@pytest.fixture(scope='session') +def kpi_id(): + LOGGER.warning('test_include_kpi begin') + + # form request + kpi_id = monitoring_pb2.KpiId() + kpi_id.kpi_id.uuid = str(1) + + return kpi_id @pytest.fixture(scope='session') def sql_db(): - _sql_db = SqliteTools.SQLite('monitoring.db') - return _sql_db + sql_db = SqliteTools.SQLite('monitoring.db') + return sql_db @pytest.fixture(scope='session') def influx_db(): - _influx_db = InfluxTools.Influx( - INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE) - return _influx_db + influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE) + return influx_db + +@pytest.fixture(scope='session') +def create_kpi_request(): + LOGGER.warning('test_include_kpi begin') + + create_kpi_request = monitoring_pb2.KpiDescriptor() + create_kpi_request.kpi_description = 'KPI Description Test' + create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED + create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member + create_kpi_request.service_id.service_uuid.uuid = "SERV1" + create_kpi_request.endpoint_id.endpoint_uuid.uuid = "END1" + + return create_kpi_request + +@pytest.fixture(scope='session') +def monitor_kpi_request(): + LOGGER.warning('test_monitor_kpi begin') + + monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() + monitor_kpi_request.kpi_id.kpi_id.uuid = str(1) + monitor_kpi_request.sampling_duration_s = 120 + monitor_kpi_request.sampling_interval_s = 5 + + return monitor_kpi_request + + +@pytest.fixture(scope='session') +def include_kpi_request(): + LOGGER.warning('test_include_kpi begin') + + include_kpi_request = monitoring_pb2.Kpi() + include_kpi_request.kpi_id.kpi_id.uuid = str(1) + include_kpi_request.timestamp = "2021-10-12T13:14:42Z" + include_kpi_request.kpi_value.intVal = 500 + + return include_kpi_request + +@pytest.fixture(scope='session') +def address(): + address = '127.0.0.1' + return address +@pytest.fixture(scope='session') +def port(): + port = 7070 + return port ########################### # Tests Implementation ########################### # Test case that makes use of client fixture to test server's CreateKpi method -def test_create_kpi(monitoring_client): # pylint: disable=redefined-outer-name +def test_create_kpi(monitoring_client,create_kpi_request): # make call to server LOGGER.warning('test_create_kpi requesting') - response = monitoring_client.CreateKpi(create_kpi_request()) + response = monitoring_client.CreateKpi(create_kpi_request) LOGGER.debug(str(response)) assert isinstance(response, monitoring_pb2.KpiId) # Test case that makes use of client fixture to test server's MonitorKpi method -def test_monitor_kpi( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name - context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name - ): +def test_monitor_kpi(monitoring_client,create_kpi_request): LOGGER.warning('test_monitor_kpi begin') - context_database = context_db_mb[0] + response = monitoring_client.CreateKpi(create_kpi_request) - # ----- Clean the database ----------------------------------------------------------------------------------------- - context_database.clear_all() + monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() + monitor_kpi_request.kpi_id.kpi_id.uuid = response.kpi_id.uuid + monitor_kpi_request.sampling_duration_s = 120 + monitor_kpi_request.sampling_interval_s = 5 - # ----- Dump state of database before create the object ------------------------------------------------------------ - db_entries = context_database.dump() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 0 - - # ----- Update the object ------------------------------------------------------------------------------------------ - LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID)) - device_with_connect_rules = copy.deepcopy(DEVICE_DEV1) - device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES) - response = device_client.AddDevice(Device(**device_with_connect_rules)) - assert response.device_uuid.uuid == DEVICE_DEV1_UUID - - response = monitoring_client.CreateKpi(create_kpi_request()) - _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member - response = monitoring_client.MonitorKpi(_monitor_kpi_request) + response = monitoring_client.MonitorKpi(monitor_kpi_request) LOGGER.debug(str(response)) assert isinstance(response, context_pb2.Empty) # Test case that makes use of client fixture to test server's IncludeKpi method -def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name +def test_include_kpi(monitoring_client,include_kpi_request): # make call to server LOGGER.warning('test_include_kpi requesting') - response = monitoring_client.IncludeKpi(include_kpi_request()) + response = monitoring_client.IncludeKpi(include_kpi_request) LOGGER.debug(str(response)) assert isinstance(response, context_pb2.Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method -def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name +def test_get_stream_kpi(monitoring_client,include_kpi_request): LOGGER.warning('test_getstream_kpi begin') - response = monitoring_client.GetStreamKpi(kpi()) + response = monitoring_client.GetStreamKpi(kpi) LOGGER.debug(str(response)) #assert isinstance(response, monitoring_pb2.Kpi) # Test case that makes use of client fixture to test server's GetInstantKpi method -def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name +def test_get_instant_kpi(monitoring_client,kpi_id): LOGGER.warning('test_getinstant_kpi begin') - response = monitoring_client.GetInstantKpi(kpi_id()) + response = monitoring_client.GetInstantKpi(kpi_id) LOGGER.debug(str(response)) assert isinstance(response, monitoring_pb2.Kpi) # Test case that makes use of client fixture to test server's GetInstantKpi method -def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name +def test_get_kpidescritor_kpi(monitoring_client,create_kpi_request): LOGGER.warning('test_getkpidescritor_kpi begin') - response = monitoring_client.CreateKpi(create_kpi_request()) + + response = monitoring_client.CreateKpi(create_kpi_request) + response = monitoring_client.GetKpiDescriptor(response) LOGGER.debug(str(response)) assert isinstance(response, monitoring_pb2.KpiDescriptor) -def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name +def test_sqlitedb_tools_insert_kpi(sql_db, create_kpi_request): LOGGER.warning('test_sqlitedb_tools_insert_kpi begin') - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + + kpi_description = create_kpi_request.kpi_description + kpi_sample_type = create_kpi_request.kpi_sample_type + kpi_device_id = create_kpi_request.device_id.device_uuid.uuid + kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid + kpi_service_id = create_kpi_request.service_id.service_uuid.uuid response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) assert isinstance(response, int) -def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name +def test_sqlitedb_tools_get_kpi(sql_db, create_kpi_request): LOGGER.warning('test_sqlitedb_tools_get_kpi begin') - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member - - _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) - response = sql_db.get_KPI(_kpi_id) + + kpi_description = create_kpi_request.kpi_description + kpi_sample_type = create_kpi_request.kpi_sample_type + kpi_device_id = create_kpi_request.device_id.device_uuid.uuid + kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid + kpi_service_id = create_kpi_request.service_id.service_uuid.uuid + + kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + + response = sql_db.get_KPI(kpi_id) assert isinstance(response, tuple) -def test_sqlitedb_tools_get_kpis(sql_db): # pylint: disable=redefined-outer-name +def test_sqlitedb_tools_get_kpis(sql_db): LOGGER.warning('test_sqlitedb_tools_get_kpis begin') response = sql_db.get_KPIS() assert isinstance(response, list) -def test_sqlitedb_tools_delete_kpi(sql_db): # pylint: disable=redefined-outer-name +def test_sqlitedb_tools_delete_kpi(sql_db, create_kpi_request): LOGGER.warning('test_sqlitedb_tools_get_kpi begin') response = sql_db.delete_KPI("DEV1",KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED) - if not response: - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + if response == False: + kpi_description = create_kpi_request.kpi_description + kpi_sample_type = create_kpi_request.kpi_sample_type + kpi_device_id = create_kpi_request.device_id.device_uuid.uuid + kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid + kpi_service_id = create_kpi_request.service_id.service_uuid.uuid sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED) - assert response + assert response == True -def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name +def test_sqlitedb_tools_delete_kpid_id(sql_db, create_kpi_request): LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin') response = sql_db.delete_kpid_id(1) - if not response: - _create_kpi_request = create_kpi_request() - kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member - kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member - kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member - kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member - kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member + if response == False: + kpi_description = create_kpi_request.kpi_description + kpi_sample_type = create_kpi_request.kpi_sample_type + kpi_device_id = create_kpi_request.device_id.device_uuid.uuid + kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid + kpi_service_id = create_kpi_request.service_id.service_uuid.uuid - _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) - response = sql_db.delete_kpid_id(_kpi_id) + kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + response = sql_db.delete_kpid_id(kpi_id) - assert response + assert response == True -def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name +def test_influxdb_tools_write_kpi(influx_db): LOGGER.warning('test_influxdb_tools_write_kpi begin') -def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name +def test_influxdb_tools_read_kpi_points(influx_db): LOGGER.warning('test_influxdb_tools_read_kpi_points begin') -def test_events_tools( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name - context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name - ): +def test_events_tools(context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name + monitoring_client : MonitoringClient, + context_db_mb: Tuple[Database, MessageBroker]): LOGGER.warning('test_get_device_events begin') context_database = context_db_mb[0] @@ -321,10 +346,10 @@ def test_events_tools( context_database.clear_all() # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- - events_collector = EventsDeviceCollector() + events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client) events_collector.start() - # ----- Dump state of database before create the object ------------------------------------------------------------ + # # ----- Dump state of database before create the object ------------------------------------------------------------ db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: @@ -332,22 +357,18 @@ def test_events_tools( LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 + populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests + # ----- Update the object ------------------------------------------------------------------------------------------ - LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID)) - device_with_connect_rules = copy.deepcopy(DEVICE_DEV1) - device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES) - response = device_client.AddDevice(Device(**device_with_connect_rules)) - assert response.device_uuid.uuid == DEVICE_DEV1_UUID + response = context_client_grpc.SetDevice(Device(**DEVICE_R1)) + assert response.device_uuid.uuid == DEVICE_R1_UUID events_collector.stop() -def test_get_device_events( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name - context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name - ): +def test_get_device_events(context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name + monitoring_client : MonitoringClient, + context_db_mb: Tuple[Database, MessageBroker]): LOGGER.warning('test_get_device_events begin') @@ -357,10 +378,10 @@ def test_get_device_events( context_database.clear_all() # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- - events_collector = EventsDeviceCollector() + events_collector = EventsDeviceCollector(context_client_grpc,monitoring_client) events_collector.start() - # ----- Dump state of database before create the object ------------------------------------------------------------ + # # ----- Dump state of database before create the object ------------------------------------------------------------ db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: @@ -368,26 +389,20 @@ def test_get_device_events( LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 - # ----- Check create event ----------------------------------------------------------------------------------------- - LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID)) - device_with_connect_rules = copy.deepcopy(DEVICE_DEV1) - device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES) - response = device_client.AddDevice(Device(**device_with_connect_rules)) - assert response.device_uuid.uuid == DEVICE_DEV1_UUID + populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests + # ----- Check create event ----------------------------------------------------------------------------------------- event = events_collector.get_event(block=True) + assert isinstance(event, DeviceEvent) assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE - assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID + assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID events_collector.stop() -def test_listen_events( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name - context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name - ): +def test_listen_events(monitoring_client: MonitoringClient, + context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name + context_db_mb: Tuple[Database, MessageBroker]): LOGGER.warning('test_listen_events begin') @@ -397,10 +412,10 @@ def test_listen_events( context_database.clear_all() # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- - events_collector = EventsDeviceCollector() + events_collector = EventsDeviceCollector(context_client_grpc,monitoring_client) events_collector.start() - # ----- Dump state of database before create the object ------------------------------------------------------------ + # # ----- Dump state of database before create the object ------------------------------------------------------------ db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: @@ -408,12 +423,18 @@ def test_listen_events( LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 - LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID)) - device_with_connect_rules = copy.deepcopy(DEVICE_DEV1) - device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES) - response = device_client.AddDevice(Device(**device_with_connect_rules)) - assert response.device_uuid.uuid == DEVICE_DEV1_UUID + populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests kpi_id_list = events_collector.listen_events() - assert len(kpi_id_list) > 0 + assert bool(kpi_id_list) == True + +def test_socket_ports(address, port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = s.connect_ex((address,port)) + + if result == 0: + print('socket is open') + else: + print('socket is not open') + s.close()