Skip to content
Snippets Groups Projects
test_debug_api.py 10.8 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import logging, os, pytest, time, urllib
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    get_service_port_grpc, get_service_port_http
)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.type_checkers.Assertions import (
    validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
    validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
    validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
    validate_topology_ids)
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from nbi.tests.PrepareTestScenario import do_rest_get_request
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .MockService_Dependencies import MockService_Dependencies
from .Objects import (
    CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
    DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
    LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
    SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, SLICE_R1_R3, TOPOLOGY,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    TOPOLOGY_ID, POLICY_RULE, POLICY_RULE_ID, POLICY_RULE_UUID
)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed


@pytest.fixture(scope='session')
def mock_service():
    _service = MockService_Dependencies(MOCKSERVICE_PORT)
    _service.configure_env_vars()
    _service.start()
    yield _service
    _service.stop()



LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + int(get_service_port_grpc(ServiceNameEnum.CONTEXT))   # avoid privileged ports
HTTP_PORT = 10000 + int(get_service_port_http(ServiceNameEnum.CONTEXT))   # avoid privileged ports

MOCKSERVICE_PORT = 10000
DEVICE_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports

os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)

@pytest.fixture(scope='session')
def context_service_grpc():
    _service = ContextService(context_s_mb[0], context_s_mb[1])
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def context_service_rest():
    database = context_db_mb[0]
    _rest_server = RestServer()
    for endpoint_name, resource_class, resource_url in RESOURCES:
        _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
    _rest_server.start()
    time.sleep(1) # bring time for the server to start
    yield _rest_server
    _rest_server.shutdown()
    _rest_server.join()

@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    _client = ContextClient()
    yield _client
    _client.close()

def test_populate_database():
    client = ContextClient(host=LOCAL_HOST, port=GRPC_PORT)
    client.SetContext(Context(**CONTEXT))
    client.SetTopology(Topology(**TOPOLOGY))
    client.SetDevice(Device(**DEVICE_R1))
    client.SetDevice(Device(**DEVICE_R2))
    client.SetDevice(Device(**DEVICE_R3))
    client.SetLink(Link(**LINK_R1_R2))
    client.SetLink(Link(**LINK_R1_R3))
    client.SetLink(Link(**LINK_R2_R3))
    client.SetService(Service(**SERVICE_R1_R2))
    client.SetService(Service(**SERVICE_R1_R3))
    client.SetService(Service(**SERVICE_R2_R3))
    client.SetSlice(Slice(**SLICE_R1_R3))
    client.SetConnection(Connection(**CONNECTION_R1_R3))

def test_rest_get_context_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context_ids')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_context_ids(reply)

def test_rest_get_contexts(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/contexts')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_contexts(reply)

def test_rest_get_context(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}'.format(context_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_context(reply)

def test_rest_get_topology_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/topology_ids'.format(context_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_topology_ids(reply)

def test_rest_get_topologies(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/topologies'.format(context_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_topologies(reply)

def test_rest_get_topology(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
    topology_uuid = urllib.parse.quote(DEFAULT_TOPOLOGY_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/topology/{:s}'.format(context_uuid, topology_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_topology(reply, num_devices=3, num_links=3)

def test_rest_get_service_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/service_ids'.format(context_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_service_ids(reply)

def test_rest_get_services(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/services'.format(context_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_services(reply)

def test_rest_get_service(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    service_uuid = urllib.parse.quote(SERVICE_R1_R2_UUID, safe='')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/service/{:s}'.format(context_uuid, service_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_service(reply)

def test_rest_get_slice_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/slice_ids'.format(context_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #validate_slice_ids(reply)

def test_rest_get_slices(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/slices'.format(context_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #validate_slices(reply)

def test_rest_get_slice(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    slice_uuid = urllib.parse.quote(SLICE_R1_R3_UUID, safe='')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/slice/{:s}'.format(context_uuid, slice_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #validate_slice(reply)

def test_rest_get_device_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/device_ids')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_device_ids(reply)

def test_rest_get_devices(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/devices')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_devices(reply)

def test_rest_get_device(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    device_uuid = urllib.parse.quote(DEVICE_R1_UUID, safe='')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/device/{:s}'.format(device_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_device(reply)

def test_rest_get_link_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/link_ids')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_link_ids(reply)

def test_rest_get_links(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/links')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_links(reply)

def test_rest_get_link(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    link_uuid = urllib.parse.quote(LINK_R1_R2_UUID, safe='')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/link/{:s}'.format(link_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_link(reply)

def test_rest_get_connection_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    service_uuid = urllib.parse.quote(SERVICE_R1_R3_UUID, safe='')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/service/{:s}/connection_ids'.format(context_uuid, service_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_connection_ids(reply)

def test_rest_get_connections(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    service_uuid = urllib.parse.quote(SERVICE_R1_R3_UUID, safe='')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/context/{:s}/service/{:s}/connections'.format(context_uuid, service_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_connections(reply)

def test_rest_get_connection(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    connection_uuid = urllib.parse.quote(CONNECTION_R1_R3_UUID, safe='')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/connection/{:s}'.format(connection_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    validate_connection(reply)

def test_rest_get_policyrule_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/policyrule_ids')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #validate_policyrule_ids(reply)

def test_rest_get_policyrules(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/policyrules')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #validate_policyrules(reply)

def test_rest_get_policyrule(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    policyrule_uuid = urllib.parse.quote(POLICYRULE_UUID, safe='')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    reply = do_rest_get_request('/policyrule/{:s}'.format(policyrule_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #validate_policyrule(reply)