Skip to content
test_unitary.py 18.1 KiB
Newer Older
import copy, logging, pytest
#import grpc, requests, time
#from google.protobuf.json_format import MessageToDict
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum
#from common.tests.Assertions import validate_empty, validate_link_id, validate_topology, validate_topology_has_devices,\
#    validate_topology_has_links, validate_topology_is_empty
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
from context.proto.context_pb2 import \
    Context,  ContextEvent,  ContextId,  ContextIdList,  ContextList,  \
    Device,   DeviceEvent,   DeviceId,   DeviceIdList,   DeviceList,   \
    Empty,                                                             \
    Link,     LinkEvent,     LinkId,     LinkIdList,     LinkList,     \
    Service,  ServiceEvent,  ServiceId,  ServiceIdList,  ServiceList,  \
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList
from context.service.grpc_server.ContextService import ContextService
from context.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD #, RESTAPI_SERVICE_PORT, \
#    RESTAPI_BASE_URL
#from context.service.rest_server.Server import Server
#from context.service.rest_server.resources.Context import Context
#from .populate_database import populate_example
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

grpc_port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
#restapi_port = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

## use "copy.deepcopy" to prevent propagating forced changes during tests
CONTEXT_ID = {'context_uuid': {'uuid': DEFAULT_CONTEXT_UUID}}
CONTEXT = {'context_id': copy.deepcopy(CONTEXT_ID)}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
TOPOLOGY_ID = {'context_id': copy.deepcopy(CONTEXT_ID), 'topology_uuid': {'uuid': DEFAULT_TOPOLOGY_UUID}}
TOPOLOGY = {'topology_id': copy.deepcopy(TOPOLOGY_ID)}
#LINK_ID = {'link_id': {'uuid': 'DEV1/EP2 ==> DEV2/EP1'}}
#LINK = {
#    'link_id': {'link_id': {'uuid': 'DEV1/EP2 ==> DEV2/EP1'}},
#    'endpointList' : [
#        {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV1'}}, 'port_id': {'uuid' : 'EP2'}},
#        {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV2'}}, 'port_id': {'uuid' : 'EP1'}},
#    ]
#}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def context_database():
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    database_backend = get_database_backend(engine=BackendEnum.INMEMORY)
    _database = Database(database_backend)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    return _database

