# test_unitary.py
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, logging, os, pytest
from time import sleep
from typing import Iterator, Tuple
from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor

from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty

from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache

os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS  # pylint: disable=wrong-import-position

# pylint: disable=wrong-import-position
from monitoring.client.MonitoringClient import MonitoringClient
from common.proto import context_pb2, monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from monitoring.service import SqliteTools, MetricsDBTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID


# Module-level logger used by every fixture and test below. Its level is set to DEBUG so the
# LOGGER.debug(...) dumps of server responses are not filtered out by this logger.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

###########################
# Tests Setup
###########################
# All services under test are addressed on the loopback interface.
LOCAL_HOST = '127.0.0.1'

# For each service (Context, Device, Monitoring): take its gRPC port, shift it by 10000, and export
# the resulting host/port pair through the environment variables named by get_env_var_name().
# NOTE(review): the clients below are built without explicit addresses, so they presumably read
# these variables -- keep these assignments at import time, before any fixture runs.
CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT)
DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)
MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)

# Metrics database connection settings, taken verbatim from the environment.
# os.environ.get() returns None for a variable that is not set, and a str otherwise (ports included).
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")

@pytest.fixture(scope='session')
def context_db_mb() -> Iterator[Tuple[Database, MessageBroker]]:
    """Provide the in-memory Database and MessageBroker shared by the whole test session.

    Yields:
        A ``(database, message_broker)`` tuple, both built on their INMEMORY backends.
        The message broker is terminated once the session is over.
    """
    _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
    _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
    yield _database, _message_broker
    # Teardown: executed by pytest after the last test of the session.
    _message_broker.terminate()

@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Run a ContextService on top of the session database and message broker.

    The service is started before being yielded and stopped at session teardown.
    """
    db_backend, mb_backend = context_db_mb
    service_under_test = ContextService(db_backend, mb_backend)
    service_under_test.start()
    yield service_under_test
    service_under_test.stop()

@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Provide a ContextClient for the session.

    Depends on ``context_service`` so the server is running before the client is built.
    The client is yielded to the tests and closed at session teardown, mirroring the
    device and monitoring client fixtures of this module.
    """
    _client = ContextClient()
    yield _client
    _client.close()
@pytest.fixture(scope='session')
def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Run a DeviceService for the session.

    Must be a fixture: ``device_client`` and ``monitoring_service`` request it by name.
    Depends on ``context_service`` so the Context server is started first.
    """
    LOGGER.info('Initializing DeviceService...')
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    _service = DeviceService(driver_instance_cache)
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding DeviceService...')
    yield _service

    LOGGER.info('Terminating DeviceService...')
    _service.stop()

@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    """Provide a DeviceClient for the session; it is closed at teardown.

    Requesting ``device_service`` guarantees the server fixture is set up first.
    """
    client = DeviceClient()
    yield client
    client.close()
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service(
        context_service : ContextService,  # pylint: disable=redefined-outer-name
        device_service : DeviceService     # pylint: disable=redefined-outer-name
    ):
    """Run a MonitoringService for the session.

    The Context and Device service fixtures are requested so that both servers are set up
    before the MonitoringService is created. The service is started, yielded to the tests,
    and stopped at session teardown.
    """
    LOGGER.info('Initializing MonitoringService...')
    _service = MonitoringService()
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding MonitoringService...')
    yield _service

    LOGGER.info('Terminating MonitoringService...')
    _service.stop()

# Session-wide client fixture requested by the test cases.
# It depends on the server fixture, so the server is set up before the client is created.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
    """Provide a MonitoringClient for the session and close it at teardown."""
    LOGGER.info('Initializing MonitoringClient...')
    client = MonitoringClient()

    # Hand the client to the tests; execution resumes here once the session ends.
    LOGGER.info('Yielding MonitoringClient...')
    yield client

    LOGGER.info('Closing MonitoringClient...')
    client.close()
@pytest.fixture(scope='session')
def sql_db():
    """Provide the SqliteTools.SQLite helper opened on 'monitoring.db' for the session."""
    return SqliteTools.SQLite('monitoring.db')

@pytest.fixture(scope='session')
def metrics_db():
    """Provide a MetricsDBTools.MetricsDB built from the METRICSDB_* module settings."""
    connection_settings = (METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
    return MetricsDBTools.MetricsDB(*connection_settings)

###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's SetKpi method
def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """SetKpi must reply with a KpiId message."""
    LOGGER.warning('test_create_kpi requesting')
    set_kpi_request = create_kpi_request()
    set_kpi_reply = monitoring_client.SetKpi(set_kpi_request)
    assert isinstance(set_kpi_reply, KpiId)
# Test case that makes use of client fixture to test server's MonitorKpi method
def test_monitor_kpi(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Register a device, create a KPI for it and check that MonitorKpi replies with Empty.

    Args:
        context_client: not referenced in the body; requested only so its fixture is set up.
        device_client: used to add DEVICE_DEV1 (with its connect rules).
        monitoring_client: used to call SetKpi and MonitorKpi.
        context_db_mb: (database, message broker) pair; only the database is used here.
    """
    LOGGER.warning('test_monitor_kpi begin')
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Update the object ------------------------------------------------------------------------------------------
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    # Deep copy so the shared DEVICE_DEV1 template is not mutated by extend() below.
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID

    # ----- Create the KPI and request its monitoring ------------------------------------------------------------------
    response = monitoring_client.SetKpi(create_kpi_request())
    # NOTE(review): 120 and 5 are presumably the monitoring duration and sampling interval -- confirm
    # against monitoring.tests.Messages.monitor_kpi_request.
    _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
    response = monitoring_client.MonitorKpi(_monitor_kpi_request)
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)


# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """IncludeKpi must reply with an Empty message."""
    LOGGER.warning('test_include_kpi requesting')
    # Send the request to the server and keep the reply for inspection.
    include_reply = monitoring_client.IncludeKpi(include_kpi_request())
    LOGGER.debug(str(include_reply))
    assert isinstance(include_reply, Empty)
# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """Call GetStreamKpi and log the reply.

    Only checks that the call completes; the type assertion on the reply is disabled.
    """
    LOGGER.warning('test_getstream_kpi begin')
    response = monitoring_client.GetStreamKpi(kpi())
    LOGGER.debug(str(response))
    #assert isinstance(response, Kpi)

# Test case that makes use of client fixture to test server's GetInstantKpi method
# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_getinstant_kpi begin')
#     response = monitoring_client.GetInstantKpi(kpi_id())
#     LOGGER.debug(str(response))
#     # assert isinstance(response, Kpi)

# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    """Create a KPI with SetKpi and check that GetKpiDescriptor returns a KpiDescriptor for it."""
    LOGGER.warning('test_getkpidescritor_kpi begin')
    # The reply of SetKpi is passed straight back as the argument of GetKpiDescriptor.
    response = monitoring_client.SetKpi(create_kpi_request())
    response = monitoring_client.GetKpiDescriptor(response)
    LOGGER.debug(str(response))
    assert isinstance(response, KpiDescriptor)
def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name
    """insert_KPI must return an int when given the fields of a sample KPI request."""
    LOGGER.warning('test_sqlitedb_tools_insert_kpi begin')
    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
    kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
    kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

    response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    assert isinstance(response, int)

def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name
    """Insert a sample KPI and check that get_KPI returns a tuple for the id insert_KPI gave back."""
    LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
    kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
    kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

    _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    response = sql_db.get_KPI(_kpi_id)
    assert isinstance(response, tuple)

def test_sqlitedb_tools_get_kpis(sql_db): # pylint: disable=redefined-outer-name
    """get_KPIS must return a list."""
    LOGGER.warning('test_sqlitedb_tools_get_kpis begin')
    response = sql_db.get_KPIS()
    assert isinstance(response, list)

def test_sqlitedb_tools_delete_kpi(sql_db): # pylint: disable=redefined-outer-name
    """delete_KPI must report success for the ("DEV1", PACKETS_TRANSMITTED) KPI.

    If the first delete returns a falsy value, a sample KPI is inserted and the delete is retried,
    so the test does not depend on what earlier tests left in the database.
    """
    LOGGER.warning('test_sqlitedb_tools_delete_kpi begin')

    response = sql_db.delete_KPI("DEV1",KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)

    if not response:
        # Nothing was deleted: insert the sample KPI first, then delete it.
        _create_kpi_request = create_kpi_request()
        kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
        kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
        kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
        kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
        kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

        sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)

    assert response
def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name
    """delete_kpid_id must report success when deleting a KPI by id.

    Id 1 is tried first; if that returns a falsy value, a sample KPI is inserted and the id
    returned by insert_KPI is deleted instead.
    """
    LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin')

    response = sql_db.delete_kpid_id(1)

    if not response:
        # Nothing was deleted for id 1: create a KPI and delete it by the id just obtained.
        _create_kpi_request = create_kpi_request()
        kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
        kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
        kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
        kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
        kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

        _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_kpid_id(_kpi_id)

    assert response
def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
    """Placeholder: only requests the metrics_db fixture and logs; nothing is written or asserted yet."""
    LOGGER.warning('test_metrics_db_tools_write_kpi begin')


def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name
    """Placeholder: only requests the metrics_db fixture and logs; nothing is read or asserted yet."""
    LOGGER.warning(
        'test_metrics_db_tools_read_kpi_points begin'
    )
def test_events_tools(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Start an EventsDeviceCollector, add a device while it is running, then stop it.

    Args:
        context_client: not referenced in the body; requested only so its fixture is set up.
        device_client: used to add DEVICE_DEV1 (with its connect rules).
        monitoring_client: not referenced in the body; requested only so its fixture is set up.
        context_db_mb: (database, message broker) pair; only the database is used here.
    """
    LOGGER.warning('test_events_tools begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    events_collector.start()

    # try/finally so the collector is stopped even when one of the assertions below fails.
    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0

        # ----- Update the object --------------------------------------------------------------------------------------
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        # Deep copy so the shared DEVICE_DEV1 template is not mutated by extend() below.
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID
    finally:
        events_collector.stop()


def test_get_device_events(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Add a device and check that the collector delivers the matching CREATE DeviceEvent.

    Args:
        context_client: not referenced in the body; requested only so its fixture is set up.
        device_client: used to add DEVICE_DEV1 (with its connect rules).
        monitoring_client: not referenced in the body; requested only so its fixture is set up.
        context_db_mb: (database, message broker) pair; only the database is used here.
    """
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    # Start the collector before triggering the event, as test_events_tools does; get_event() below
    # is called with block=True and is expected to wait until an event has been collected.
    events_collector.start()

    # try/finally so the collector is stopped even when one of the assertions below fails.
    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0

        # ----- Check create event -------------------------------------------------------------------------------------
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        # Deep copy so the shared DEVICE_DEV1 template is not mutated by extend() below.
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID

        event = events_collector.get_event(block=True)
        assert isinstance(event, DeviceEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID
    finally:
        events_collector.stop()
def test_listen_events(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    """Add a device and check that listen_events() returns a non-empty list of KPI ids.

    Args:
        context_client: not referenced in the body; requested only so its fixture is set up.
        device_client: used to add DEVICE_DEV1 (with its connect rules).
        monitoring_client: not referenced in the body; requested only so its fixture is set up.
        context_db_mb: (database, message broker) pair; only the database is used here.
    """
    LOGGER.warning('test_listen_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    # The collector is stopped at the end of this test, so it has to be started first
    # (same start/stop pairing as in test_events_tools).
    events_collector.start()

    # try/finally so the collector is stopped even when one of the assertions below fails.
    try:
        # ----- Dump state of database before create the object --------------------------------------------------------
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        assert len(db_entries) == 0

        # ----- Add the device and collect the resulting KPI ids -------------------------------------------------------
        LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
        # Deep copy so the shared DEVICE_DEV1 template is not mutated by extend() below.
        device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
        device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
        response = device_client.AddDevice(Device(**device_with_connect_rules))
        assert response.device_uuid.uuid == DEVICE_DEV1_UUID

        kpi_id_list = events_collector.listen_events()

        assert len(kpi_id_list) > 0
    finally:
        events_collector.stop()