# test_unitary.py
import copy, grpc, logging, operator, os, pytest
from typing import Any, Dict, List, Tuple
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from context.Config import (
    GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD)
from context.client.ContextClient import ContextClient
from context.proto.context_pb2 import DeviceId
from context.service.grpc_server.ContextService import ContextService
from device.Config import (
    GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD)
from device.client.DeviceClient import DeviceClient
from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology
from device.service.DeviceService import DeviceService
from device.service.driver_api._Driver import _Driver
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from .example_objects import CONTEXT, DEVICE1, DEVICE1_ID, DEVICE1_UUID, TOPOLOGY, config_rule
# Module-level logger for this test module, set to DEBUG verbosity.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Shift the configured gRPC ports by 10000 so the test servers do not bind to privileged ports.
CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports
DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports

# Fallback Redis connection settings, used when the matching environment variable is not set.
DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID  = 0

# Settings handed to the Redis-backed database / message broker backends.
# NOTE(review): values read from the environment are strings, whereas the defaults are ints;
# confirm the Redis backends accept both before enabling the 'all_redis' scenario.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
}

# Each scenario is a tuple:
#   (name, database backend, database settings, message broker backend, message broker settings)
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
    #('all_redis',    DatabaseBackendEnum.REDIS,    REDIS_CONFIG, MessageBrokerBackendEnum.REDIS,    REDIS_CONFIG),
]