@pytest.fixture(scope='session')
def context_service(context_database : Database): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _service = ContextService(
        context_database, port=grpc_port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _service.start()
    yield _service
    _service.stop()

#@pytest.fixture(scope='session')
#def context_service_rest(context_database : Database):
#    _rest_server = Server(port=restapi_port, base_url=RESTAPI_BASE_URL)
#    _rest_server.add_resource(Context, '/context', endpoint='api.context', resource_class_args=(context_database,))
#    _rest_server.start()
#    time.sleep(1) # bring time for the server to start
#    yield _rest_server
#    _rest_server.shutdown()
#    _rest_server.join()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    _client = ContextClient(address='127.0.0.1', port=grpc_port)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    yield _client
    _client.close()

def test_context_instances(
    context_client : ContextClient, context_database : Database): # pylint: disable=redefined-outer-name

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_database.clear_all()
    response = context_client.ListContextIds(Empty())
    assert len(response.context_ids) == 0

    response = context_client.ListContexts(Empty())
    assert len(response.contexts) == 0

    response = context_client.GetContext(ContextId(**CONTEXT_ID))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(response.context_id.context_uuid.uuid) == 0
    assert len(response.topology_ids) == 0
    assert len(response.service_ids) == 0

    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    response = context_client.SetContext(Context(**CONTEXT))
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry))
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 2

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client.GetContext(ContextId(**CONTEXT_ID))
    assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert len(response.topology_ids) == 0
    assert len(response.service_ids) == 0

    response = context_client.ListContextIds(Empty())
    assert len(response.context_ids) == 1
    assert response.context_ids[0].context_uuid.uuid == DEFAULT_CONTEXT_UUID

    response = context_client.ListContexts(Empty())
    assert len(response.contexts) == 1
    assert response.contexts[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert len(response.contexts[0].topology_ids) == 0
    assert len(response.contexts[0].service_ids) == 0

    context_client.RemoveContext(ContextId(**CONTEXT_ID))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

def test_topology_instances(
    context_client : ContextClient, context_database : Database): # pylint: disable=redefined-outer-name

    context_database.clear_all()

    response = context_client.SetContext(Context(**CONTEXT))
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    response = context_client.ListTopologyIds(ContextId(**CONTEXT_ID))
    assert len(response.topology_ids) == 0

    response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
    assert len(response.topologies) == 0

    response = context_client.GetTopology(TopologyId(**TOPOLOGY_ID))
    assert len(response.topology_id.topology_uuid.uuid) == 0
    assert len(response.topology_id.context_id.context_uuid.uuid) == 0
    assert len(response.device_ids) == 0
    assert len(response.link_ids) == 0

    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry))
    LOGGER.info('-----------------------------------------------------------')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(db_entries) == 2

    response = context_client.SetTopology(Topology(**TOPOLOGY))
    assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry))
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 5

    response = context_client.ListTopologyIds(ContextId(**CONTEXT_ID))
    assert len(response.topology_ids) == 1
    assert response.topology_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.topology_ids[0].topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

    response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
    assert len(response.topologies) == 1
    assert response.topologies[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.topologies[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
    assert len(response.topologies[0].device_ids) == 0
    assert len(response.topologies[0].link_ids) == 0

    context_client.RemoveTopology(TopologyId(**TOPOLOGY_ID))
    context_client.RemoveContext(ContextId(**CONTEXT_ID))

    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0


#def test_get_topology_empty(context_client : ContextClient, context_database : Database):
#    # should work
#    context_database.clear_all()
#    validate_topology_is_empty(MessageToDict(
#        context_client.GetTopology(Empty()),
#        including_default_value_fields=True, preserving_proto_field_name=True,
#        use_integers_for_enums=False))
#
#def test_get_topology_completed(context_client : ContextClient, context_database : Database):
#    # should work
#    populate_example(context_database, add_services=False)
#    topology = MessageToDict(
#        context_client.GetTopology(Empty()),
#        including_default_value_fields=True, preserving_proto_field_name=True,
#        use_integers_for_enums=False)
#    validate_topology(topology)
#    validate_topology_has_devices(topology)
#    validate_topology_has_links(topology)
#
#def test_delete_link_empty_uuid(context_client : ContextClient):
#    # should fail with link not found
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link_id = copy.deepcopy(LINK_ID)
#        copy_link_id['link_id']['uuid'] = ''
#        context_client.DeleteLink(LinkId(**copy_link_id))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'link_id.link_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#def test_add_link_already_exists(context_client : ContextClient):
#    # should fail with link already exists
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        context_client.AddLink(Link(**LINK))
#    assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS
#    msg = 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) already exists in the database.'
#    assert e.value.details() == msg
#
#def test_delete_link(context_client : ContextClient):
#    # should work
#    validate_empty(MessageToDict(
#        context_client.DeleteLink(LinkId(**LINK_ID)),
#        including_default_value_fields=True, preserving_proto_field_name=True,
#        use_integers_for_enums=False))
#
#def test_delete_link_not_existing(context_client : ContextClient):
#    # should fail with link not found
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        context_client.DeleteLink(LinkId(**LINK_ID))
#    assert e.value.code() == grpc.StatusCode.NOT_FOUND
#    msg = 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.'
#    assert e.value.details() == msg
#
#def test_add_link_uuid_empty(context_client : ContextClient):
#    # should fail with link uuid empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link = copy.deepcopy(LINK)
#        copy_link['link_id']['link_id']['uuid'] = ''
#        context_client.AddLink(Link(**copy_link))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'link.link_id.link_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#def test_add_link_wrong_endpoint(context_client : ContextClient):
#    # should fail with wrong endpoint context
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link = copy.deepcopy(LINK)
#        copy_link['endpointList'][0]['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context'
#        context_client.AddLink(Link(**copy_link))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Context(wrong-context) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) mismatches acceptable Contexts({\'admin\'}). '\
#          'Optionally, leave field empty to use predefined Context(admin).'
#    assert e.value.details() == msg
#
#    # should fail with wrong endpoint topology
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link = copy.deepcopy(LINK)
#        copy_link['endpointList'][0]['topoId']['topoId']['uuid'] = 'wrong-topo'
#        context_client.AddLink(Link(**copy_link))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Context(admin)/Topology(wrong-topo) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) mismatches acceptable Topologies({\'admin\'}). '\
#          'Optionally, leave field empty to use predefined Topology(admin).'
#    assert e.value.details() == msg
#
#    # should fail with device uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link = copy.deepcopy(LINK)
#        copy_link['endpointList'][0]['dev_id']['device_id']['uuid'] = ''
#        context_client.AddLink(Link(**copy_link))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'endpoint_id[#0].dev_id.device_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with wrong endpoint device
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link = copy.deepcopy(LINK)
#        copy_link['endpointList'][0]['dev_id']['device_id']['uuid'] = 'wrong-device'
#        context_client.AddLink(Link(**copy_link))
#    assert e.value.code() == grpc.StatusCode.NOT_FOUND
#    msg = 'Context(admin)/Topology(admin)/Device(wrong-device) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.'
#    assert e.value.details() == msg
#
#    # should fail with endpoint uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link = copy.deepcopy(LINK)
#        copy_link['endpointList'][0]['port_id']['uuid'] = ''
#        context_client.AddLink(Link(**copy_link))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'endpoint_id[#0].port_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with wrong endpoint port
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link = copy.deepcopy(LINK)
#        copy_link['endpointList'][0]['port_id']['uuid'] = 'wrong-port'
#        context_client.AddLink(Link(**copy_link))
#    assert e.value.code() == grpc.StatusCode.NOT_FOUND
#    msg = 'Context(admin)/Topology(admin)/Device(DEV1)/Port(wrong-port) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.'
#    assert e.value.details() == msg
#
#    # should fail with endpoint device duplicated
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_link = copy.deepcopy(LINK)
#        copy_link['endpointList'][1]['dev_id']['device_id']['uuid'] = 'DEV1'
#        context_client.AddLink(Link(**copy_link))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Duplicated Context(admin)/Topology(admin)/Device(DEV1) in Endpoint(#1) of '\
#          'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1).'
#    assert e.value.details() == msg
#
#def test_add_link(context_client : ContextClient):
#    # should work
#    validate_link_id(MessageToDict(
#        context_client.AddLink(Link(**LINK)),
#        including_default_value_fields=True, preserving_proto_field_name=True,
#        use_integers_for_enums=False))
#
#def test_delete_link_2(context_client : ContextClient):
#    # should work
#    validate_empty(MessageToDict(
#        context_client.DeleteLink(LinkId(**LINK_ID)),
#        including_default_value_fields=True, preserving_proto_field_name=True,
#        use_integers_for_enums=False))
#
#def test_add_link_default_endpoint_context_topology(context_client : ContextClient):
#    # should work
#    copy_link = copy.deepcopy(LINK)
#    copy_link['endpointList'][0]['topoId']['contextId']['contextUuid']['uuid'] = ''
#    copy_link['endpointList'][0]['topoId']['topoId']['uuid'] = ''
#    validate_link_id(MessageToDict(
#            context_client.AddLink(Link(**copy_link)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))
#
#def test_get_topology_completed_2(context_client : ContextClient):
#    # should work
#    topology = MessageToDict(
#        context_client.GetTopology(Empty()),
#        including_default_value_fields=True, preserving_proto_field_name=True,
#        use_integers_for_enums=False)
#    validate_topology(topology)
#    validate_topology_has_devices(topology)
#    validate_topology_has_links(topology)
#
#def test_get_topology_completed_rest_api(context_service_rest : Server):
#    # should work
#    request_url = 'http://127.0.0.1:{}{}/context'.format(restapi_port, RESTAPI_BASE_URL)
#    LOGGER.warning('Request: GET {}'.format(str(request_url)))
#    reply = requests.get(request_url)
#    LOGGER.warning('Reply: {}'.format(str(reply.text)))
#    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
#    json_reply = reply.json()
#    topology = MessageToDict(
#        Topology(**json_reply['topologies'][0]),
#        including_default_value_fields=True, preserving_proto_field_name=True,
#        use_integers_for_enums=False)
#    validate_topology(topology)
#    validate_topology_has_devices(topology)
#    validate_topology_has_links(topology)
#