import copy, grpc, logging, pytest, requests, time from google.protobuf.json_format import MessageToDict from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from common.tests.Assertions import validate_empty, validate_link_id, validate_topology, validate_topology_has_devices,\ validate_topology_has_links, validate_topology_is_empty from context.client.ContextClient import ContextClient from context.proto.context_pb2 import Empty, Link, LinkId, Topology from context.service.database.Database import Database from context.service.database._engine.Factory import get_database_backend, BackendEnum from context.service.grpc_server.ContextService import ContextService from context.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, \ RESTAPI_BASE_URL from context.service.rest_server.Server import Server from context.service.rest_server.resources.Context import Context from .populate_database import populate_example grpc_port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports restapi_port = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) # use "copy.deepcopy" to prevent propagating forced changes during tests CONTEXT_ID = {'contextUuid': {'uuid': DEFAULT_CONTEXT_UUID}} TOPOLOGY_ID = {'contextId': copy.deepcopy(CONTEXT_ID), 'topoId': {'uuid': DEFAULT_TOPOLOGY_UUID}} LINK_ID = {'link_id': {'uuid': 'DEV1/EP2 ==> DEV2/EP1'}} LINK = { 'link_id': {'link_id': {'uuid': 'DEV1/EP2 ==> DEV2/EP1'}}, 'endpointList' : [ {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV1'}}, 'port_id': {'uuid' : 'EP2'}}, {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV2'}}, 'port_id': {'uuid' : 'EP1'}}, ] } @pytest.fixture(scope='session') def context_database(): database_backend = get_database_backend(engine=BackendEnum.INMEMORY) _database = Database(database_backend) return _database @pytest.fixture(scope='session') def context_service(context_database : Database): _service = ContextService( context_database, port=grpc_port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def context_service_rest(context_database : Database): _rest_server = Server(port=restapi_port, base_url=RESTAPI_BASE_URL) _rest_server.add_resource(Context, '/context', endpoint='api.context', resource_class_args=(context_database,)) _rest_server.start() time.sleep(1) # bring time for the server to start yield _rest_server _rest_server.shutdown() _rest_server.join() @pytest.fixture(scope='session') def context_client(context_service): _client = ContextClient(address='127.0.0.1', port=grpc_port) yield _client _client.close() def test_get_topology_empty(context_client : ContextClient, context_database : Database): # should work context_database.clear_all() validate_topology_is_empty(MessageToDict( context_client.GetTopology(Empty()), including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) def test_get_topology_completed(context_client : ContextClient, context_database : Database): # should work populate_example(context_database, add_services=False) topology = MessageToDict( context_client.GetTopology(Empty()), including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False) validate_topology(topology) validate_topology_has_devices(topology) validate_topology_has_links(topology) def test_delete_link_empty_uuid(context_client : ContextClient): # should fail with link not found with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link_id = copy.deepcopy(LINK_ID) copy_link_id['link_id']['uuid'] = '' context_client.DeleteLink(LinkId(**copy_link_id)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'link_id.link_id.uuid() is out of range: '\ 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' assert e.value.details() == msg def test_add_link_already_exists(context_client : ContextClient): # should fail with link already exists with pytest.raises(grpc._channel._InactiveRpcError) as e: context_client.AddLink(Link(**LINK)) assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS msg = 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) already exists in the database.' assert e.value.details() == msg def test_delete_link(context_client : ContextClient): # should work validate_empty(MessageToDict( context_client.DeleteLink(LinkId(**LINK_ID)), including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) def test_delete_link_not_existing(context_client : ContextClient): # should fail with link not found with pytest.raises(grpc._channel._InactiveRpcError) as e: context_client.DeleteLink(LinkId(**LINK_ID)) assert e.value.code() == grpc.StatusCode.NOT_FOUND msg = 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.' assert e.value.details() == msg def test_add_link_uuid_empty(context_client : ContextClient): # should fail with link uuid empty with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link = copy.deepcopy(LINK) copy_link['link_id']['link_id']['uuid'] = '' context_client.AddLink(Link(**copy_link)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'link.link_id.link_id.uuid() is out of range: '\ 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' assert e.value.details() == msg def test_add_link_wrong_endpoint(context_client : ContextClient): # should fail with wrong endpoint context with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link = copy.deepcopy(LINK) copy_link['endpointList'][0]['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context' context_client.AddLink(Link(**copy_link)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'Context(wrong-context) in Endpoint(#0) of '\ 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) mismatches acceptable Contexts({\'admin\'}). '\ 'Optionally, leave field empty to use predefined Context(admin).' assert e.value.details() == msg # should fail with wrong endpoint topology with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link = copy.deepcopy(LINK) copy_link['endpointList'][0]['topoId']['topoId']['uuid'] = 'wrong-topo' context_client.AddLink(Link(**copy_link)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'Context(admin)/Topology(wrong-topo) in Endpoint(#0) of '\ 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) mismatches acceptable Topologies({\'admin\'}). '\ 'Optionally, leave field empty to use predefined Topology(admin).' assert e.value.details() == msg # should fail with device uuid is empty with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link = copy.deepcopy(LINK) copy_link['endpointList'][0]['dev_id']['device_id']['uuid'] = '' context_client.AddLink(Link(**copy_link)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'endpoint_id[#0].dev_id.device_id.uuid() is out of range: '\ 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' assert e.value.details() == msg # should fail with wrong endpoint device with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link = copy.deepcopy(LINK) copy_link['endpointList'][0]['dev_id']['device_id']['uuid'] = 'wrong-device' context_client.AddLink(Link(**copy_link)) assert e.value.code() == grpc.StatusCode.NOT_FOUND msg = 'Context(admin)/Topology(admin)/Device(wrong-device) in Endpoint(#0) of '\ 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.' assert e.value.details() == msg # should fail with endpoint uuid is empty with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link = copy.deepcopy(LINK) copy_link['endpointList'][0]['port_id']['uuid'] = '' context_client.AddLink(Link(**copy_link)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'endpoint_id[#0].port_id.uuid() is out of range: '\ 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' assert e.value.details() == msg # should fail with wrong endpoint port with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link = copy.deepcopy(LINK) copy_link['endpointList'][0]['port_id']['uuid'] = 'wrong-port' context_client.AddLink(Link(**copy_link)) assert e.value.code() == grpc.StatusCode.NOT_FOUND msg = 'Context(admin)/Topology(admin)/Device(DEV1)/Port(wrong-port) in Endpoint(#0) of '\ 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.' assert e.value.details() == msg # should fail with endpoint device duplicated with pytest.raises(grpc._channel._InactiveRpcError) as e: copy_link = copy.deepcopy(LINK) copy_link['endpointList'][1]['dev_id']['device_id']['uuid'] = 'DEV1' context_client.AddLink(Link(**copy_link)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'Duplicated Context(admin)/Topology(admin)/Device(DEV1) in Endpoint(#1) of '\ 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1).' assert e.value.details() == msg def test_add_link(context_client : ContextClient): # should work validate_link_id(MessageToDict( context_client.AddLink(Link(**LINK)), including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) def test_delete_link_2(context_client : ContextClient): # should work validate_empty(MessageToDict( context_client.DeleteLink(LinkId(**LINK_ID)), including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) def test_add_link_default_endpoint_context_topology(context_client : ContextClient): # should work copy_link = copy.deepcopy(LINK) copy_link['endpointList'][0]['topoId']['contextId']['contextUuid']['uuid'] = '' copy_link['endpointList'][0]['topoId']['topoId']['uuid'] = '' validate_link_id(MessageToDict( context_client.AddLink(Link(**copy_link)), including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) def test_get_topology_completed_2(context_client : ContextClient): # should work topology = MessageToDict( context_client.GetTopology(Empty()), including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False) validate_topology(topology) validate_topology_has_devices(topology) validate_topology_has_links(topology) def test_get_topology_completed_rest_api(context_service_rest : Server): # should work request_url = 'http://127.0.0.1:{}{}/context'.format(restapi_port, RESTAPI_BASE_URL) LOGGER.warning('Request: GET {}'.format(str(request_url))) reply = requests.get(request_url) LOGGER.warning('Reply: {}'.format(str(reply.text))) assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code) json_reply = reply.json() topology = MessageToDict( Topology(**json_reply['topologies'][0]), including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False) validate_topology(topology) validate_topology_has_devices(topology) validate_topology_has_links(topology)