Skip to content
Snippets Groups Projects
Commit 2d918155 authored by Carlos Manso's avatar Carlos Manso
Browse files

etsi_bwm fixes and unit tests

parent 1cdafbad
No related branches found
No related tags found
2 merge requests!235Release TeraFlowSDN 3.0,!179Resolve "(CTTC) Implement unitary tests in NBI for ETSI MEC Bandwidth Management API plugin"
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze coverage of code at same time
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_etsi_bwm.py
......@@ -494,7 +494,7 @@ class MockServicerImpl_Context(ContextServiceServicer):
def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
LOGGER.debug('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request)))
context_uuid = str(request.service_id.context_id.context_uuid.uuid)
context_uuid = str(request.context_id.context_uuid.uuid)
container_name = 'service[{:s}]'.format(context_uuid)
service_uuid = request.service_uuid.uuid
reply = self._del(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE, context)
......
......@@ -56,7 +56,7 @@ unit_test nbi:
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py $IMAGE_NAME/tests/test_etsi_bwm.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
......
......@@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import copy, json, logging
from common.Constants import DEFAULT_CONTEXT_NAME
from flask_restful import Resource, request
from context.client.ContextClient import ContextClient
from flask_restful import Resource, request
from service.client.ServiceClient import ServiceClient
from .Tools import (
format_grpc_to_json, grpc_context_id, grpc_service_id, bwInfo_2_service, service_2_bwInfo)
LOGGER = logging.getLogger(__name__)
class _Resource(Resource):
......@@ -39,7 +40,6 @@ class BwInfo(_Resource):
bwinfo = request.get_json()
service = bwInfo_2_service(self.client, bwinfo)
stripped_service = copy.deepcopy(service)
stripped_service.ClearField('service_endpoint_ids')
stripped_service.ClearField('service_constraints')
stripped_service.ClearField('service_config')
......@@ -57,19 +57,24 @@ class BwInfoId(_Resource):
return service_2_bwInfo(service)
def put(self, allocationId: str):
json_data = request.get_json()
json_data = json.loads(request.get_json())
service = bwInfo_2_service(self.client, json_data)
response = self.service_client.UpdateService(service)
return format_grpc_to_json(response)
self.service_client.UpdateService(service)
service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, json_data['appInsId']))
response_bwm = service_2_bwInfo(service)
return response_bwm
def patch(self, allocationId: str):
json_data = request.get_json()
if not 'appInsId' in json_data:
json_data['appInsId'] = allocationId
service = bwInfo_2_service(self.client, json_data)
response = self.service_client.UpdateService(service)
return format_grpc_to_json(response)
self.service_client.UpdateService(service)
service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, json_data['appInsId']))
response_bwm = service_2_bwInfo(service)
return response_bwm
def delete(self, allocationId: str):
self.service_client.DeleteService(grpc_service_id(DEFAULT_CONTEXT_NAME, allocationId))
return
......@@ -27,13 +27,12 @@ LOGGER = logging.getLogger(__name__)
def service_2_bwInfo(service: Service) -> dict:
response = {}
# allocationDirection = '??' # String: 00 = Downlink (towards the UE); 01 = Uplink (towards the application/session); 10 = Symmetrical
response['appInsId'] = service.service_id.context_id.context_uuid.uuid # String: Application instance identifier
response['appInsId'] = service.service_id.service_uuid.uuid # String: Application instance identifier
for constraint in service.service_constraints:
if constraint.WhichOneof('constraint') == 'sla_capacity':
response['fixedAllocation'] = str(constraint.sla_capacity.capacity_gbps*1000) # String: Size of requested fixed BW allocation in [bps]
break
for config_rule in service.service_config.config_rules:
for key in ['allocationDirection', 'fixedBWPriority', 'requestType', 'sourceIp', 'sourcePort', 'dstPort', 'protocol', 'sessionFilter']:
if config_rule.custom.resource_key == key:
......@@ -42,7 +41,6 @@ def service_2_bwInfo(service: Service) -> dict:
else:
response[key] = json.loads(config_rule.custom.resource_value)
unixtime = time.time()
response['timeStamp'] = { # Time stamp to indicate when the corresponding information elements are sent
"seconds": int(unixtime),
......@@ -53,7 +51,6 @@ def service_2_bwInfo(service: Service) -> dict:
def bwInfo_2_service(client, bwInfo: dict) -> Service:
service = Service()
for key in ['allocationDirection', 'fixedBWPriority', 'requestType', 'timeStamp', 'sessionFilter']:
if key not in bwInfo:
continue
......@@ -82,10 +79,6 @@ def bwInfo_2_service(client, bwInfo: dict) -> Service:
ep_id.endpoint_uuid.uuid = ep['uuid']
ep_id.device_id.device_uuid.uuid = device.device_id.device_uuid.uuid
service.service_endpoint_ids.append(ep_id)
if len(service.service_endpoint_ids) < 2:
LOGGER.error('No endpoints matched')
return None
service.service_type = ServiceTypeEnum.SERVICETYPE_L3NM
......
......@@ -15,7 +15,7 @@
from nbi.service.rest_server.RestServer import RestServer
from .Resources import BwInfo, BwInfoId
URL_PREFIX = '/bwm/v1'
URL_PREFIX = '/restconf/bwm/v1'
# Use 'path' type since some identifiers might contain char '/' and Flask is unable to recognize them in 'string' type.
RESOURCES = [
......
......@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
WIM_USERNAME = 'admin'
WIM_PASSWORD = 'admin'
USERNAME = 'admin'
PASSWORD = 'admin'
# Ref: https://osm.etsi.org/wikipub/index.php/WIM
WIM_MAPPING = [
......
......@@ -12,16 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os, pytest, time
import logging, os, pytest, requests, time, enum
from typing import Dict, List, Optional, Set, Union, Any
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name, get_service_port_http)
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_HTTP,
get_env_var_name, get_service_baseurl_http, get_service_port_http
)
from context.client.ContextClient import ContextClient
from nbi.service.rest_server.RestServer import RestServer
from nbi.service.rest_server.nbi_plugins.debug_api import register_debug_api
from nbi.service.rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn
from nbi.service.rest_server.nbi_plugins.etsi_bwm import register_etsi_bwm_api
from nbi.tests.MockService_Dependencies import MockService_Dependencies
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from tests.tools.mock_osm.MockOSM import MockOSM
from .Constants import WIM_MAPPING, WIM_USERNAME, WIM_PASSWORD
from .Constants import USERNAME, PASSWORD, WIM_MAPPING
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000
......@@ -38,10 +45,11 @@ def mock_service():
_service.stop()
@pytest.fixture(scope='session')
def nbi_service_rest(mock_service): # pylint: disable=redefined-outer-name
def nbi_service_rest(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name, unused-argument
_rest_server = RestServer()
register_debug_api(_rest_server)
register_ietf_l2vpn(_rest_server)
register_etsi_bwm_api(_rest_server)
_rest_server.start()
time.sleep(1) # bring time for the server to start
yield _rest_server
......@@ -49,6 +57,105 @@ def nbi_service_rest(mock_service): # pylint: disable=redefined-outer-name
_rest_server.join()
@pytest.fixture(scope='session')
def osm_wim(nbi_service_rest): # pylint: disable=redefined-outer-name
def osm_wim(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
wim_url = 'http://{:s}:{:d}'.format(LOCAL_HOST, NBI_SERVICE_PORT)
return MockOSM(wim_url, WIM_MAPPING, WIM_USERNAME, WIM_PASSWORD)
return MockOSM(wim_url, WIM_MAPPING, USERNAME, PASSWORD)
@pytest.fixture(scope='session')
def context_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name, unused-argument
_client = ContextClient()
yield _client
_client.close()
@pytest.fixture(scope='session')
def service_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name, unused-argument
_client = ServiceClient()
yield _client
_client.close()
@pytest.fixture(scope='session')
def slice_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name, unused-argument
_client = SliceClient()
yield _client
_client.close()
class RestRequestMethod(enum.Enum):
GET = 'get'
POST = 'post'
PUT = 'put'
DELETE = 'delete'
PATCH = 'patch'
EXPECTED_STATUS_CODES : Set[int] = {
requests.codes['OK' ],
requests.codes['CREATED' ],
requests.codes['ACCEPTED' ],
requests.codes['NO_CONTENT'],
}
def do_rest_request(
method : RestRequestMethod, url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
base_url = get_service_baseurl_http(ServiceNameEnum.NBI) or ''
request_url = 'http://{:s}:{:s}@{:s}:{:d}{:s}{:s}'.format(
USERNAME, PASSWORD, LOCAL_HOST, NBI_SERVICE_PORT, str(base_url), url
)
if logger is not None:
msg = 'Request: {:s} {:s}'.format(str(method.value).upper(), str(request_url))
if body is not None: msg += ' body={:s}'.format(str(body))
logger.warning(msg)
method = getattr(requests, method.value)
reply = method(request_url, timeout=timeout, json=body, allow_redirects=allow_redirects)
if logger is not None:
logger.warning('Reply: {:s}'.format(str(reply.text)))
assert reply.status_code in expected_status_codes, 'Reply failed with status code {:d}'.format(reply.status_code)
return reply.json()
def do_rest_get_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
return do_rest_request(
RestRequestMethod.GET, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_post_request(url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
return do_rest_request(
RestRequestMethod.POST, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_put_request(url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
return do_rest_request(
RestRequestMethod.PUT, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_delete_request(url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
return do_rest_request(
RestRequestMethod.DELETE, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_patch_request(url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Union[Dict, List]:
return do_rest_request(
RestRequestMethod.PATCH, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
\ No newline at end of file
This diff is collapsed.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict
import json, logging, pytest
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import ContextId, TopologyId
from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from nbi.service.rest_server import RestServer
from .PrepareTestScenario import do_rest_delete_request, do_rest_post_request, do_rest_get_request, do_rest_put_request, do_rest_patch_request, mock_service, nbi_service_rest, context_client
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
DESCRIPTOR_FILE = 'nbi/tests/data/topology-dummy.json'
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))
BASE_URL = '/restconf/bwm/v1'
@pytest.fixture(scope='session')
def storage() -> Dict:
yield dict()
def compare_dicts(dict1, dict2):
# Function to recursively sort dictionaries
def recursively_sort(d):
if isinstance(d, dict):
return {k: recursively_sort(v) for k, v in sorted(d.items())}
if isinstance(d, list):
return [recursively_sort(item) for item in d]
return d
# Sort dictionaries to ignore the order of fields
sorted_dict1 = recursively_sort(dict1)
sorted_dict2 = recursively_sort(dict2)
if sorted_dict1 != sorted_dict2:
LOGGER.error(sorted_dict1)
LOGGER.error(sorted_dict2)
return sorted_dict1 != sorted_dict2
def check_timestamps(bwm_service):
assert 'timeStamp' in bwm_service
assert 'seconds' in bwm_service['timeStamp']
assert 'nanoseconds' in bwm_service['timeStamp']
def test_prepare_environment(context_client : ContextClient) -> None: # pylint: disable=redefined-outer-name
validate_empty_scenario(context_client)
descriptor_loader = DescriptorLoader(descriptors_file=DESCRIPTOR_FILE, context_client=context_client)
results = descriptor_loader.process()
check_descriptor_load_results(results, descriptor_loader)
descriptor_loader.validate()
# Verify the scenario has no services/slices
response = context_client.GetContext(ADMIN_CONTEXT_ID)
assert len(response.topology_ids) == 3
assert len(response.service_ids ) == 0
assert len(response.slice_ids ) == 0
def test_get_allocations_empty(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
URL = BASE_URL + '/bw_allocations'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
assert len(retrieved_data) == 0
def test_allocation(nbi_service_rest : RestServer, storage : Dict):
URL = BASE_URL + '/bw_allocations'
data = {
"allocationDirection":"string",
"appInsId":"service_uuid_01",
"fixedAllocation":"123000.0",
"fixedBWPriority":"SEE_DESCRIPTION",
"requestType":0,
"sessionFilter":[
{
"dstAddress":"192.168.3.2",
"dstPort":["b"],
"protocol":"string",
"sourceIp":"192.168.1.2",
"sourcePort":["a"]
}
]
}
retrieved_data = do_rest_post_request(URL, body=data, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
storage['service_uuid_01'] = 'service_uuid_01'
def test_get_allocations(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
assert len(retrieved_data) == 1
good_result = [
{
"appInsId":"service_uuid_01",
"fixedAllocation":"123000.0",
"allocationDirection":"string",
"fixedBWPriority":"SEE_DESCRIPTION",
"requestType":"0",
"sessionFilter":[
{
"dstAddress":"192.168.3.2",
"dstPort":["b"],
"protocol":"string",
"sourceIp":"192.168.1.2",
"sourcePort":["a"]
}
],
}
]
compare_dicts(retrieved_data, good_result)
check_timestamps(retrieved_data[0])
def test_get_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations/service_uuid_01'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
good_result = {
"appInsId":"service_uuid_01",
"fixedAllocation":"123000.0",
"allocationDirection":"string",
"fixedBWPriority":"SEE_DESCRIPTION",
"requestType":"0",
"sessionFilter":[
{
"dstAddress":"192.168.3.2",
"dstPort":["b"],
"protocol":"string",
"sourceIp":"192.168.1.2",
"sourcePort":["a"]
}
]
}
compare_dicts(retrieved_data, good_result)
check_timestamps(retrieved_data)
def test_put_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations/service_uuid_01'
changed_allocation = {
"appInsId":"service_uuid_01",
"fixedAllocation":"200.0",
"allocationDirection":"parriba",
"fixedBWPriority":"NOPRIORITY",
"requestType":"0",
"sessionFilter":[
{
"dstAddress":"192.168.3.2",
"dstPort":["b"],
"protocol":"string",
"sourceIp":"192.168.1.2",
"sourcePort":["a"]
}
]
}
retrieved_data = do_rest_put_request(URL, body=json.dumps(changed_allocation), logger=LOGGER, expected_status_codes={200})
compare_dicts(retrieved_data, changed_allocation)
check_timestamps(retrieved_data)
def test_patch_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations/service_uuid_01'
difference = {
"fixedBWPriority":"FULLPRIORITY",
}
changed_allocation = {
"appInsId":"service_uuid_01",
"fixedAllocation":"200.0",
"allocationDirection":"parriba",
"fixedBWPriority":"FULLPRIORITY",
"requestType":"0",
"sessionFilter":[
{
"dstAddress":"192.168.3.2",
"dstPort":["b"],
"protocol":"string",
"sourceIp":"192.168.1.2",
"sourcePort":["a"]
}
]
}
retrieved_data = do_rest_patch_request(URL, body=changed_allocation, logger=LOGGER, expected_status_codes={200})
compare_dicts(retrieved_data, changed_allocation)
check_timestamps(retrieved_data)
def test_delete_allocation(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
assert 'service_uuid_01' in storage
URL = BASE_URL + '/bw_allocations/service_uuid_01'
do_rest_delete_request(URL, logger=LOGGER, expected_status_codes={200})
def test_get_allocations_empty_final(nbi_service_rest : RestServer, storage : Dict): # pylint: disable=redefined-outer-name, unused-argument
URL = BASE_URL + '/bw_allocations'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
assert len(retrieved_data) == 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment