# test_unitary.py (13.5 KiB)
import copy, grpc, logging, operator, os, pytest
from typing import Tuple
# (repository web-view annotation, not source code: Lluis Gifre Renom committed)
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from context.Config import (
    GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD)
from context.client.ContextClient import ContextClient
from context.proto.context_pb2 import Context, DeviceId, Topology, Device
from context.service.grpc_server.ContextService import ContextService
from device.Config import (
    GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD)
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from device.tests.MockMonitoringService import MockMonitoringService
from monitoring.Config import (
    GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as MONITORING_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as MONITORING_GRPC_GRACE_PERIOD)
from monitoring.client.monitoring_client import MonitoringClient
from service.Config import (
    GRPC_SERVICE_PORT as SERVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as SERVICE_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as SERVICE_GRPC_GRACE_PERIOD)
# (repository web-view annotation, not source code: Lluis Gifre Renom committed)
from service.client.ServiceClient import ServiceClient
from service.proto.context_pb2 import Service, ServiceId
# (repository web-view annotation, not source code: Lluis Gifre Renom committed)
from service.service.ServiceService import ServiceService
from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from service.service.service_handlers import SERVICE_HANDLERS
from service.tests.Service_L3NM_EMU import (
    SERVICE_L3NM_EMU, SERVICE_L3NM_EMU_CONFIG_RULES, SERVICE_L3NM_EMU_CONSTRAINTS, SERVICE_L3NM_EMU_ENDPOINT_IDS,
    SERVICE_L3NM_EMU_ID)
#from device.service.MonitoringLoops import MonitoringLoops
from .CommonObjects import CONTEXT, DEVICE_EMU1, DEVICE_EMU2, DEVICE_EMU_CONNECT_RULES, TOPOLOGY
# (repository web-view annotation, not source code: Lluis Gifre Renom committed)
# Module-level logger; forced to DEBUG so every message emitted by these tests is recorded.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Shift each component's configured gRPC port up by 10000. These assignments rebind the names
# imported from the *.Config modules, so every later use in this file sees the shifted value.
CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports
DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports
SERVICE_GRPC_SERVICE_PORT = 10000 + SERVICE_GRPC_SERVICE_PORT # avoid privileged ports
MONITORING_GRPC_SERVICE_PORT = 10000 + MONITORING_GRPC_SERVICE_PORT # avoid privileged ports

# Fallback Redis connection parameters, used when the matching environment variable is not set.
DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID  = 0

# Redis settings resolved from the environment, falling back to the defaults above.
# NOTE(review): values read from os.environ are strings, while the port and database-id defaults
# are ints, so the value types differ depending on where they come from -- confirm the Redis
# backends accept both.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
}

# Parametrization table consumed by the context_db_mb fixture. Each entry is
# (scenario name, database backend, database settings, message-broker backend, message-broker settings).
# Only the in-memory scenario is active; the Redis scenario is currently disabled.
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
    #('all_redis',    DatabaseBackendEnum.REDIS,    REDIS_CONFIG, MessageBrokerBackendEnum.REDIS,    REDIS_CONFIG),
]

