Commit ecb0e423 authored by Lluis Gifre Renom


Merge branch 'feat/67-cttc-implement-unitary-tests-in-nbi-for-etsi-mec-bandwidth-management-api-plugin' into 'develop'

Resolve "(CTTC) Implement unitary tests in NBI for ETSI MEC Bandwidth Management API plugin"

Closes #67

See merge request !179
parents 8c02bbd6 c7e957fb
Related merge requests: !235 Release TeraFlowSDN 3.0, !179 Resolve "(CTTC) Implement unitary tests in NBI for ETSI MEC Bandwidth Management API plugin"
@@ -22,8 +22,14 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze code coverage at the same time
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_etsi_bwm.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_ietf_l2vpn.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_ietf_l3vpn.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_ietf_network.py
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze code coverage at the same time
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_etsi_bwm.py
@@ -69,6 +69,7 @@ unit_test nbi:
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_l2vpn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_l2vpn.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_network.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_network.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_l3vpn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_l3vpn.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_etsi_bwm.py --junitxml=/opt/results/${IMAGE_NAME}_report_etsi_bwm.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
......
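For reference, the coverage regular expression declared in the job above lets GitLab extract the job's coverage percentage from the TOTAL summary line printed by "coverage report". A small illustration of what that regex matches (the sample line is hypothetical):

import re

# GitLab applies the job's coverage regex to the job log; 'coverage report'
# ends with a TOTAL summary line similar to this hypothetical sample:
sample_log_line = 'TOTAL                            1234    567    54%'

match = re.search(r'TOTAL\s+\d+\s+\d+\s+(\d+%)', sample_log_line)
print(match.group(1) if match else 'no match')  # prints: 54%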
@@ -13,6 +13,7 @@
# limitations under the License.
deepdiff==6.7.*
deepmerge==1.1.*
Flask==2.1.3
Flask-HTTPAuth==4.5.0
Flask-RESTful==0.3.9
......
@@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import copy, deepmerge, json, logging
from common.Constants import DEFAULT_CONTEXT_NAME
from flask_restful import Resource, request
from context.client.ContextClient import ContextClient
from flask_restful import Resource, request
from service.client.ServiceClient import ServiceClient
from .Tools import (
format_grpc_to_json, grpc_context_id, grpc_service_id, bwInfo_2_service, service_2_bwInfo)
LOGGER = logging.getLogger(__name__)
class _Resource(Resource):
@@ -39,7 +40,6 @@ class BwInfo(_Resource):
bwinfo = request.get_json()
service = bwInfo_2_service(self.client, bwinfo)
stripped_service = copy.deepcopy(service)
stripped_service.ClearField('service_endpoint_ids')
stripped_service.ClearField('service_constraints')
stripped_service.ClearField('service_config')
@@ -57,19 +57,30 @@ class BwInfoId(_Resource):
return service_2_bwInfo(service)
def put(self, allocationId: str):
json_data = request.get_json()
json_data = json.loads(request.get_json())
service = bwInfo_2_service(self.client, json_data)
response = self.service_client.UpdateService(service)
return format_grpc_to_json(response)
self.service_client.UpdateService(service)
service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, json_data['appInsId']))
response_bwm = service_2_bwInfo(service)
return response_bwm
def patch(self, allocationId: str):
json_data = request.get_json()
if not 'appInsId' in json_data:
json_data['appInsId'] = allocationId
service = bwInfo_2_service(self.client, json_data)
response = self.service_client.UpdateService(service)
return format_grpc_to_json(response)
service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, json_data['appInsId']))
current_bwm = service_2_bwInfo(service)
new_bmw = deepmerge.always_merger.merge(current_bwm, json_data)
service = bwInfo_2_service(self.client, new_bmw)
self.service_client.UpdateService(service)
service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, json_data['appInsId']))
response_bwm = service_2_bwInfo(service)
return response_bwm
def delete(self, allocationId: str):
self.service_client.DeleteService(grpc_service_id(DEFAULT_CONTEXT_NAME, allocationId))
return
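The PATCH handler above merges the incoming partial JSON body into the bandwidth allocation currently stored in the Context component before rebuilding and updating the service. A minimal, self-contained sketch of that merge step, using plain dictionaries with illustrative values in place of the real service data:

import deepmerge

# current allocation as it would be reported by service_2_bwInfo (illustrative values)
current_bwm = {
    'appInsId'       : 'service_uuid_01',
    'fixedAllocation': '200.0',
    'fixedBWPriority': 'NOPRIORITY',
}
# partial update sent by the client in the PATCH request body
patch_body = {'fixedBWPriority': 'FULLPRIORITY'}

# always_merger overrides scalars, merges nested dicts and appends lists;
# note it mutates and returns its first argument
new_bwm = deepmerge.always_merger.merge(current_bwm, patch_body)
assert new_bwm['fixedBWPriority'] == 'FULLPRIORITY'
assert new_bwm['fixedAllocation'] == '200.0'   # untouched fields are preserved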
@@ -15,8 +15,11 @@
import json
import logging
import time
from decimal import ROUND_HALF_EVEN, Decimal
from flask.json import jsonify
from common.proto.context_pb2 import ContextId, Empty, EndPointId, ServiceId, ServiceTypeEnum, Service, Constraint, Constraint_SLA_Capacity, ConfigRule, ConfigRule_Custom, ConfigActionEnum
from common.proto.context_pb2 import (
ContextId, Empty, EndPointId, ServiceId, ServiceTypeEnum, Service, Constraint, Constraint_SLA_Capacity,
ConfigRule, ConfigRule_Custom, ConfigActionEnum)
from common.tools.grpc.Tools import grpc_message_to_json
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Service import json_service_id
@@ -27,13 +30,15 @@ LOGGER = logging.getLogger(__name__)
def service_2_bwInfo(service: Service) -> dict:
response = {}
# allocationDirection = '??' # String: 00 = Downlink (towards the UE); 01 = Uplink (towards the application/session); 10 = Symmetrical
response['appInsId'] = service.service_id.context_id.context_uuid.uuid # String: Application instance identifier
response['appInsId'] = service.service_id.service_uuid.uuid # String: Application instance identifier
for constraint in service.service_constraints:
if constraint.WhichOneof('constraint') == 'sla_capacity':
response['fixedAllocation'] = str(constraint.sla_capacity.capacity_gbps*1000) # String: Size of requested fixed BW allocation in [bps]
# String: Size of requested fixed BW allocation in [bps]
fixed_allocation = Decimal(constraint.sla_capacity.capacity_gbps * 1.e9)
fixed_allocation = fixed_allocation.quantize(Decimal('0.1'), rounding=ROUND_HALF_EVEN)
response['fixedAllocation'] = str(fixed_allocation)
break
for config_rule in service.service_config.config_rules:
for key in ['allocationDirection', 'fixedBWPriority', 'requestType', 'sourceIp', 'sourcePort', 'dstPort', 'protocol', 'sessionFilter']:
if config_rule.custom.resource_key == key:
@@ -42,7 +47,6 @@ def service_2_bwInfo(service: Service) -> dict:
else:
response[key] = json.loads(config_rule.custom.resource_value)
unixtime = time.time()
response['timeStamp'] = { # Time stamp to indicate when the corresponding information elements are sent
"seconds": int(unixtime),
@@ -53,7 +57,6 @@ def service_2_bwInfo(service: Service) -> dict:
def bwInfo_2_service(client, bwInfo: dict) -> Service:
service = Service()
for key in ['allocationDirection', 'fixedBWPriority', 'requestType', 'timeStamp', 'sessionFilter']:
if key not in bwInfo:
continue
@@ -82,10 +85,6 @@ def bwInfo_2_service(client, bwInfo: dict) -> Service:
ep_id.endpoint_uuid.uuid = ep['uuid']
ep_id.device_id.device_uuid.uuid = device.device_id.device_uuid.uuid
service.service_endpoint_ids.append(ep_id)
if len(service.service_endpoint_ids) < 2:
LOGGER.error('No endpoints matched')
return None
service.service_type = ServiceTypeEnum.SERVICETYPE_L3NM
@@ -96,7 +95,7 @@ def bwInfo_2_service(client, bwInfo: dict) -> Service:
if 'fixedAllocation' in bwInfo:
capacity = Constraint_SLA_Capacity()
capacity.capacity_gbps = float(bwInfo['fixedAllocation'])
capacity.capacity_gbps = float(bwInfo['fixedAllocation']) / 1.e9
constraint = Constraint()
constraint.sla_capacity.CopyFrom(capacity)
service.service_constraints.append(constraint)
......
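The fixedAllocation field of the BWM API carries a bandwidth in bps as a string, while the SLA capacity constraint stores Gbps as a float; the Decimal/ROUND_HALF_EVEN conversion introduced above keeps the string free of floating-point noise when translating back. A small sketch of the two conversions in isolation:

from decimal import Decimal, ROUND_HALF_EVEN

def gbps_to_bps_str(capacity_gbps : float) -> str:
    # as in service_2_bwInfo: Gbps -> bps string, banker's rounding to one decimal place
    bps = Decimal(capacity_gbps * 1.e9)
    return str(bps.quantize(Decimal('0.1'), rounding=ROUND_HALF_EVEN))

def bps_str_to_gbps(fixed_allocation : str) -> float:
    # as in bwInfo_2_service: bps string -> Gbps float
    return float(fixed_allocation) / 1.e9

capacity_gbps = bps_str_to_gbps('123000.0')   # value exercised by the unit tests below
print(gbps_to_bps_str(capacity_gbps))         # round-trips back to '123000.0'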
@@ -15,7 +15,7 @@
from nbi.service.rest_server.RestServer import RestServer
from .Resources import BwInfo, BwInfoId
URL_PREFIX = '/bwm/v1'
URL_PREFIX = '/restconf/bwm/v1'
# Use 'path' type since some identifiers might contain char '/' and Flask is unable to recognize them in 'string' type.
RESOURCES = [
......
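The plugin is now registered under /restconf/bwm/v1, and the allocation identifier is declared with Flask's 'path' converter so identifiers containing '/' are still routed correctly. A standalone sketch with plain Flask-RESTful (the project wires the same resources through its own RestServer class, not shown here):

from flask import Flask
from flask_restful import Api, Resource

URL_PREFIX = '/restconf/bwm/v1'

class BwInfoId(Resource):
    def get(self, allocationId : str):
        # hypothetical handler: echo the identifier that was routed to us
        return {'appInsId': allocationId}

app = Flask(__name__)
api = Api(app)
# 'path' accepts '/' inside allocationId; the default 'string' converter would not match it
api.add_resource(BwInfoId, URL_PREFIX + '/bw_allocations/<path:allocationId>')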
@@ -22,6 +22,7 @@ from common.Settings import (
from context.client.ContextClient import ContextClient
from nbi.service.rest_server.RestServer import RestServer
from nbi.service.rest_server.nbi_plugins.debug_api import register_debug_api
from nbi.service.rest_server.nbi_plugins.etsi_bwm import register_etsi_bwm_api
from nbi.service.rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn
from nbi.service.rest_server.nbi_plugins.ietf_l3vpn import register_ietf_l3vpn
from nbi.service.rest_server.nbi_plugins.ietf_network import register_ietf_network
@@ -49,6 +50,7 @@ def mock_service():
def nbi_service_rest(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name, unused-argument
_rest_server = RestServer()
register_debug_api(_rest_server)
register_etsi_bwm_api(_rest_server)
register_ietf_l2vpn(_rest_server)
register_ietf_l3vpn(_rest_server)
register_ietf_network(_rest_server)
@@ -85,6 +87,7 @@ class RestRequestMethod(enum.Enum):
GET = 'get'
POST = 'post'
PUT = 'put'
PATCH = 'patch'
DELETE = 'delete'
EXPECTED_STATUS_CODES : Set[int] = {
@@ -119,34 +122,47 @@ def do_rest_get_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.GET, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_post_request(url : str, body : Optional[Any] = None, timeout : int = 10,
def do_rest_post_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.POST, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_put_request(url : str, body : Optional[Any] = None, timeout : int = 10,
def do_rest_put_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.PUT, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_delete_request(url : str, body : Optional[Any] = None, timeout : int = 10,
def do_rest_patch_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.PATCH, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_delete_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.DELETE, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
......
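The new do_rest_patch_request helper follows the same signature as the other HTTP helpers and delegates to the shared do_rest_request dispatcher. A rough standalone equivalent, assuming the shared helper wraps the requests library and asserts on the returned status code (that code is not part of this diff):

import logging
from typing import Any, Dict, List, Optional, Set, Union
import requests

def do_rest_patch_request(
    url : str, body : Optional[Any] = None, timeout : int = 10,
    allow_redirects : bool = True, expected_status_codes : Set[int] = {200, 201, 202, 204},
    logger : Optional[logging.Logger] = None
) -> Optional[Union[Dict, List]]:
    # standalone approximation of the project's helper: send the PATCH and check the status
    reply = requests.patch(url, json=body, timeout=timeout, allow_redirects=allow_redirects)
    if logger is not None:
        logger.info('PATCH {:s} -> {:d}'.format(url, reply.status_code))
    assert reply.status_code in expected_status_codes
    return reply.json() if reply.content else None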
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import deepdiff, json, logging, pytest
from typing import Dict
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import ContextId, TopologyId
from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from nbi.service.rest_server import RestServer
from .PrepareTestScenario import ( # pylint: disable=unused-import
# be careful, order of symbols is important here!
do_rest_delete_request, do_rest_get_request, do_rest_patch_request, do_rest_post_request, do_rest_put_request,
mock_service, nbi_service_rest, context_client
)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
DESCRIPTOR_FILE = 'nbi/tests/data/topology-dummy.json'
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))
BASE_URL = '/restconf/bwm/v1'
@pytest.fixture(scope='session')
def storage() -> Dict:
yield dict()
#def compare_dicts(dict1, dict2):
# # Function to recursively sort dictionaries
# def recursively_sort(d):
# if isinstance(d, dict):
# return {k: recursively_sort(v) for k, v in sorted(d.items())}
# if isinstance(d, list):
# return [recursively_sort(item) for item in d]
# return d
#
# # Sort dictionaries to ignore the order of fields
# sorted_dict1 = recursively_sort(dict1)
# sorted_dict2 = recursively_sort(dict2)
#
# if sorted_dict1 != sorted_dict2:
# LOGGER.error(sorted_dict1)
# LOGGER.error(sorted_dict2)
#
# return sorted_dict1 != sorted_dict2
def check_timestamps(bwm_service):
assert 'timeStamp' in bwm_service
assert 'seconds' in bwm_service['timeStamp']
assert 'nanoseconds' in bwm_service['timeStamp']
def test_prepare_environment(context_client : ContextClient) -> None: # pylint: disable=redefined-outer-name
validate_empty_scenario(context_client)
descriptor_loader = DescriptorLoader(descriptors_file=DESCRIPTOR_FILE, context_client=context_client)
results = descriptor_loader.process()
check_descriptor_load_results(results, descriptor_loader)
descriptor_loader.validate()
# Verify the scenario has no services/slices
response = context_client.GetContext(ADMIN_CONTEXT_ID)
assert len(response.topology_ids) == 1
assert len(response.service_ids ) == 0
assert len(response.slice_ids ) == 0
def test_get_allocations_empty(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
URL = BASE_URL + '/bw_allocations'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
assert len(retrieved_data) == 0
def test_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
URL = BASE_URL + '/bw_allocations'
data = {
"appInsId" : "service_uuid_01",
"allocationDirection" : "00",
"fixedAllocation" : "123000.0",
"fixedBWPriority" : "SEE_DESCRIPTION",
"requestType" : 0,
"sessionFilter" : [{
"sourceIp" : "192.168.1.2",
"sourcePort" : ["a"],
"protocol" : "string",
"dstAddress" : "192.168.3.2",
"dstPort" : ["b"],
}]
}
retrieved_data = do_rest_post_request(URL, body=data, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
storage['service_uuid_01'] = 'service_uuid_01'
def test_get_allocations(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
assert len(retrieved_data) == 1
good_result = [
{
"appInsId" : "service_uuid_01",
"fixedAllocation" : "123000.0",
"allocationDirection" : "00",
"fixedBWPriority" : "SEE_DESCRIPTION",
"requestType" : "0",
"sessionFilter" : [{
"sourceIp" : "192.168.1.2",
"sourcePort" : ["a"],
"protocol" : "string",
"dstAddress" : "192.168.3.2",
"dstPort" : ["b"],
}],
}
]
check_timestamps(retrieved_data[0])
del retrieved_data[0]['timeStamp']
diff_data = deepdiff.DeepDiff(good_result, retrieved_data)
LOGGER.error('Differences:\n{:s}'.format(str(diff_data.pretty())))
assert len(diff_data) == 0
def test_get_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations/service_uuid_01'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
good_result = {
"appInsId" : "service_uuid_01",
"fixedAllocation" : "123000.0",
"allocationDirection": "00",
"fixedBWPriority" : "SEE_DESCRIPTION",
"requestType" : "0",
"sessionFilter" : [{
"sourceIp" : "192.168.1.2",
"sourcePort" : ["a"],
"protocol" : "string",
"dstAddress" : "192.168.3.2",
"dstPort" : ["b"],
}]
}
check_timestamps(retrieved_data)
del retrieved_data['timeStamp']
diff_data = deepdiff.DeepDiff(good_result, retrieved_data)
LOGGER.error('Differences:\n{:s}'.format(str(diff_data.pretty())))
assert len(diff_data) == 0
def test_put_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations/service_uuid_01'
changed_allocation = {
"appInsId" : "service_uuid_01",
"fixedAllocation" : "200.0",
"allocationDirection": "00",
"fixedBWPriority" : "NOPRIORITY",
"requestType" : "0",
"sessionFilter" : [{
"sourceIp" : "192.168.1.2",
"sourcePort" : ["a"],
"protocol" : "string",
"dstAddress" : "192.168.3.2",
"dstPort" : ["b"],
}]
}
retrieved_data = do_rest_put_request(URL, body=json.dumps(changed_allocation), logger=LOGGER, expected_status_codes={200})
check_timestamps(retrieved_data)
del retrieved_data['timeStamp']
diff_data = deepdiff.DeepDiff(changed_allocation, retrieved_data)
LOGGER.error('Differences:\n{:s}'.format(str(diff_data.pretty())))
assert len(diff_data) == 0
def test_patch_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations/service_uuid_01'
difference = {
"fixedBWPriority":"FULLPRIORITY",
}
changed_allocation = {
"appInsId" : "service_uuid_01",
"fixedAllocation" : "200.0",
"allocationDirection": "00",
"fixedBWPriority" : "FULLPRIORITY",
"requestType" : "0",
"sessionFilter" : [{
"sourceIp" : "192.168.1.2",
"sourcePort" : ["a"],
"protocol" : "string",
"dstAddress" : "192.168.3.2",
"dstPort" : ["b"],
}]
}
retrieved_data = do_rest_patch_request(URL, body=difference, logger=LOGGER, expected_status_codes={200})
check_timestamps(retrieved_data)
del retrieved_data['timeStamp']
diff_data = deepdiff.DeepDiff(changed_allocation, retrieved_data)
LOGGER.error('Differences:\n{:s}'.format(str(diff_data.pretty())))
assert len(diff_data) == 0
def test_delete_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations/service_uuid_01'
do_rest_delete_request(URL, logger=LOGGER, expected_status_codes={200})
def test_get_allocations_empty_final(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
URL = BASE_URL + '/bw_allocations'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
assert len(retrieved_data) == 0
def test_cleanup_environment(context_client : ContextClient) -> None: # pylint: disable=redefined-outer-name
# Verify the scenario has no services/slices
response = context_client.GetContext(ADMIN_CONTEXT_ID)
assert len(response.topology_ids) == 1
assert len(response.service_ids ) == 0
assert len(response.slice_ids ) == 0
# Load descriptors and validate the base scenario
descriptor_loader = DescriptorLoader(descriptors_file=DESCRIPTOR_FILE, context_client=context_client)
descriptor_loader.validate()
descriptor_loader.unload()
validate_empty_scenario(context_client)
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict
import deepdiff, json, logging, operator
from typing import Dict
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import ContextId
from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario
......