@pytest.fixture(scope='session', ids=[str(scenario_name) for scenario_name, *_ in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Session fixture yielding the (Database, MessageBroker) pair for the current scenario.

    The fixture is parametrized over SCENARIOS; `request.param` carries the scenario tuple
    (name, database backend, database settings, message broker backend, message broker settings).
    After the session finishes, the message broker is terminated.
    """
    scenario_name, db_backend, db_settings, mb_backend, mb_settings = request.param
    LOGGER.info(
        'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'.format(
            str(scenario_name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings)))
    database = Database(get_database_backend(backend=db_backend, **db_settings))
    message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield database, message_broker
    # Teardown: only the message broker is shut down here.
    message_broker.terminate()

@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that starts a ContextService on the test port and stops it on teardown.

    The service is built from the (Database, MessageBroker) pair provided by `context_db_mb`.
    """
    database, message_broker = context_db_mb
    service = ContextService(
        database, message_broker,
        port=CONTEXT_GRPC_SERVICE_PORT,
        max_workers=CONTEXT_GRPC_MAX_WORKERS,
        grace_period=CONTEXT_GRPC_GRACE_PERIOD)
    service.start()
    yield service
    service.stop()

@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a ContextClient connected to the local test Context service.

    `context_service` is not used in the body; requesting it makes pytest set up the Context
    service fixture before this client is created. The client is closed on teardown.
    """
    _client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT)
    yield _client
    _client.close()

@pytest.fixture(scope='session')
def device_service(context_client : ContextClient): # pylint: disable=redefined-outer-name
    """Session fixture that starts a DeviceService on the test port and stops it on teardown.

    The service receives the Context client plus a DriverInstanceCache built on a
    DriverFactory initialised with the registered DRIVERS.
    """
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    _service = DeviceService(
        context_client, driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, max_workers=DEVICE_GRPC_MAX_WORKERS,
        grace_period=DEVICE_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a DeviceClient connected to the local test Device service.

    `device_service` is not used in the body; requesting it makes pytest set up the Device
    service fixture before this client is created. The client is closed on teardown.
    """
    client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT)
    yield client
    client.close()
def grpc_message_to_json_string(message):
    """Return `str()` of the dict produced by MessageToDict for `message`.

    Default-valued fields are included, the original proto field names are preserved and
    enums are rendered by name. Note that the result is the Python string form of a dict,
    not the output of a JSON serializer.
    """
    message_as_dict = MessageToDict(
        message,
        including_default_value_fields=True,
        preserving_proto_field_name=True,
        use_integers_for_enums=False)
    return str(message_as_dict)

def test_device_add(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name
    """Exercise the add / configure / delete life cycle of DEVICE1 through the Device service.

    Steps:
      1. Create the context and topology through the Context client.
      2. Check that AddDevice rejects a device that carries config rules.
      3. Add the device without rules and check that its driver is cached and holds no config.
      4. Configure the device with the rules in DEVICE1 and check the driver configuration.
      5. Delete one resource, set three new ones, and check the driver configuration again.
      6. Delete the device and check that the driver is no longer returned by the cache.
    """
    context_client.SetContext(Context(**CONTEXT))
    context_client.SetTopology(Topology(**TOPOLOGY))

    # AddDevice must refuse a device definition that already contains config rules.
    with pytest.raises(grpc.RpcError) as e:
        device_client.AddDevice(Device(**DEVICE1))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'device.device_config.config_rules(['\
          'action: CONFIGACTION_SET\nresource_key: "dev/rsrc1/value"\nresource_value: "value1"\n, '\
          'action: CONFIGACTION_SET\nresource_key: "dev/rsrc2/value"\nresource_value: "value2"\n, '\
          'action: CONFIGACTION_SET\nresource_key: "dev/rsrc3/value"\nresource_value: "value3"\n]) is invalid; '\
          'RPC method AddDevice does not allow definition of Config Rules. Add the Device first, and then configure it.'
    assert e.value.details() == msg

    # Work on a deep copy so the shared DEVICE1 example object is not mutated.
    device_without_rules = copy.deepcopy(DEVICE1)
    device_without_rules['device_config']['config_rules'].clear()
    device_client.AddDevice(Device(**device_without_rules))
    driver : _Driver = device_service.driver_instance_cache.get(DEVICE1_UUID) # we know the driver exists now
    assert driver is not None

    initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE1_ID))
    LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config)))

    device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID))
    LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))

    # A freshly added device has no configuration in its driver.
    driver_config = driver.GetConfig()
    LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == 0

    device_client.ConfigureDevice(Device(**DEVICE1))

    # Sort by resource key so the positional assertions below are deterministic.
    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == 3
    assert driver_config[0] == ('/dev/rsrc1/value', 'value1')
    assert driver_config[1] == ('/dev/rsrc2/value', 'value2')
    assert driver_config[2] == ('/dev/rsrc3/value', 'value3')

    # Second configuration round: delete rsrc1 and set three new resources.
    device_with_new_rules = copy.deepcopy(DEVICE1)
    new_rules : List[Dict[str, Any]] = device_with_new_rules['device_config']['config_rules']
    new_rules.clear()
    new_rules.append(config_rule(ConfigActionEnum.CONFIGACTION_DELETE, 'dev/rsrc1/value', ''))
    new_rules.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc10/value', 'value10'))
    new_rules.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc11/value', 'value11'))
    new_rules.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc12/value', 'value12'))
    device_client.ConfigureDevice(Device(**device_with_new_rules))

    # Keys are sorted as strings, hence rsrc10..rsrc12 come before rsrc2 and rsrc3.
    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == 5
    assert driver_config[0] == ('/dev/rsrc10/value', 'value10')
    assert driver_config[1] == ('/dev/rsrc11/value', 'value11')
    assert driver_config[2] == ('/dev/rsrc12/value', 'value12')
    assert driver_config[3] == ('/dev/rsrc2/value', 'value2')
    assert driver_config[4] == ('/dev/rsrc3/value', 'value3')

    device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID))
    #LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))
    # The loop variable is named `rule` so it does not shadow the imported `config_rule` helper.
    LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
        '\n'.join([
            '{:s} {:s} = {:s}'.format(
                ConfigActionEnum.Name(rule.action), rule.resource_key, rule.resource_value)
            for rule in device_data.device_config.config_rules
        ])))

    device_client.DeleteDevice(DeviceId(**DEVICE1_ID))
    driver = device_service.driver_instance_cache.get(DEVICE1_UUID, {}) # the driver must be gone after deletion
    assert driver is None
