Skip to content
Snippets Groups Projects
test_unitary.py 21.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import copy, logging, os, pytest
    
    from time import sleep
    
    from common.Constants import ServiceNameEnum
    from common.Settings import (
        ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
    
    from common.orm.Database import Database
    from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
    from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
    from common.message_broker.MessageBroker import MessageBroker
    
    from context.client.ContextClient import ContextClient
    from context.service.grpc_server.ContextService import ContextService
    
    from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device
    
    
    from device.client.DeviceClient import DeviceClient
    from device.service.DeviceService import DeviceService
    from device.service.driver_api.DriverFactory import DriverFactory
    from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
    from device.service.drivers import DRIVERS
    
    from monitoring.client.MonitoringClient import MonitoringClient
    
    from common.proto import context_pb2, monitoring_pb2
    from common.proto.kpi_sample_types_pb2 import KpiSampleType
    
    from monitoring.service import SqliteTools, InfluxTools
    from monitoring.service.MonitoringService import MonitoringService
    from monitoring.service.EventTools import EventsDeviceCollector
    from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request
    from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID
    
    
    
    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.DEBUG)
    
    
    Javi Moreno's avatar
    Javi Moreno committed
    ###########################
    # Tests Setup
    ###########################
    
    LOCAL_HOST = '127.0.0.1'
    
    CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
    os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
    os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT)
    
    DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
    os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
    os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)
    
    MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports
    os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
    os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)
    
    # INFLUXDB_HOSTNAME   = os.environ.get("INFLUXDB_HOSTNAME")
    # INFLUXDB_PORT       = os.environ.get("INFLUXDB_PORT")
    # INFLUXDB_USER       = os.environ.get("INFLUXDB_USER")
    # INFLUXDB_PASSWORD   = os.environ.get("INFLUXDB_PASSWORD")
    # INFLUXDB_DATABASE   = os.environ.get("INFLUXDB_DATABASE")
    METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
    METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
    METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
    METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
    
    
    @pytest.fixture(scope='session')
    def context_db_mb() -> Tuple[Database, MessageBroker]:
        _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
        _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
    
        yield _database, _message_broker
        _message_broker.terminate()
    
    @pytest.fixture(scope='session')
    
    def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
        database, message_broker = context_db_mb
    
        _service = ContextService(database, message_broker)
    
        _service.start()
        yield _service
        _service.stop()
    
    @pytest.fixture(scope='session')
    
    def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
        _client = ContextClient()
    
    def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
        LOGGER.info('Initializing DeviceService...')
        driver_factory = DriverFactory(DRIVERS)
        driver_instance_cache = DriverInstanceCache(driver_factory)
        _service = DeviceService(driver_instance_cache)
        _service.start()
    
        # yield the server, when test finishes, execution will resume to stop it
        LOGGER.info('Yielding DeviceService...')
        yield _service
    
        LOGGER.info('Terminating DeviceService...')
        _service.stop()
    
    @pytest.fixture(scope='session')
    def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
        _client = DeviceClient()
        yield _client
        _client.close()
    
    # This fixture will be requested by test cases and last during testing session
    @pytest.fixture(scope='session')
    def monitoring_service(
            context_service : ContextService,  # pylint: disable=redefined-outer-name
            device_service : DeviceService     # pylint: disable=redefined-outer-name
        ):
    
        LOGGER.info('Initializing MonitoringService...')
    
        _service = MonitoringService()
    
    
        # yield the server, when test finishes, execution will resume to stop it
    
        LOGGER.info('Yielding MonitoringService...')
    
    
        LOGGER.info('Terminating MonitoringService...')
    
    
    # This fixture will be requested by test cases and last during testing session.
    # The client requires the server, so client fixture has the server as dependency.
    @pytest.fixture(scope='session')
    
    def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
        LOGGER.info('Initializing MonitoringClient...')
        _client = MonitoringClient()
    
        # yield the server, when test finishes, execution will resume to stop it
        LOGGER.info('Yielding MonitoringClient...')
        yield _client
    
        LOGGER.info('Closing MonitoringClient...')
        _client.close()
    
    Javi Moreno's avatar
    Javi Moreno committed
    @pytest.fixture(scope='session')
    def sql_db():
    
        _sql_db = SqliteTools.SQLite('monitoring.db')
        return _sql_db
    
    # @pytest.fixture(scope='session')
    # def influx_db():
    #     _influx_db = InfluxTools.Influx(
    #         INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
    #     return _influx_db
    
    Javi Moreno's avatar
    Javi Moreno committed
    @pytest.fixture(scope='session')
    
    def metrics_db():
        _metrics_db = MetricsDBTools.MetricsDB(
            METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
        return _metrics_db
    
    
    Javi Moreno's avatar
    Javi Moreno committed
    ###########################
    # Tests Implementation
    ###########################
    
    # Test case that makes use of client fixture to test server's CreateKpi method
    
    def test_create_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    
        LOGGER.warning('test_create_kpi requesting')
    
        response = monitoring_client.CreateKpi(create_kpi_request())
    
        assert isinstance(response, monitoring_pb2.KpiId)
    
    # Test case that makes use of client fixture to test server's MonitorKpi method
    
    def test_monitor_kpi(
            context_client : ContextClient,                 # pylint: disable=redefined-outer-name
            device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
            monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
            context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
        ):
    
        LOGGER.warning('test_monitor_kpi begin')
    
        context_database = context_db_mb[0]
    
        # ----- Clean the database -----------------------------------------------------------------------------------------
        context_database.clear_all()
    
        # ----- Dump state of database before create the object ------------------------------------------------------------
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0
    
        # ----- Update the object ------------------------------------------------------------------------------------------
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID
    
        response = monitoring_client.CreateKpi(create_kpi_request())
        _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
        response = monitoring_client.MonitorKpi(_monitor_kpi_request)
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.debug(str(response))
    
        assert isinstance(response, context_pb2.Empty)
    
    
    # Test case that makes use of client fixture to test server's IncludeKpi method
    
    def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    
        # make call to server
        LOGGER.warning('test_include_kpi requesting')
    
        response = monitoring_client.IncludeKpi(include_kpi_request())
    
        LOGGER.debug(str(response))
        assert isinstance(response, context_pb2.Empty)
    
    
    # Test case that makes use of client fixture to test server's GetStreamKpi method
    
    def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    
        LOGGER.warning('test_getstream_kpi begin')
    
        response = monitoring_client.GetStreamKpi(kpi())
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.debug(str(response))
    
    Javi Moreno's avatar
     
    Javi Moreno committed
        #assert isinstance(response, monitoring_pb2.Kpi)
    
    
    # Test case that makes use of client fixture to test server's GetInstantKpi method
    
    # def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    #     LOGGER.warning('test_getinstant_kpi begin')
    #     response = monitoring_client.GetInstantKpi(kpi_id())
    #     LOGGER.debug(str(response))
    #     # assert isinstance(response, monitoring_pb2.Kpi)
    
    Javi Moreno's avatar
     
    Javi Moreno committed
    
    # Test case that makes use of client fixture to test server's GetInstantKpi method
    
    def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    
    Javi Moreno's avatar
     
    Javi Moreno committed
        LOGGER.warning('test_getkpidescritor_kpi begin')
    
        response = monitoring_client.CreateKpi(create_kpi_request())
    
        response = monitoring_client.GetKpiDescriptor(response)
    
    Javi Moreno's avatar
     
    Javi Moreno committed
        LOGGER.debug(str(response))
    
        assert isinstance(response, monitoring_pb2.KpiDescriptor)
    
    
    def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.warning('test_sqlitedb_tools_insert_kpi begin')
    
        _create_kpi_request = create_kpi_request()
        kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
        kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
        kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
        kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
        kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member
    
    Javi Moreno's avatar
    Javi Moreno committed
    
        response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        assert isinstance(response, int)
    
    
    def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
    
        _create_kpi_request = create_kpi_request()
        kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
        kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
        kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
        kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
        kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member
    
        _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.get_KPI(_kpi_id)
    
    Javi Moreno's avatar
    Javi Moreno committed
        assert isinstance(response, tuple)
    
    
    def test_sqlitedb_tools_get_kpis(sql_db): # pylint: disable=redefined-outer-name
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.warning('test_sqlitedb_tools_get_kpis begin')
        response = sql_db.get_KPIS()
        assert isinstance(response, list)
    
    
    def test_sqlitedb_tools_delete_kpi(sql_db): # pylint: disable=redefined-outer-name
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
    
    
        response = sql_db.delete_KPI("DEV1",KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
    
        if not response:
            _create_kpi_request = create_kpi_request()
            kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
            kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
            kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
            kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
            kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member
    
    Javi Moreno's avatar
    Javi Moreno committed
    
            sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    
            response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
    
        assert response
    
    def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin')
    
        response = sql_db.delete_kpid_id(1)
    
    
        if not response:
            _create_kpi_request = create_kpi_request()
            kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
            kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
            kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
            kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
            kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member
    
            _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
            response = sql_db.delete_kpid_id(_kpi_id)
    
        assert response
    
    # def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name
    #     LOGGER.warning('test_influxdb_tools_write_kpi begin')
    def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
        LOGGER.warning('test_metric_sdb_tools_write_kpi begin')
    
    
    # def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name
    #     LOGGER.warning('test_influxdb_tools_read_kpi_points begin')
    def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name
        LOGGER.warning('test_metrics_db_tools_read_kpi_points begin')
    
    def test_events_tools(
            context_client : ContextClient,                 # pylint: disable=redefined-outer-name
            device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
            monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
            context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
        ):
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.warning('test_get_device_events begin')
    
        context_database = context_db_mb[0]
    
        # ----- Clean the database -----------------------------------------------------------------------------------------
        context_database.clear_all()
    
        # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    
        events_collector = EventsDeviceCollector()
    
    Javi Moreno's avatar
    Javi Moreno committed
        events_collector.start()
    
    
        # ----- Dump state of database before create the object ------------------------------------------------------------
    
    Javi Moreno's avatar
    Javi Moreno committed
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0
    
        # ----- Update the object ------------------------------------------------------------------------------------------
    
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID
    
    Javi Moreno's avatar
    Javi Moreno committed
    
        events_collector.stop()
    
    
    
    def test_get_device_events(
            context_client : ContextClient,                 # pylint: disable=redefined-outer-name
            device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
            monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
            context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
        ):
    
    Javi Moreno's avatar
    Javi Moreno committed
        LOGGER.warning('test_get_device_events begin')
    
    
        context_database = context_db_mb[0]
    
        # ----- Clean the database -----------------------------------------------------------------------------------------
        context_database.clear_all()
    
        # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    
        events_collector = EventsDeviceCollector()
    
        # ----- Dump state of database before create the object ------------------------------------------------------------
    
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0
    
        # ----- Check create event -----------------------------------------------------------------------------------------
    
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID
    
        event = events_collector.get_event(block=True)
    
        assert isinstance(event, DeviceEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    
        assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID
    
    def test_listen_events(
            context_client : ContextClient,                 # pylint: disable=redefined-outer-name
            device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
            monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
            context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
        ):
    
    
        LOGGER.warning('test_listen_events begin')
    
        context_database = context_db_mb[0]
    
        # ----- Clean the database -----------------------------------------------------------------------------------------
        context_database.clear_all()
    
        # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    
        events_collector = EventsDeviceCollector()
    
        # ----- Dump state of database before create the object ------------------------------------------------------------
    
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0
    
    
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID
    
        kpi_id_list = events_collector.listen_events()
    
    
        assert len(kpi_id_list) > 0
    
        events_collector.stop()