import logging
import os
import time

import pytest

from compute.Config import RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL
from compute.service.rest_server.RestServer import RestServer
from context.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from service.proto.service_pb2_grpc import add_ServiceServiceServicer_to_server
from .mock_osm.MockOSM import MockOSM
from .MockService import MockService
from .MockServicerImpl_Context import MockServicerImpl_Context
from .MockServicerImpl_Service import MockServicerImpl_Service

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

LOCALHOST = '127.0.0.1'
MOCKSERVER_GRPC_PORT = 10000
COMPUTE_RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT  # avoid privileged ports


class MockService_ContextService(MockService):
    """Mock server implementing Context and Service to simplify unitary tests of Compute.

    A single mock gRPC server, bound to LOCALHOST:MOCKSERVER_GRPC_PORT, hosts
    both the Context and the Service mock servicers.
    """

    def __init__(self, cls_name='MockService_Service'):
        """Initialise the base MockService on the local mock gRPC endpoint.

        Args:
            cls_name: name forwarded unchanged to the MockService constructor.
        """
        super().__init__(LOCALHOST, MOCKSERVER_GRPC_PORT, cls_name=cls_name)

    # pylint: disable=attribute-defined-outside-init
    def install_servicers(self):
        """Create the Context and Service mock servicers and register them on self.server.

        The servicer instances are kept as attributes (context_servicer and
        service_servicer) on this object.
        """
        context_servicer = MockServicerImpl_Context()
        self.context_servicer = context_servicer
        add_ContextServiceServicer_to_server(context_servicer, self.server)

        service_servicer = MockServicerImpl_Service()
        self.service_servicer = service_servicer
        add_ServiceServiceServicer_to_server(service_servicer, self.server)


# Point both the Context and the Service endpoints at the local mock gRPC server.
os.environ.update({
    'CONTEXTSERVICE_SERVICE_HOST': LOCALHOST,
    'CONTEXTSERVICE_SERVICE_PORT_GRPC': str(MOCKSERVER_GRPC_PORT),
    'SERVICESERVICE_SERVICE_HOST': LOCALHOST,
    'SERVICESERVICE_SERVICE_PORT_GRPC': str(MOCKSERVER_GRPC_PORT),
})

# NBI Plugin IETF L2VPN requires environment variables CONTEXTSERVICE_SERVICE_HOST, CONTEXTSERVICE_SERVICE_PORT_GRPC,
# SERVICESERVICE_SERVICE_HOST, and SERVICESERVICE_SERVICE_PORT_GRPC to work properly.
# Imported late on purpose: the IETF L2VPN plugin needs the CONTEXTSERVICE_* and
# SERVICESERVICE_* environment variables, which this module sets before this point.
# pylint: disable=wrong-import-position,ungrouped-imports
from compute.service.rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn


@pytest.fixture(scope='session')
def mockservice():
    """Start the mock Context/Service gRPC server for the session and stop it afterwards."""
    mock_server = MockService_ContextService()
    mock_server.start()
    yield mock_server
    mock_server.stop()


@pytest.fixture(scope='session')
def compute_service_rest(mockservice):  # pylint: disable=redefined-outer-name
    """Run the Compute REST server with the IETF L2VPN plugin for the session.

    Requesting ``mockservice`` makes pytest start the mock gRPC server before
    the REST server is created.
    """
    rest_api = RestServer(port=COMPUTE_RESTAPI_PORT, base_url=RESTAPI_BASE_URL)
    register_ietf_l2vpn(rest_api)
    rest_api.start()
    time.sleep(1)  # bring time for the server to start
    yield rest_api
    # Teardown: ask the server to shut down, then wait for it to finish.
    rest_api.shutdown()
    rest_api.join()


@pytest.fixture(scope='session')
def osm_wim(compute_service_rest):  # pylint: disable=redefined-outer-name
    """Return a MockOSM client pointed at the local Compute REST server."""
    return MockOSM(f'http://{LOCALHOST:s}:{COMPUTE_RESTAPI_PORT:d}')


# NOTE(review): the tests below look like a create/status/edit/delete sequence
# sharing one session-scoped MockOSM; presumably they rely on definition order.

def test_compute_create_connectivity_service_rest_api(osm_wim: MockOSM):  # pylint: disable=redefined-outer-name
    """Call MockOSM.create_connectivity_service(); the test fails only if it raises."""
    osm_wim.create_connectivity_service()


def test_compute_get_connectivity_service_status_rest_api(osm_wim: MockOSM):  # pylint: disable=redefined-outer-name
    """Call MockOSM.get_connectivity_service_status(); the test fails only if it raises."""
    osm_wim.get_connectivity_service_status()


def test_compute_edit_connectivity_service_rest_api(osm_wim: MockOSM):  # pylint: disable=redefined-outer-name
    """Call MockOSM.edit_connectivity_service(); the test fails only if it raises."""
    osm_wim.edit_connectivity_service()


def test_compute_delete_connectivity_service_rest_api(osm_wim: MockOSM):  # pylint: disable=redefined-outer-name
    """Call MockOSM.delete_connectivity_service(); the test fails only if it raises."""
    osm_wim.delete_connectivity_service()