#def test_add_device_wrong_attributes(device_client : DeviceClient):
#    # should fail with device uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_id']['device_id']['uuid'] = ''
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'device.device_id.device_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with device type is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_type'] = ''
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'device.device_type() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with wrong device operational status
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Method(AddDevice) does not accept OperationalStatus(KEEP_STATE). '\
#          'Permitted values for Method(AddDevice) are OperationalStatus([\'DISABLED\', \'ENABLED\']).'
#    assert e.value.details() == msg
#
#def test_add_device_wrong_endpoint(device_client : DeviceClient):
#    # should fail with unsupported endpoint context
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context'
#        request = Device(**copy_device)
#        device_client.AddDevice(request)
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Context(wrong-context) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Contexts({\'admin\'}). '\
#          'Optionally, leave field empty to use predefined Context(admin).'
#    assert e.value.details() == msg
#
#    # should fail with unsupported endpoint topology
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = 'wrong-topo'
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Context(admin)/Topology(wrong-topo) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Topologies({\'admin\'}). '\
#          'Optionally, leave field empty to use predefined Topology(admin).'
#    assert e.value.details() == msg
#
#    # should fail with wrong endpoint device
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = 'wrong-device'
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Context(admin)/Topology(admin)/Device(wrong-device) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Devices({\'DEV1\'}). '\
#          'Optionally, leave field empty to use predefined Device(DEV1).'
#    assert e.value.details() == msg
#
#    # should fail with endpoint port uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_id']['port_id']['uuid'] = ''
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'endpoint_id[#0].port_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with endpoint port type is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_type'] = ''
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'endpoint[#0].port_type() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with duplicate port in device
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][1]['port_id']['port_id']['uuid'] = 'EP2'
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Duplicated Context(admin)/Topology(admin)/Device(DEV1)/Port(EP2) in Endpoint(#1) of '\
#          'Context(admin)/Topology(admin)/Device(DEV1).'
#    assert e.value.details() == msg
#
#def test_add_device(device_client : DeviceClient):
#    # should work
#    validate_device_id(MessageToDict(
#            device_client.AddDevice(Device(**DEVICE)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))
#
#def test_add_device_duplicate(device_client : DeviceClient):
#    # should fail with device already exists
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        device_client.AddDevice(Device(**DEVICE))
#    assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS
#    msg = 'Context(admin)/Topology(admin)/Device(DEV1) already exists in the database.'
#    assert e.value.details() == msg
#
#def test_delete_device_empty_uuid(device_client : DeviceClient):
#    # should fail with device uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device_id = copy.deepcopy(DEVICE_ID)
#        copy_device_id['device_id']['uuid'] = ''
#        device_client.DeleteDevice(DeviceId(**copy_device_id))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'device_id.device_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#def test_delete_device_not_found(device_client : DeviceClient):
#    # should fail with device not found
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device_id = copy.deepcopy(DEVICE_ID)
#        copy_device_id['device_id']['uuid'] = 'wrong-device-id'
#        device_client.DeleteDevice(DeviceId(**copy_device_id))
#    assert e.value.code() == grpc.StatusCode.NOT_FOUND
#    msg = 'Context(admin)/Topology(admin)/Device(wrong-device-id) does not exist in the database.'
#    assert e.value.details() == msg
#
#def test_delete_device(device_client : DeviceClient):
#    # should work
#    validate_empty(MessageToDict(
#            device_client.DeleteDevice(DeviceId(**DEVICE_ID)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))
#
#def test_configure_device_empty_device_uuid(device_client : DeviceClient):
#    # should fail with device uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_id']['device_id']['uuid'] = ''
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'device.device_id.device_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#def test_configure_device_not_found(device_client : DeviceClient):
#    # should fail with device not found
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_id']['device_id']['uuid'] = 'wrong-device-id'
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.NOT_FOUND
#    msg = 'Context(admin)/Topology(admin)/Device(wrong-device-id) does not exist in the database.'
#    assert e.value.details() == msg
#
#def test_add_device_default_endpoint_context_topology_device(device_client : DeviceClient):
#    # should work
#    copy_device = copy.deepcopy(DEVICE)
#    copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = ''
#    copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = ''
#    copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = ''
#    validate_device_id(MessageToDict(
#            device_client.AddDevice(Device(**copy_device)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))
#
#def test_configure_device_wrong_attributes(device_client : DeviceClient):
#    # should fail with device type is wrong
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_type'] = 'wrong-type'
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Device(DEV1) has Type(ROADM) in the database. Cannot be changed to Type(wrong-type).'
#    assert e.value.details() == msg
#
#    # should fail with endpoints cannot be modified
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    assert e.value.details() == 'Endpoints belonging to Device(DEV1) cannot be modified.'
#
#    # should fail with any change detected
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_config']['device_config'] = ''
#        copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value
#        copy_device['endpointList'].clear()
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.ABORTED
#    msg = 'Any change has been requested for Device(DEV1). '\
#          'Either specify a new configuration or a new device operational status.'
#    assert e.value.details() == msg
#
#def test_configure_device(device_client : DeviceClient):
#    # should work
#    copy_device = copy.deepcopy(DEVICE)
#    copy_device['device_config']['device_config'] = '<new_config/>'
#    copy_device['devOperationalStatus'] = OperationalStatus.DISABLED.value
#    copy_device['endpointList'].clear()
#    validate_device_id(MessageToDict(
#            device_client.ConfigureDevice(Device(**copy_device)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))