@pytest.fixture(scope='session', ids=[str(scenario_name) for scenario_name, *_ in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Session fixture providing the (Database, MessageBroker) pair for one scenario.

    Parametrized over SCENARIOS; the scenario name is used as the test id.
    The message broker is terminated at teardown.
    """
    scenario_name, db_backend, db_settings, mb_backend, mb_settings = request.param

    # Announce which backends and settings are in use before instantiating them.
    LOGGER.info(
        'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'.format(
            str(scenario_name), str(db_backend.value), str(db_settings), str(mb_backend.value),
            str(mb_settings)))

    database = Database(get_database_backend(backend=db_backend, **db_settings))
    message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))

    yield database, message_broker

    # Teardown: only the message broker is explicitly terminated.
    message_broker.terminate()

@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that starts a ContextService and stops it at teardown.

    The service is built on the database and message broker supplied by context_db_mb
    and listens on CONTEXT_GRPC_SERVICE_PORT.
    """
    database, message_broker = context_db_mb
    grpc_server = ContextService(
        database, message_broker,
        port=CONTEXT_GRPC_SERVICE_PORT,
        max_workers=CONTEXT_GRPC_MAX_WORKERS,
        grace_period=CONTEXT_GRPC_GRACE_PERIOD)
    grpc_server.start()
    yield grpc_server
    grpc_server.stop()

@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a ContextClient pointed at the local test ContextService.

    context_service is requested only so that the server fixture is set up first;
    its value is not used here. The client is closed at teardown.
    """
    client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT)
    yield client
    client.close()

@pytest.fixture(scope='session')
def monitoring_service():
    """Session fixture that starts a MockMonitoringService and stops it at teardown."""
    mock_server = MockMonitoringService(
        port=MONITORING_GRPC_SERVICE_PORT,
        max_workers=MONITORING_GRPC_MAX_WORKERS,
        grace_period=MONITORING_GRPC_GRACE_PERIOD)
    mock_server.start()
    yield mock_server
    mock_server.stop()

@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name
    """Session fixture returning a MonitoringClient pointed at the local mock monitoring service.

    monitoring_service is requested only so that the mock server fixture is set up first.
    """
    # NOTE(review): unlike the other client fixtures, this one has no teardown; a
    # yield/close() variant existed only as disabled code -- confirm whether
    # MonitoringClient needs to be closed explicitly.
    return MonitoringClient(server='127.0.0.1', port=MONITORING_GRPC_SERVICE_PORT)

@pytest.fixture(scope='session')
def device_service(
    context_client : ContextClient,         # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient):  # pylint: disable=redefined-outer-name
    """Session fixture that starts a DeviceService and stops it at teardown.

    The service is wired to the context and monitoring clients and to a driver
    instance cache built from the DRIVERS registry.
    """
    driver_cache = DriverInstanceCache(DriverFactory(DRIVERS))
    grpc_server = DeviceService(
        context_client, monitoring_client, driver_cache,
        port=DEVICE_GRPC_SERVICE_PORT,
        max_workers=DEVICE_GRPC_MAX_WORKERS,
        grace_period=DEVICE_GRPC_GRACE_PERIOD)
    grpc_server.start()
    yield grpc_server
    grpc_server.stop()

@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a DeviceClient pointed at the local test DeviceService.

    device_service is requested only so that the server fixture is set up first;
    its value is not used here. The client is closed at teardown.
    """
    client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT)
    yield client
    client.close()
# (repository web-view annotation, not source code: Lluis Gifre Renom committed)
@pytest.fixture(scope='session')
def service_service(
    context_client : ContextClient, # pylint: disable=redefined-outer-name
    device_client : DeviceClient):  # pylint: disable=redefined-outer-name
    """Session fixture that starts a ServiceService and stops it at teardown.

    The service is wired to the context and device clients and to a service
    handler factory built from the SERVICE_HANDLERS registry. It listens on
    SERVICE_GRPC_SERVICE_PORT.
    """
    _service_handler_factory = ServiceHandlerFactory(SERVICE_HANDLERS)
    _service = ServiceService(
        context_client, device_client, _service_handler_factory,
        port=SERVICE_GRPC_SERVICE_PORT, max_workers=SERVICE_GRPC_MAX_WORKERS, grace_period=SERVICE_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def service_client(service_service : ServiceService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a ServiceClient pointed at the local test ServiceService.

    service_service is requested only so that the server fixture is set up first;
    its value is not used here. The client is closed at teardown.
    """
    _client = ServiceClient(address='127.0.0.1', port=SERVICE_GRPC_SERVICE_PORT)
    yield _client
    _client.close()
def grpc_message_to_json_string(message):
    """Render a protobuf message as a string for logging.

    The message is converted with MessageToDict (default-valued fields included,
    proto field names preserved, enums by name) and the result is passed to str().
    """
    message_as_dict = MessageToDict(
        message,
        including_default_value_fields=True,
        preserving_proto_field_name=True,
        use_integers_for_enums=False)
    return str(message_as_dict)


def test_prepare_environment(
    context_client : ContextClient, device_client : DeviceClient):  # pylint: disable=redefined-outer-name
    """Populate the context, the topology and both emulated devices used by the later tests."""
    context_client.SetContext(Context(**CONTEXT))
    context_client.SetTopology(Topology(**TOPOLOGY))

    # Register each emulated device with the connect rules appended to its config rules.
    # A deep copy is used so the shared descriptors are left unmodified.
    for device_template in (DEVICE_EMU1, DEVICE_EMU2):
        device_descriptor = copy.deepcopy(device_template)
        device_descriptor['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES)
        device_client.AddDevice(Device(**device_descriptor))


def test_service_create_error_cases(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Check that CreateService rejects requests carrying endpoints, config rules or constraints.

    Each case must fail with INVALID_ARGUMENT and with details that start and end
    with the expected message fragments.
    """
    # One entry per rejected field:
    # (key path to the list inside the service descriptor, items to append,
    #  expected start of the error details, expected end of the error details)
    rejected_cases = (
        (
            ('service_endpoint_ids',), SERVICE_L3NM_EMU_ENDPOINT_IDS,
            'service.service_endpoint_ids([',
            ']) is invalid; RPC method CreateService does not accept Endpoints. '
            'Endpoints should be configured after creating the service.',
        ),
        (
            ('service_config', 'config_rules'), SERVICE_L3NM_EMU_CONFIG_RULES,
            'service.service_config.config_rules([',
            ']) is invalid; RPC method CreateService does not accept Config Rules. '
            'Config Rules should be configured after creating the service.',
        ),
        (
            ('service_constraints',), SERVICE_L3NM_EMU_CONSTRAINTS,
            'service.service_constraints([',
            ']) is invalid; RPC method CreateService does not accept Constraints. '
            'Constraints should be configured after creating the service.',
        ),
    )

    for key_path, extra_items, expected_head, expected_tail in rejected_cases:
        with pytest.raises(grpc.RpcError) as exc_info:
            service_descriptor = copy.deepcopy(SERVICE_L3NM_EMU)
            # Walk down to the target list and populate it with the offending items.
            target_list = service_descriptor
            for key in key_path:
                target_list = target_list[key]
            target_list.extend(extra_items)
            service_client.CreateService(Service(**service_descriptor))

        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        details = str(exc_info.value.details())
        assert details.startswith(expected_head) and details.endswith(expected_tail)


def test_service_create_correct(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Create the bare L3NM emulated service; the test passes if the RPC does not raise."""
    service_descriptor = Service(**SERVICE_L3NM_EMU)
    service_client.CreateService(service_descriptor)
    # NOTE(review): no post-condition is asserted here; a driver-instance-cache check
    # existed only as disabled code.


def test_service_get_created(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Fetch the created service from the Context component and log it."""
    service_id = ServiceId(**SERVICE_L3NM_EMU_ID)
    service_data = context_client.GetService(service_id)
    service_dump = grpc_message_to_json_string(service_data)
    LOGGER.info('service_data = {:s}'.format(service_dump))


def test_service_update(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Update the service twice (configure, then deconfigure) and log device config rules each time.

    After every UpdateService call, the config rules of each device referenced by
    SERVICE_L3NM_EMU_ENDPOINT_IDS are read from the Context component and logged.
    """
    # Configure: base descriptor plus endpoints, config rules and constraints.
    configured_service = copy.deepcopy(SERVICE_L3NM_EMU)
    configured_service['service_endpoint_ids'].extend(SERVICE_L3NM_EMU_ENDPOINT_IDS)
    configured_service['service_config']['config_rules'].extend(SERVICE_L3NM_EMU_CONFIG_RULES)
    configured_service['service_constraints'].extend(SERVICE_L3NM_EMU_CONSTRAINTS)

    # Deconfigure: base descriptor with nothing added to its endpoint list.
    deconfigured_service = copy.deepcopy(SERVICE_L3NM_EMU)
    deconfigured_service['service_endpoint_ids'].extend([]) # remove endpoints

    for service_descriptor in (configured_service, deconfigured_service):
        service_client.UpdateService(Service(**service_descriptor))

        # Dump the resulting config rules of every device that hosts a service endpoint.
        for endpoint_id in SERVICE_L3NM_EMU_ENDPOINT_IDS:
            device_id = endpoint_id['device_id']
            device_data = context_client.GetDevice(DeviceId(**device_id))
            for rule_index, config_rule in enumerate(device_data.device_config.config_rules):
                LOGGER.info('device_data[{:s}][#{:d}] => {:s}'.format(
                    str(device_id), rule_index, grpc_message_to_json_string(config_rule)))


def test_service_get_updated(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Fetch the service from the Context component after the update test and log it."""
    service_id = ServiceId(**SERVICE_L3NM_EMU_ID)
    service_data = context_client.GetService(service_id)
    service_dump = grpc_message_to_json_string(service_data)
    LOGGER.info('service_data = {:s}'.format(service_dump))


def test_service_delete(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Delete the L3NM emulated service; the test passes if the RPC does not raise."""
    service_id = ServiceId(**SERVICE_L3NM_EMU_ID)
    service_client.DeleteService(service_id)
    # NOTE(review): no post-condition is asserted here; a driver-instance-cache check
    # existed only as disabled code.