Newer
Older
import logging, os, pytest, time
from compute.Config import RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL
from compute.service.rest_server.RestServer import RestServer
from context.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from service.proto.service_pb2_grpc import add_ServiceServiceServicer_to_server
from .mock_osm.MockOSM import MockOSM
from .MockService import MockService
from .MockServicerImpl_Context import MockServicerImpl_Context
from .MockServicerImpl_Service import MockServicerImpl_Service
# Module-level logger for this test module, set to DEBUG level.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
# Loopback address used both for the mock gRPC server and for the REST API URL.
LOCALHOST = '127.0.0.1'
# gRPC port passed to the mock Context/Service server.
MOCKSERVER_GRPC_PORT = 10000
# REST API port used by the tests: the configured service port shifted up by 10000.
COMPUTE_RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports
class MockService_ContextService(MockService):
    """Mock server implementing Context and Service to simplify unitary tests of Compute."""

    def __init__(self, cls_name='MockService_Service'):
        """Bind the mock server to LOCALHOST on MOCKSERVER_GRPC_PORT.

        Args:
            cls_name: name forwarded to the MockService base class.
        """
        super().__init__(LOCALHOST, MOCKSERVER_GRPC_PORT, cls_name=cls_name)

    # pylint: disable=attribute-defined-outside-init
    def install_servicers(self):
        """Create the Context and Service mock servicers and add them to ``self.server``."""
        # Context servicer first, then Service servicer; both share the same server.
        context_servicer = MockServicerImpl_Context()
        self.context_servicer = context_servicer
        add_ContextServiceServicer_to_server(context_servicer, self.server)

        service_servicer = MockServicerImpl_Service()
        self.service_servicer = service_servicer
        add_ServiceServiceServicer_to_server(service_servicer, self.server)
# Point both the Context and the Service client settings at the local mock gRPC server.
os.environ.update({
    'CONTEXTSERVICE_SERVICE_HOST': LOCALHOST,
    'CONTEXTSERVICE_SERVICE_PORT_GRPC': str(MOCKSERVER_GRPC_PORT),
    'SERVICESERVICE_SERVICE_HOST': LOCALHOST,
    'SERVICESERVICE_SERVICE_PORT_GRPC': str(MOCKSERVER_GRPC_PORT),
})
# NBI Plugin IETF L2VPN requires environment variables CONTEXTSERVICE_SERVICE_HOST, CONTEXTSERVICE_SERVICE_PORT_GRPC,
# SERVICESERVICE_SERVICE_HOST, and SERVICESERVICE_SERVICE_PORT_GRPC to work properly.
# pylint: disable=wrong-import-position,ungrouped-imports
from compute.service.rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn
@pytest.fixture(scope='session')
def mockservice():
    """Session fixture: start the mock Context/Service gRPC server and stop it on teardown.

    Registered as a pytest fixture so that other fixtures can request it by
    name (``compute_service_rest`` takes ``mockservice`` as a parameter).

    Yields:
        The started MockService_ContextService instance.
    """
    _service = MockService_ContextService()
    _service.start()
    yield _service
    # Teardown: runs once the session no longer needs the fixture.
    _service.stop()
@pytest.fixture(scope='session')
def compute_service_rest(mockservice): # pylint: disable=redefined-outer-name
    """Session fixture: run the Compute REST server with the IETF L2VPN plugin registered.

    ``mockservice`` is requested only so that it is set up before this fixture;
    its value is not used here.

    Yields:
        The started RestServer instance listening on COMPUTE_RESTAPI_PORT.
    """
    rest_server = RestServer(port=COMPUTE_RESTAPI_PORT, base_url=RESTAPI_BASE_URL)
    register_ietf_l2vpn(rest_server)
    rest_server.start()
    time.sleep(1)  # give the server time to start

    yield rest_server

    # Teardown: request shutdown, then wait for the server to finish.
    rest_server.shutdown()
    rest_server.join()
@pytest.fixture(scope='session')
def osm_wim(compute_service_rest): # pylint: disable=redefined-outer-name
    """Session fixture: build a MockOSM client pointed at the Compute REST API.

    Registered as a pytest fixture because every test in this module requests
    ``osm_wim`` by name. ``compute_service_rest`` is requested only so that it
    is set up first; its value is not used here.

    Returns:
        A MockOSM created with the URL ``http://<LOCALHOST>:<COMPUTE_RESTAPI_PORT>``.
    """
    wim_url = 'http://{:s}:{:d}'.format(LOCALHOST, COMPUTE_RESTAPI_PORT)
    return MockOSM(wim_url)
def test_compute_create_connectivity_service_rest_api(osm_wim: MockOSM): # pylint: disable=redefined-outer-name
    """Invoke create_connectivity_service() on the MockOSM client."""
    osm_wim.create_connectivity_service()
def test_compute_get_connectivity_service_status_rest_api(osm_wim: MockOSM): # pylint: disable=redefined-outer-name
    """Invoke get_connectivity_service_status() on the MockOSM client."""
    osm_wim.get_connectivity_service_status()
def test_compute_edit_connectivity_service_rest_api(osm_wim: MockOSM): # pylint: disable=redefined-outer-name
    """Invoke edit_connectivity_service() on the MockOSM client."""
    osm_wim.edit_connectivity_service()
def test_compute_delete_connectivity_service_rest_api(osm_wim: MockOSM): # pylint: disable=redefined-outer-name
    """Invoke delete_connectivity_service() on the MockOSM client."""
    osm_wim.delete_connectivity_service()