# test_unitary.py
import logging, os, pytest, time
from compute.Config import RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL
from compute.service.rest_server.RestServer import RestServer
from context.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from service.proto.service_pb2_grpc import add_ServiceServiceServicer_to_server
from .mock_osm.MockOSM import MockOSM
from .MockService import MockService
from .MockServicerImpl_Context import MockServicerImpl_Context
from .MockServicerImpl_Service import MockServicerImpl_Service

# Module-level logger, forced to DEBUG so test runs emit detailed logs.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Address and ports used by the in-process servers started by the fixtures in this module.
LOCALHOST = '127.0.0.1'
MOCKSERVER_GRPC_PORT = 10000 # gRPC port the mock Context/Service server is bound to
COMPUTE_RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # offset the configured port to avoid privileged ports

class MockService_ContextService(MockService):
    """Mock server implementing Context and Service to simplify unitary tests of Compute.

    Both mock servicers are registered on the same server, which is bound to
    LOCALHOST:MOCKSERVER_GRPC_PORT.
    """

    def __init__(self, cls_name='MockService_Service'):
        """Bind the mock server to the local test address; `cls_name` is forwarded to MockService."""
        super().__init__(LOCALHOST, MOCKSERVER_GRPC_PORT, cls_name=cls_name)

    # pylint: disable=attribute-defined-outside-init
    def install_servicers(self):
        """Create the Context and Service mock servicers and register them on `self.server`.

        NOTE(review): presumably invoked by the MockService base class during start-up -- confirm.
        """
        # Each servicer is kept as an attribute before being registered on the server.
        context_impl = MockServicerImpl_Context()
        self.context_servicer = context_impl
        add_ContextServiceServicer_to_server(context_impl, self.server)

        service_impl = MockServicerImpl_Service()
        self.service_servicer = service_impl
        add_ServiceServiceServicer_to_server(service_impl, self.server)

# Advertise the mock gRPC server as both the Context and the Service endpoint.
# These variables must be set before the IETF L2VPN plugin is imported.
os.environ.update({
    'CONTEXTSERVICE_SERVICE_HOST': LOCALHOST,
    'CONTEXTSERVICE_SERVICE_PORT_GRPC': str(MOCKSERVER_GRPC_PORT),
    'SERVICESERVICE_SERVICE_HOST': LOCALHOST,
    'SERVICESERVICE_SERVICE_PORT_GRPC': str(MOCKSERVER_GRPC_PORT),
})

# NBI Plugin IETF L2VPN requires environment variables CONTEXTSERVICE_SERVICE_HOST, CONTEXTSERVICE_SERVICE_PORT_GRPC,
# SERVICESERVICE_SERVICE_HOST, and SERVICESERVICE_SERVICE_PORT_GRPC to work properly.
# pylint: disable=wrong-import-position,ungrouped-imports
from compute.service.rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn
@pytest.fixture(scope='session')
def mockservice():
    """Start the mock Context/Service gRPC server once per test session.

    Yields:
        The started MockService_ContextService instance. It is stopped during
        fixture teardown, after the last test that uses it has finished.
    """
    _service = MockService_ContextService()
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def compute_service_rest(mockservice):  # pylint: disable=redefined-outer-name,unused-argument
    """Start the Compute REST server with the IETF L2VPN plugin once per test session.

    Args:
        mockservice: requested only so that the mock gRPC server is started before the
            REST server and stopped after it; the value itself is not used here.

    Yields:
        The running RestServer listening on COMPUTE_RESTAPI_PORT. It is shut down and
        joined during fixture teardown.
    """
    _rest_server = RestServer(port=COMPUTE_RESTAPI_PORT, base_url=RESTAPI_BASE_URL)
    register_ietf_l2vpn(_rest_server)
    _rest_server.start()
    time.sleep(1) # give the server time to start before tests send requests
    yield _rest_server
    _rest_server.shutdown()
    _rest_server.join()

@pytest.fixture(scope='session')
def osm_wim(compute_service_rest): # pylint: disable=redefined-outer-name
    """Return a MockOSM client whose WIM URL points at the local Compute REST server.

    `compute_service_rest` is requested only so that the REST server is running first.
    """
    return MockOSM(f'http://{LOCALHOST:s}:{COMPUTE_RESTAPI_PORT:d}')

def test_compute_create_connectivity_service_rest_api(osm_wim: MockOSM):  # pylint: disable=redefined-outer-name
    """Ask the MockOSM client to create a connectivity service; passes if no exception is raised."""
    osm_wim.create_connectivity_service()

def test_compute_get_connectivity_service_status_rest_api(osm_wim: MockOSM):  # pylint: disable=redefined-outer-name
    """Ask the MockOSM client for the connectivity service status; passes if no exception is raised."""
    osm_wim.get_connectivity_service_status()
def test_compute_edit_connectivity_service_rest_api(osm_wim: MockOSM):  # pylint: disable=redefined-outer-name
    """Ask the MockOSM client to edit the connectivity service; passes if no exception is raised."""
    osm_wim.edit_connectivity_service()

def test_compute_delete_connectivity_service_rest_api(osm_wim: MockOSM):  # pylint: disable=redefined-outer-name
    """Ask the MockOSM client to delete the connectivity service; passes if no exception is raised."""
    osm_wim.delete_connectivity_service()