import copy, logging, pytest #import grpc, requests, time #from google.protobuf.json_format import MessageToDict from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum #from common.tests.Assertions import validate_empty, validate_link_id, validate_topology, validate_topology_has_devices,\ # validate_topology_has_links, validate_topology_is_empty from context.client.ContextClient import ContextClient from context.proto.context_pb2 import \ Context, ContextEvent, ContextId, ContextIdList, ContextList, \ Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, \ Empty, \ Link, LinkEvent, LinkId, LinkIdList, LinkList, \ Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, \ Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList from context.service.grpc_server.ContextService import ContextService from context.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD #, RESTAPI_SERVICE_PORT, \ # RESTAPI_BASE_URL #from context.service.rest_server.Server import Server #from context.service.rest_server.resources.Context import Context #from .populate_database import populate_example grpc_port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports #restapi_port = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) ## use "copy.deepcopy" to prevent propagating forced changes during tests CONTEXT_ID = {'context_uuid': {'uuid': DEFAULT_CONTEXT_UUID}} CONTEXT = {'context_id': copy.deepcopy(CONTEXT_ID)} TOPOLOGY_ID = {'context_id': copy.deepcopy(CONTEXT_ID), 'topology_uuid': {'uuid': DEFAULT_TOPOLOGY_UUID}} TOPOLOGY = {'topology_id': copy.deepcopy(TOPOLOGY_ID)} #LINK_ID = {'link_id': {'uuid': 'DEV1/EP2 ==> DEV2/EP1'}} #LINK = { # 'link_id': {'link_id': {'uuid': 'DEV1/EP2 ==> DEV2/EP1'}}, # 'endpointList' : [ # {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV1'}}, 'port_id': {'uuid' : 'EP2'}}, # {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': {'device_id': {'uuid': 'DEV2'}}, 'port_id': {'uuid' : 'EP1'}}, # ] #} @pytest.fixture(scope='session') def context_database(): database_backend = get_database_backend(engine=BackendEnum.INMEMORY) _database = Database(database_backend) return _database @pytest.fixture(scope='session') def context_service(context_database : Database): # pylint: disable=redefined-outer-name _service = ContextService( context_database, port=grpc_port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD) _service.start() yield _service _service.stop() #@pytest.fixture(scope='session') #def context_service_rest(context_database : Database): # _rest_server = Server(port=restapi_port, base_url=RESTAPI_BASE_URL) # _rest_server.add_resource(Context, '/context', endpoint='api.context', resource_class_args=(context_database,)) # _rest_server.start() # time.sleep(1) # bring time for the server to start # yield _rest_server # _rest_server.shutdown() # _rest_server.join() @pytest.fixture(scope='session') def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name _client = ContextClient(address='127.0.0.1', port=grpc_port) yield _client _client.close() def test_context_instances( context_client : ContextClient, context_database : Database): # pylint: disable=redefined-outer-name context_database.clear_all() response = context_client.ListContextIds(Empty()) assert len(response.context_ids) == 0 response = context_client.ListContexts(Empty()) assert len(response.contexts) == 0 response = context_client.GetContext(ContextId(**CONTEXT_ID)) assert len(response.context_id.context_uuid.uuid) == 0 assert len(response.topology_ids) == 0 assert len(response.service_ids) == 0 db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 response = context_client.SetContext(Context(**CONTEXT)) assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 2 response = context_client.GetContext(ContextId(**CONTEXT_ID)) assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert len(response.topology_ids) == 0 assert len(response.service_ids) == 0 response = context_client.ListContextIds(Empty()) assert len(response.context_ids) == 1 assert response.context_ids[0].context_uuid.uuid == DEFAULT_CONTEXT_UUID response = context_client.ListContexts(Empty()) assert len(response.contexts) == 1 assert response.contexts[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert len(response.contexts[0].topology_ids) == 0 assert len(response.contexts[0].service_ids) == 0 context_client.RemoveContext(ContextId(**CONTEXT_ID)) db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 def test_topology_instances( context_client : ContextClient, context_database : Database): # pylint: disable=redefined-outer-name context_database.clear_all() response = context_client.SetContext(Context(**CONTEXT)) assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID response = context_client.ListTopologyIds(ContextId(**CONTEXT_ID)) assert len(response.topology_ids) == 0 response = context_client.ListTopologies(ContextId(**CONTEXT_ID)) assert len(response.topologies) == 0 response = context_client.GetTopology(TopologyId(**TOPOLOGY_ID)) assert len(response.topology_id.topology_uuid.uuid) == 0 assert len(response.topology_id.context_id.context_uuid.uuid) == 0 assert len(response.device_ids) == 0 assert len(response.link_ids) == 0 db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 2 response = context_client.SetTopology(Topology(**TOPOLOGY)) assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 5 response = context_client.ListTopologyIds(ContextId(**CONTEXT_ID)) assert len(response.topology_ids) == 1 assert response.topology_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert response.topology_ids[0].topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID response = context_client.ListTopologies(ContextId(**CONTEXT_ID)) assert len(response.topologies) == 1 assert response.topologies[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert response.topologies[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID assert len(response.topologies[0].device_ids) == 0 assert len(response.topologies[0].link_ids) == 0 context_client.RemoveTopology(TopologyId(**TOPOLOGY_ID)) context_client.RemoveContext(ContextId(**CONTEXT_ID)) db_entries = context_database.dump() LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) for db_entry in db_entries: LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 #def test_get_topology_empty(context_client : ContextClient, context_database : Database): # # should work # context_database.clear_all() # validate_topology_is_empty(MessageToDict( # context_client.GetTopology(Empty()), # including_default_value_fields=True, preserving_proto_field_name=True, # use_integers_for_enums=False)) # #def test_get_topology_completed(context_client : ContextClient, context_database : Database): # # should work # populate_example(context_database, add_services=False) # topology = MessageToDict( # context_client.GetTopology(Empty()), # including_default_value_fields=True, preserving_proto_field_name=True, # use_integers_for_enums=False) # validate_topology(topology) # validate_topology_has_devices(topology) # validate_topology_has_links(topology) # #def test_delete_link_empty_uuid(context_client : ContextClient): # # should fail with link not found # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link_id = copy.deepcopy(LINK_ID) # copy_link_id['link_id']['uuid'] = '' # context_client.DeleteLink(LinkId(**copy_link_id)) # assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT # msg = 'link_id.link_id.uuid() is out of range: '\ # 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' # assert e.value.details() == msg # #def test_add_link_already_exists(context_client : ContextClient): # # should fail with link already exists # with pytest.raises(grpc._channel._InactiveRpcError) as e: # context_client.AddLink(Link(**LINK)) # assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS # msg = 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) already exists in the database.' # assert e.value.details() == msg # #def test_delete_link(context_client : ContextClient): # # should work # validate_empty(MessageToDict( # context_client.DeleteLink(LinkId(**LINK_ID)), # including_default_value_fields=True, preserving_proto_field_name=True, # use_integers_for_enums=False)) # #def test_delete_link_not_existing(context_client : ContextClient): # # should fail with link not found # with pytest.raises(grpc._channel._InactiveRpcError) as e: # context_client.DeleteLink(LinkId(**LINK_ID)) # assert e.value.code() == grpc.StatusCode.NOT_FOUND # msg = 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.' # assert e.value.details() == msg # #def test_add_link_uuid_empty(context_client : ContextClient): # # should fail with link uuid empty # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link = copy.deepcopy(LINK) # copy_link['link_id']['link_id']['uuid'] = '' # context_client.AddLink(Link(**copy_link)) # assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT # msg = 'link.link_id.link_id.uuid() is out of range: '\ # 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' # assert e.value.details() == msg # #def test_add_link_wrong_endpoint(context_client : ContextClient): # # should fail with wrong endpoint context # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link = copy.deepcopy(LINK) # copy_link['endpointList'][0]['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context' # context_client.AddLink(Link(**copy_link)) # assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT # msg = 'Context(wrong-context) in Endpoint(#0) of '\ # 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) mismatches acceptable Contexts({\'admin\'}). '\ # 'Optionally, leave field empty to use predefined Context(admin).' # assert e.value.details() == msg # # # should fail with wrong endpoint topology # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link = copy.deepcopy(LINK) # copy_link['endpointList'][0]['topoId']['topoId']['uuid'] = 'wrong-topo' # context_client.AddLink(Link(**copy_link)) # assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT # msg = 'Context(admin)/Topology(wrong-topo) in Endpoint(#0) of '\ # 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) mismatches acceptable Topologies({\'admin\'}). '\ # 'Optionally, leave field empty to use predefined Topology(admin).' # assert e.value.details() == msg # # # should fail with device uuid is empty # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link = copy.deepcopy(LINK) # copy_link['endpointList'][0]['dev_id']['device_id']['uuid'] = '' # context_client.AddLink(Link(**copy_link)) # assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT # msg = 'endpoint_id[#0].dev_id.device_id.uuid() is out of range: '\ # 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' # assert e.value.details() == msg # # # should fail with wrong endpoint device # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link = copy.deepcopy(LINK) # copy_link['endpointList'][0]['dev_id']['device_id']['uuid'] = 'wrong-device' # context_client.AddLink(Link(**copy_link)) # assert e.value.code() == grpc.StatusCode.NOT_FOUND # msg = 'Context(admin)/Topology(admin)/Device(wrong-device) in Endpoint(#0) of '\ # 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.' # assert e.value.details() == msg # # # should fail with endpoint uuid is empty # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link = copy.deepcopy(LINK) # copy_link['endpointList'][0]['port_id']['uuid'] = '' # context_client.AddLink(Link(**copy_link)) # assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT # msg = 'endpoint_id[#0].port_id.uuid() is out of range: '\ # 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' # assert e.value.details() == msg # # # should fail with wrong endpoint port # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link = copy.deepcopy(LINK) # copy_link['endpointList'][0]['port_id']['uuid'] = 'wrong-port' # context_client.AddLink(Link(**copy_link)) # assert e.value.code() == grpc.StatusCode.NOT_FOUND # msg = 'Context(admin)/Topology(admin)/Device(DEV1)/Port(wrong-port) in Endpoint(#0) of '\ # 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1) does not exist in the database.' # assert e.value.details() == msg # # # should fail with endpoint device duplicated # with pytest.raises(grpc._channel._InactiveRpcError) as e: # copy_link = copy.deepcopy(LINK) # copy_link['endpointList'][1]['dev_id']['device_id']['uuid'] = 'DEV1' # context_client.AddLink(Link(**copy_link)) # assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT # msg = 'Duplicated Context(admin)/Topology(admin)/Device(DEV1) in Endpoint(#1) of '\ # 'Context(admin)/Topology(admin)/Link(DEV1/EP2 ==> DEV2/EP1).' # assert e.value.details() == msg # #def test_add_link(context_client : ContextClient): # # should work # validate_link_id(MessageToDict( # context_client.AddLink(Link(**LINK)), # including_default_value_fields=True, preserving_proto_field_name=True, # use_integers_for_enums=False)) # #def test_delete_link_2(context_client : ContextClient): # # should work # validate_empty(MessageToDict( # context_client.DeleteLink(LinkId(**LINK_ID)), # including_default_value_fields=True, preserving_proto_field_name=True, # use_integers_for_enums=False)) # #def test_add_link_default_endpoint_context_topology(context_client : ContextClient): # # should work # copy_link = copy.deepcopy(LINK) # copy_link['endpointList'][0]['topoId']['contextId']['contextUuid']['uuid'] = '' # copy_link['endpointList'][0]['topoId']['topoId']['uuid'] = '' # validate_link_id(MessageToDict( # context_client.AddLink(Link(**copy_link)), # including_default_value_fields=True, preserving_proto_field_name=True, # use_integers_for_enums=False)) # #def test_get_topology_completed_2(context_client : ContextClient): # # should work # topology = MessageToDict( # context_client.GetTopology(Empty()), # including_default_value_fields=True, preserving_proto_field_name=True, # use_integers_for_enums=False) # validate_topology(topology) # validate_topology_has_devices(topology) # validate_topology_has_links(topology) # #def test_get_topology_completed_rest_api(context_service_rest : Server): # # should work # request_url = 'http://127.0.0.1:{}{}/context'.format(restapi_port, RESTAPI_BASE_URL) # LOGGER.warning('Request: GET {}'.format(str(request_url))) # reply = requests.get(request_url) # LOGGER.warning('Reply: {}'.format(str(reply.text))) # assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code) # json_reply = reply.json() # topology = MessageToDict( # Topology(**json_reply['topologies'][0]), # including_default_value_fields=True, preserving_proto_field_name=True, # use_integers_for_enums=False) # validate_topology(topology) # validate_topology_has_devices(topology) # validate_topology_has_links(topology) #