Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Showing
with 2172 additions and 181 deletions
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum, logging
from typing import Dict, List, Optional, Tuple, Union
from .RestApiClient import HTTP_STATUS_CREATED, HTTP_STATUS_NO_CONTENT, HTTP_STATUS_OK, RestApiClient
LOGGER = logging.getLogger(__name__)
class BandwidthProfileTypeEnum(enum.Enum):
MEF_10_BWP = 'ietf-eth-tran-types:mef-10-bwp'
class EndpointLayerSpecificAccessTypeEnum(enum.Enum):
PORT = 'port'
class EndpointProtectionRoleEnum(enum.Enum):
WORK = 'work'
class OptimizationMetricRole(enum.Enum):
WORK = 'work'
class OptimizationMetricType(enum.Enum):
PATH_METRIC_TE = 'ietf-te-types:path-metric-te'
class OuterTagTypeEnum(enum.Enum):
CLASSIFY_C_VLAN = 'ietf-eth-tran-types:classify-c-vlan'
class ServiceClassificationTypeEnum(enum.Enum):
VLAN_CLASSIFICATION = 'ietf-eth-tran-type:vlan-classification'
class ServiceTypeEnum(enum.Enum):
MP2MP = 'op-mp2mp-svc'
P2MP = 'op-p2mp-svc'
def compose_outer_tag(tag_type : OuterTagTypeEnum, vlan_value : int) -> Dict:
return {'tag-type': tag_type.value, 'vlan-value': vlan_value}
def compose_ingress_egress_bandwidth_profile() -> Dict:
return {
'bandwidth-profile-type': BandwidthProfileTypeEnum.MEF_10_BWP.value,
'CIR': 10_000_000,
'EIR': 10_000_000,
}
def compose_layer_specific_access_type() -> Dict:
return {'access-type': EndpointLayerSpecificAccessTypeEnum.PORT.value}
def compose_static_route(prefix : str, mask : int, next_hop : str) -> Dict:
return {'destination': prefix, 'destination-mask': mask, 'next-hop': next_hop}
def compose_static_route_list(static_routes : List[Tuple[str, int, str]]) -> List[Dict]:
return [
compose_static_route(prefix, mask, next_hop)
for prefix, mask, next_hop in static_routes
]
def compose_etht_service_endpoint(
node_id : str, tp_id : str, vlan_value : int, static_routes : List[Tuple[str, int, str]] = list()
) -> Dict:
return {
'node-id' : node_id,
'tp-id' : tp_id,
'protection-role' : EndpointProtectionRoleEnum.WORK.value,
'layer-specific' : compose_layer_specific_access_type(),
'is-extendable' : False,
'is-terminal' : True,
'static-route-list' : compose_static_route_list(static_routes),
'outer-tag' : compose_outer_tag(OuterTagTypeEnum.CLASSIFY_C_VLAN, vlan_value),
'service-classification-type' : ServiceClassificationTypeEnum.VLAN_CLASSIFICATION.value,
'ingress-egress-bandwidth-profile': compose_ingress_egress_bandwidth_profile(),
}
def compose_optimizations() -> Dict:
return {'optimization-metric': [{
'metric-role': OptimizationMetricRole.WORK.value,
'metric-type': OptimizationMetricType.PATH_METRIC_TE.value,
}]}
def compose_etht_service(
name : str, service_type : ServiceTypeEnum, osu_tunnel_name : str,
src_node_id : str, src_tp_id : str, src_vlan_tag : int, dst_node_id : str, dst_tp_id : str, dst_vlan_tag : int,
src_static_routes : List[Tuple[str, int, str]] = list(), dst_static_routes : List[Tuple[str, int, str]] = list()
) -> Dict:
return {'ietf-eth-tran-service:etht-svc': {'etht-svc-instances': [{
'etht-svc-name' : name,
'etht-svc-title': name.upper(),
'etht-svc-type' : service_type.value,
'source-endpoints': {'source-endpoint': [
compose_etht_service_endpoint(src_node_id, src_tp_id, src_vlan_tag, src_static_routes),
]},
'destination-endpoints': {'destination-endpoint': [
compose_etht_service_endpoint(dst_node_id, dst_tp_id, dst_vlan_tag, dst_static_routes),
]},
'svc-tunnel': [{'tunnel-name': osu_tunnel_name}],
'optimizations': compose_optimizations(),
}]}}
class EthtServiceHandler:
def __init__(self, rest_api_client : RestApiClient) -> None:
self._rest_api_client = rest_api_client
self._object_name = 'EthtService'
self._subpath_root = '/ietf-eth-tran-service:etht-svc'
self._subpath_item = self._subpath_root + '/etht-svc-instances="{etht_service_name:s}"'
def _rest_api_get(self, etht_service_name : Optional[str] = None) -> Union[Dict, List]:
if etht_service_name is None:
subpath_url = self._subpath_root
else:
subpath_url = self._subpath_item.format(etht_service_name=etht_service_name)
return self._rest_api_client.get(
self._object_name, subpath_url, expected_http_status={HTTP_STATUS_OK}
)
def _rest_api_update(self, data : Dict) -> bool:
return self._rest_api_client.update(
self._object_name, self._subpath_root, data, expected_http_status={HTTP_STATUS_CREATED}
)
def _rest_api_delete(self, etht_service_name : str) -> bool:
if etht_service_name is None: raise Exception('etht_service_name is None')
subpath_url = self._subpath_item.format(etht_service_name=etht_service_name)
return self._rest_api_client.delete(
self._object_name, subpath_url, expected_http_status={HTTP_STATUS_NO_CONTENT}
)
def get(self, etht_service_name : Optional[str] = None) -> Union[Dict, List]:
data = self._rest_api_get(etht_service_name=etht_service_name)
if not isinstance(data, dict): raise ValueError('data should be a dict')
if 'ietf-eth-tran-service:etht-svc' not in data:
raise ValueError('data does not contain key "ietf-eth-tran-service:etht-svc"')
data = data['ietf-eth-tran-service:etht-svc']
if 'etht-svc-instances' not in data:
raise ValueError('data["ietf-eth-tran-service:etht-svc"] does not contain key "etht-svc-instances"')
data = data['etht-svc-instances']
if not isinstance(data, list):
raise ValueError('data["ietf-eth-tran-service:etht-svc"]["etht-svc-instances"] should be a list')
etht_services : List[Dict] = list()
for item in data:
src_endpoints = item['source-endpoints']['source-endpoint']
if len(src_endpoints) != 1:
MSG = 'EthtService({:s}) has zero/multiple source endpoints'
raise Exception(MSG.format(str(item)))
src_endpoint = src_endpoints[0]
dst_endpoints = item['destination-endpoints']['destination-endpoint']
if len(dst_endpoints) != 1:
MSG = 'EthtService({:s}) has zero/multiple destination endpoints'
raise Exception(MSG.format(str(item)))
dst_endpoint = dst_endpoints[0]
svc_tunnels = item['svc-tunnel']
if len(svc_tunnels) != 1:
MSG = 'EthtService({:s}) has zero/multiple service tunnels'
raise Exception(MSG.format(str(item)))
svc_tunnel = svc_tunnels[0]
etht_service = {
'name' : item['etht-svc-name'],
'service_type' : item['etht-svc-type'],
'osu_tunnel_name' : svc_tunnel['tunnel-name'],
'src_node_id' : src_endpoint['node-id'],
'src_tp_id' : src_endpoint['tp-id'],
'src_vlan_tag' : src_endpoint['outer-tag']['vlan-value'],
'src_static_routes': [
[static_route['destination'], static_route['destination-mask'], static_route['next-hop']]
for static_route in src_endpoint.get('static-route-list', list())
],
'dst_node_id' : dst_endpoint['node-id'],
'dst_tp_id' : dst_endpoint['tp-id'],
'dst_vlan_tag' : dst_endpoint['outer-tag']['vlan-value'],
'dst_static_routes': [
[static_route['destination'], static_route['destination-mask'], static_route['next-hop']]
for static_route in dst_endpoint.get('static-route-list', list())
],
}
etht_services.append(etht_service)
return etht_services
def update(self, parameters : Dict) -> bool:
name = parameters['name' ]
service_type = parameters['service_type' ]
osu_tunnel_name = parameters['osu_tunnel_name']
src_node_id = parameters['src_node_id' ]
src_tp_id = parameters['src_tp_id' ]
src_vlan_tag = parameters['src_vlan_tag' ]
src_static_routes = parameters.get('src_static_routes', [])
dst_node_id = parameters['dst_node_id' ]
dst_tp_id = parameters['dst_tp_id' ]
dst_vlan_tag = parameters['dst_vlan_tag' ]
dst_static_routes = parameters.get('dst_static_routes', [])
service_type = ServiceTypeEnum._value2member_map_[service_type]
data = compose_etht_service(
name, service_type, osu_tunnel_name,
src_node_id, src_tp_id, src_vlan_tag, dst_node_id, dst_tp_id, dst_vlan_tag,
src_static_routes=src_static_routes, dst_static_routes=dst_static_routes
)
return self._rest_api_update(data)
def delete(self, etht_service_name : str) -> bool:
return self._rest_api_delete(etht_service_name)
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum, logging
from typing import Dict, List, Optional, Union
from .RestApiClient import HTTP_STATUS_CREATED, HTTP_STATUS_NO_CONTENT, HTTP_STATUS_OK, RestApiClient
LOGGER = logging.getLogger(__name__)
class EndpointProtectionRoleEnum(enum.Enum):
WORK = 'work'
class LspProtectionTypeEnum(enum.Enum):
UNPROTECTED = 'ietf-te-types:lsp-protection-unprotected'
class LspRestorationTypeEnum(enum.Enum):
NOT_APPLICABLE = 'ietf-te-types:lsp-restoration-not-applicable'
class TunnelAdminStateEnum(enum.Enum):
UP = 'ietf-te-types:tunnel-admin-state-up'
class OduTypeEnum(enum.Enum):
OSUFLEX = 'osuflex'
def compose_osu_tunnel_endpoint(
node_id : str, tp_id : str, ttp_channel_name : str,
protection_role : EndpointProtectionRoleEnum = EndpointProtectionRoleEnum.WORK
) -> Dict:
return {
'node-id': node_id, 'tp-id': tp_id, 'ttp-channel-name': ttp_channel_name,
'protection-role': protection_role.value
}
def compose_osu_tunnel_te_bandwidth_odu(odu_type : OduTypeEnum, number : int) -> Dict:
return {'layer': 'odu', 'odu-type': odu_type.value, 'number': number}
def compose_osu_tunnel_protection(
type_ : LspProtectionTypeEnum = LspProtectionTypeEnum.UNPROTECTED, reversion_disable : bool = True
) -> Dict:
return {'protection-type': type_.value, 'protection-reversion-disable': reversion_disable}
def compose_osu_tunnel_restoration(
type_ : LspRestorationTypeEnum = LspRestorationTypeEnum.NOT_APPLICABLE, restoration_lock : bool = False
) -> Dict:
return {'restoration-type': type_.value, 'restoration-lock': restoration_lock}
def compose_osu_tunnel(
name : str,
src_node_id : str, src_tp_id : str, src_ttp_channel_name : str,
dst_node_id : str, dst_tp_id : str, dst_ttp_channel_name : str,
odu_type : OduTypeEnum, osuflex_number : int,
delay : int, bidirectional : bool = True,
admin_state : TunnelAdminStateEnum = TunnelAdminStateEnum.UP
) -> Dict:
return {'ietf-te:tunnel': [{
'name': name,
'title': name.upper(),
'admin-state': admin_state.value,
'delay': delay,
'te-bandwidth': compose_osu_tunnel_te_bandwidth_odu(odu_type, osuflex_number),
'bidirectional': bidirectional,
'source-endpoints': {'source-endpoint': [
compose_osu_tunnel_endpoint(src_node_id, src_tp_id, src_ttp_channel_name),
]},
'destination-endpoints': {'destination-endpoint': [
compose_osu_tunnel_endpoint(dst_node_id, dst_tp_id, dst_ttp_channel_name),
]},
'restoration': compose_osu_tunnel_restoration(),
'protection': compose_osu_tunnel_protection(),
}]}
class OsuTunnelHandler:
def __init__(self, rest_api_client : RestApiClient) -> None:
self._rest_api_client = rest_api_client
self._object_name = 'OsuTunnel'
self._subpath_root = '/ietf-te:te/tunnels'
self._subpath_item = self._subpath_root + '/tunnel="{osu_tunnel_name:s}"'
def _rest_api_get(self, osu_tunnel_name : Optional[str] = None) -> Union[Dict, List]:
if osu_tunnel_name is None:
subpath_url = self._subpath_root
else:
subpath_url = self._subpath_item.format(osu_tunnel_name=osu_tunnel_name)
return self._rest_api_client.get(
self._object_name, subpath_url, expected_http_status={HTTP_STATUS_OK}
)
def _rest_api_update(self, data : Dict) -> bool:
return self._rest_api_client.update(
self._object_name, self._subpath_root, data, expected_http_status={HTTP_STATUS_CREATED}
)
def _rest_api_delete(self, osu_tunnel_name : str) -> bool:
if osu_tunnel_name is None: raise Exception('osu_tunnel_name is None')
subpath_url = self._subpath_item.format(osu_tunnel_name=osu_tunnel_name)
return self._rest_api_client.delete(
self._object_name, subpath_url, expected_http_status={HTTP_STATUS_NO_CONTENT}
)
def get(self, osu_tunnel_name : Optional[str] = None) -> Union[Dict, List]:
data = self._rest_api_get(osu_tunnel_name=osu_tunnel_name)
if not isinstance(data, dict): raise ValueError('data should be a dict')
if 'ietf-te:tunnel' not in data: raise ValueError('data does not contain key "ietf-te:tunnel"')
data = data['ietf-te:tunnel']
if not isinstance(data, list): raise ValueError('data[ietf-te:tunnel] should be a list')
osu_tunnels : List[Dict] = list()
for item in data:
src_endpoints = item['source-endpoints']['source-endpoint']
if len(src_endpoints) != 1:
MSG = 'OsuTunnel({:s}) has zero/multiple source endpoints'
raise Exception(MSG.format(str(item)))
src_endpoint = src_endpoints[0]
dst_endpoints = item['destination-endpoints']['destination-endpoint']
if len(dst_endpoints) != 1:
MSG = 'OsuTunnel({:s}) has zero/multiple destination endpoints'
raise Exception(MSG.format(str(item)))
dst_endpoint = dst_endpoints[0]
osu_tunnel = {
'name' : item['name'],
'src_node_id' : src_endpoint['node-id'],
'src_tp_id' : src_endpoint['tp-id'],
'src_ttp_channel_name': src_endpoint['ttp-channel-name'],
'dst_node_id' : dst_endpoint['node-id'],
'dst_tp_id' : dst_endpoint['tp-id'],
'dst_ttp_channel_name': dst_endpoint['ttp-channel-name'],
'odu_type' : item['te-bandwidth']['odu-type'],
'osuflex_number' : item['te-bandwidth']['number'],
'delay' : item['delay'],
'bidirectional' : item['bidirectional'],
}
osu_tunnels.append(osu_tunnel)
return osu_tunnels
def update(self, parameters : Dict) -> bool:
name = parameters['name' ]
src_node_id = parameters['src_node_id' ]
src_tp_id = parameters['src_tp_id' ]
src_ttp_channel_name = parameters['src_ttp_channel_name']
dst_node_id = parameters['dst_node_id' ]
dst_tp_id = parameters['dst_tp_id' ]
dst_ttp_channel_name = parameters['dst_ttp_channel_name']
odu_type = parameters.get('odu_type', OduTypeEnum.OSUFLEX.value)
osuflex_number = parameters.get('osuflex_number', 1 )
delay = parameters.get('delay', 20 )
bidirectional = parameters.get('bidirectional', True )
odu_type = OduTypeEnum._value2member_map_[odu_type]
data = compose_osu_tunnel(
name, src_node_id, src_tp_id, src_ttp_channel_name, dst_node_id, dst_tp_id, dst_ttp_channel_name,
odu_type, osuflex_number, delay, bidirectional=bidirectional
)
return self._rest_api_update(data)
def delete(self, osu_tunnel_name : str) -> bool:
return self._rest_api_delete(osu_tunnel_name)
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, json, logging, requests
from requests.auth import HTTPBasicAuth
from typing import Any, Dict, List, Set, Union
LOGGER = logging.getLogger(__name__)
DEFAULT_BASE_URL = '/restconf/v2/data'
DEFAULT_SCHEME = 'https'
DEFAULT_TIMEOUT = 120
DEFAULT_VERIFY = False
HTTP_STATUS_OK = 200
HTTP_STATUS_CREATED = 201
HTTP_STATUS_ACCEPTED = 202
HTTP_STATUS_NO_CONTENT = 204
HTTP_OK_CODES = {
HTTP_STATUS_OK,
HTTP_STATUS_CREATED,
HTTP_STATUS_ACCEPTED,
HTTP_STATUS_NO_CONTENT,
}
class RestApiClient:
def __init__(self, address : str, port : int, settings : Dict[str, Any] = dict()) -> None:
username = settings.get('username')
password = settings.get('password')
self._auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None
scheme = settings.get('scheme', DEFAULT_SCHEME )
base_url = settings.get('base_url', DEFAULT_BASE_URL)
self._base_url = '{:s}://{:s}:{:d}{:s}'.format(scheme, address, int(port), base_url)
self._timeout = int(settings.get('timeout', DEFAULT_TIMEOUT))
self._verify = bool(settings.get('verify', DEFAULT_VERIFY))
def get(
self, object_name : str, url : str,
expected_http_status : Set[int] = {HTTP_STATUS_OK}
) -> Union[Dict, List]:
MSG = 'Get {:s}({:s})'
LOGGER.info(MSG.format(str(object_name), str(url)))
response = requests.get(
self._base_url + url,
timeout=self._timeout, verify=self._verify, auth=self._auth
)
LOGGER.info(' Response[{:s}]: {:s}'.format(str(response.status_code), str(response.content)))
if response.status_code in expected_http_status: return json.loads(response.content)
MSG = 'Could not get {:s}({:s}): status_code={:s} reply={:s}'
raise Exception(MSG.format(str(object_name), str(url), str(response.status_code), str(response)))
def update(
self, object_name : str, url : str, data : Dict, headers : Dict[str, Any] = dict(),
expected_http_status : Set[int] = HTTP_OK_CODES
) -> None:
headers = copy.deepcopy(headers)
if 'content-type' not in {header_name.lower() for header_name in headers.keys()}:
headers.update({'content-type': 'application/json'})
MSG = 'Create/Update {:s}({:s}, {:s})'
LOGGER.info(MSG.format(str(object_name), str(url), str(data)))
response = requests.post(
self._base_url + url, data=json.dumps(data), headers=headers,
timeout=self._timeout, verify=self._verify, auth=self._auth
)
LOGGER.info(' Response[{:s}]: {:s}'.format(str(response.status_code), str(response.content)))
if response.status_code in expected_http_status: return
MSG = 'Could not create/update {:s}({:s}, {:s}): status_code={:s} reply={:s}'
raise Exception(MSG.format(str(object_name), str(url), str(data), str(response.status_code), str(response)))
def delete(
self, object_name : str, url : str,
expected_http_status : Set[int] = HTTP_OK_CODES
) -> None:
MSG = 'Delete {:s}({:s})'
LOGGER.info(MSG.format(str(object_name), str(url)))
response = requests.delete(
self._base_url + url,
timeout=self._timeout, verify=self._verify, auth=self._auth
)
LOGGER.info(' Response[{:s}]: {:s}'.format(str(response.status_code), str(response.content)))
if response.status_code in expected_http_status: return
MSG = 'Could not delete {:s}({:s}): status_code={:s} reply={:s}'
raise Exception(MSG.format(str(object_name), str(url), str(response.status_code), str(response)))
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[
{"action": 1, "custom": {"resource_key": "/osu_tunnels/osu_tunnel[osu_tunnel_1]", "resource_value": {
"name": "osu_tunnel_1", "odu_type": "osuflex", "osuflex_number": 40, "bidirectional": true, "delay": 20,
"src_node_id": "10.0.10.1", "src_tp_id": "200", "src_ttp_channel_name": "och:1-odu2:1-oduflex:1-osuflex:2",
"dst_node_id": "10.0.30.1", "dst_tp_id": "200", "dst_ttp_channel_name": "och:1-odu2:1-oduflex:3-osuflex:1"
}}},
{"action": 1, "custom": {"resource_key": "/osu_tunnels/osu_tunnel[osu_tunnel_2]", "resource_value": {
"name": "osu_tunnel_2", "odu_type": "osuflex", "osuflex_number": 40, "bidirectional": true, "delay": 20,
"src_node_id": "10.0.10.1", "src_tp_id": "200", "src_ttp_channel_name": "och:1-odu2:1-oduflex:1-osuflex:2",
"dst_node_id": "10.0.30.1", "dst_tp_id": "200", "dst_ttp_channel_name": "och:1-odu2:1-oduflex:3-osuflex:1"
}}},
{"action": 1, "custom": {"resource_key": "/etht_services/etht_service[etht_service_1]", "resource_value": {
"name": "etht_service_1", "osu_tunnel_name": "osu_tunnel_1", "service_type": "op-mp2mp-svc",
"src_node_id": "10.0.10.1", "src_tp_id": "200", "src_vlan_tag": 21, "src_static_routes": [
["128.32.10.5", 24, "128.32.33.5"],
["128.32.20.5", 24, "128.32.33.5"]
],
"dst_node_id": "10.0.30.1", "dst_tp_id": "200", "dst_vlan_tag": 101, "dst_static_routes": [
["172.1.101.22", 24, "172.10.33.5"]
]
}}},
{"action": 1, "custom": {"resource_key": "/etht_services/etht_service[etht_service_2]", "resource_value": {
"name": "etht_service_2", "osu_tunnel_name": "osu_tunnel_2", "service_type": "op-mp2mp-svc",
"src_node_id": "10.0.10.1", "src_tp_id": "200", "src_vlan_tag": 31, "src_static_routes": [
["128.32.10.5", 24, "128.32.33.5"],
["128.32.20.5", 24, "128.32.33.5"]
],
"dst_node_id": "10.0.30.1", "dst_tp_id": "200", "dst_vlan_tag": 201, "dst_static_routes": [
["172.1.201.22", 24, "172.10.33.5"]
]
}}}
]
[
{"action": 2, "custom": {"resource_key": "/osu_tunnels/osu_tunnel[osu_tunnel_1]", "resource_value": {
"name": "osu_tunnel_1"
}}},
{"action": 2, "custom": {"resource_key": "/osu_tunnels/osu_tunnel[osu_tunnel_2]", "resource_value": {
"name": "osu_tunnel_2"
}}},
{"action": 2, "custom": {"resource_key": "/etht_services/etht_service[etht_service_1]", "resource_value": {
"name": "etht_service_1"
}}},
{"action": 2, "custom": {"resource_key": "/etht_services/etht_service[etht_service_2]", "resource_value": {
"name": "etht_service_2"
}}}
]
{
"ietf-eth-tran-service:etht-svc": {
"etht-svc-instances": [
{
"etht-svc-name": "etht_service_1",
"etht-svc-title": "ETHT_SERVICE_1",
"etht-svc-type": "op-mp2mp-svc",
"source-endpoints": {
"source-endpoint": [
{
"node-id": "10.0.10.1",
"tp-id": "200",
"protection-role": "work",
"layer-specific": {
"access-type": "port"
},
"is-extendable": false,
"is-terminal": true,
"static-route-list": [
{
"destination": "128.32.10.5",
"destination-mask": 24,
"next-hop": "128.32.33.5"
},
{
"destination": "128.32.20.5",
"destination-mask": 24,
"next-hop": "128.32.33.5"
}
],
"outer-tag": {
"tag-type": "ietf-eth-tran-types:classify-c-vlan",
"vlan-value": 21
},
"service-classification-type": "ietf-eth-tran-type:vlan-classification",
"ingress-egress-bandwidth-profile" : {
"bandwidth-profile-type": "ietf-eth-tran-types:mef-10-bwp",
"CIR": 10000000,
"EIR": 10000000
}
}
]
},
"destination-endpoints": {
"destination-endpoint": [
{
"node-id": "10.0.30.1",
"tp-id": "200",
"protection-role": "work",
"layer-specific": {
"access-type": "port"
},
"is-extendable": false,
"is-terminal": true,
"static-route-list": [
{
"destination": "172.1.101.22",
"destination-mask": 24,
"next-hop": "172.10.33.5"
}
],
"outer-tag": {
"tag-type": "ietf-eth-tran-types:classify-c-vlan",
"vlan-value": 101
},
"service-classification-type": "ietf-eth-tran-type:vlan-classification",
"ingress-egress-bandwidth-profile" : {
"bandwidth-profile-type": "ietf-eth-tran-types:mef-10-bwp",
"CIR": 10000000,
"EIR": 10000000
}
}
]
},
"svc-tunnel": [
{
"tunnel-name": "osu_tunnel_1"
}
],
"optimizations": {
"optimization-metric": [
{
"metric-role": "work",
"metric-type": "ietf-te-types:path-metric-te"
}
]
}
},
{
"etht-svc-name": "etht_service_2",
"etht-svc-title": "ETHT_SERVICE_2",
"etht-svc-type": "op-mp2mp-svc",
"source-endpoints": {
"source-endpoint": [
{
"node-id": "10.0.10.1",
"tp-id": "200",
"protection-role": "work",
"layer-specific": {
"access-type": "port"
},
"is-extendable": false,
"is-terminal": true,
"static-route-list": [
{
"destination": "128.32.10.5",
"destination-mask": 24,
"next-hop": "128.32.33.5"
},
{
"destination": "128.32.20.5",
"destination-mask": 24,
"next-hop": "128.32.33.5"
}
],
"outer-tag": {
"tag-type": "ietf-eth-tran-types:classify-c-vlan",
"vlan-value": 31
},
"service-classification-type": "ietf-eth-tran-type:vlan-classification",
"ingress-egress-bandwidth-profile" : {
"bandwidth-profile-type": "ietf-eth-tran-types:mef-10-bwp",
"CIR": 10000000,
"EIR": 10000000
}
}
]
},
"destination-endpoints": {
"destination-endpoint": [
{
"node-id": "10.0.30.1",
"tp-id": "200",
"protection-role": "work",
"layer-specific": {
"access-type": "port"
},
"is-extendable": false,
"is-terminal": true,
"static-route-list": [
{
"destination": "172.1.201.22",
"destination-mask": 24,
"next-hop": "172.10.33.5"
}
],
"outer-tag": {
"tag-type": "ietf-eth-tran-types:classify-c-vlan",
"vlan-value": 201
},
"service-classification-type": "ietf-eth-tran-type:vlan-classification",
"ingress-egress-bandwidth-profile" : {
"bandwidth-profile-type": "ietf-eth-tran-types:mef-10-bwp",
"CIR": 10000000,
"EIR": 10000000
}
}
]
},
"svc-tunnel": [
{
"tunnel-name": "osu_tunnel_2"
}
],
"optimizations": {
"optimization-metric": [
{
"metric-role": "work",
"metric-type": "ietf-te-types:path-metric-te"
}
]
}
}
]
}
}
\ No newline at end of file
{
"ietf-te:tunnel": [
{
"name": "osu_tunnel_1",
"title": "OSU_TUNNEL_1",
"admin-state": "ietf-te-types:tunnel-admin-state-up",
"delay": 20,
"te-bandwidth": {
"layer": "odu",
"odu-type": "osuflex",
"number": 40
},
"bidirectional": true,
"destination-endpoints": {
"destination-endpoint": [
{
"node-id": "10.0.30.1",
"tp-id": "200",
"ttp-channel-name": "och:1-odu2:1-oduflex:3-osuflex:1",
"protection-role": "work"
}
]
},
"source-endpoints": {
"source-endpoint": [
{
"node-id": "10.0.10.1",
"tp-id": "200",
"ttp-channel-name": "och:1-odu2:1-oduflex:1-osuflex:2",
"protection-role": "work"
}
]
},
"restoration": {
"restoration-type": "ietf-te-types:lsp-restoration-not-applicable",
"restoration-lock": false
},
"protection": {
"protection-type": "ietf-te-types:lsp-protection-unprotected",
"protection-reversion-disable": true
}
},
{
"name": "osu_tunnel_2",
"title": "OSU_TUNNEL_2",
"admin-state": "ietf-te-types:tunnel-admin-state-up",
"delay": 20,
"te-bandwidth": {
"layer": "odu",
"odu-type": "osuflex",
"number": 40
},
"bidirectional": true,
"destination-endpoints": {
"destination-endpoint": [
{
"node-id": "10.0.30.1",
"tp-id": "200",
"ttp-channel-name": "och:1-odu2:1-oduflex:3-osuflex:1",
"protection-role": "work"
}
]
},
"source-endpoints": {
"source-endpoint": [
{
"node-id": "10.0.10.1",
"tp-id": "200",
"ttp-channel-name": "och:1-odu2:1-oduflex:1-osuflex:2",
"protection-role": "work"
}
]
},
"restoration": {
"restoration-type": "ietf-te-types:lsp-restoration-not-applicable",
"restoration-lock": false
},
"protection": {
"protection-type": "ietf-te-types:lsp-protection-unprotected",
"protection-reversion-disable": true
}
}
]
}
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, deepdiff, json, logging, operator, os, pytest, time
from flask import Flask, jsonify, make_response
from flask_restful import Resource
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceId
from common.tools.descriptor.Tools import format_custom_config_rules
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Device import (
json_device_connect_rules, json_device_id, json_device_ietf_actn_disabled
)
from common.tools.service.GenericRestServer import GenericRestServer
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api._Driver import _Driver
from tests.tools.mock_ietf_actn_sdn_ctrl.ResourceEthServices import EthService, EthServices
from tests.tools.mock_ietf_actn_sdn_ctrl.ResourceOsuTunnels import OsuTunnel, OsuTunnels
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from .PrepareTestScenario import ( # pylint: disable=unused-import
# be careful, order of symbols is important here!
mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment
)
DEVICE_UUID = 'DEVICE-IETF-ACTN'
DEVICE_ADDRESS = '127.0.0.1'
DEVICE_PORT = 8080
DEVICE_USERNAME = 'admin'
DEVICE_PASSWORD = 'admin'
DEVICE_SCHEME = 'http'
DEVICE_BASE_URL = '/restconf/v2/data'
DEVICE_TIMEOUT = 120
DEVICE_VERIFY = False
DEVICE_ID = json_device_id(DEVICE_UUID)
DEVICE = json_device_ietf_actn_disabled(DEVICE_UUID)
DEVICE_CONNECT_RULES = json_device_connect_rules(DEVICE_ADDRESS, DEVICE_PORT, {
'scheme' : DEVICE_SCHEME,
'username': DEVICE_USERNAME,
'password': DEVICE_PASSWORD,
'base_url': DEVICE_BASE_URL,
'timeout' : DEVICE_TIMEOUT,
'verify' : DEVICE_VERIFY,
})
DATA_FILE_CONFIG_RULES = 'device/tests/data/ietf_actn/config_rules.json'
DATA_FILE_DECONFIG_RULES = 'device/tests/data/ietf_actn/deconfig_rules.json'
DATA_FILE_EXPECTED_OSU_TUNNELS = 'device/tests/data/ietf_actn/expected_osu_tunnels.json'
DATA_FILE_EXPECTED_ETHT_SERVICES = 'device/tests/data/ietf_actn/expected_etht_services.json'
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
@pytest.fixture(scope='session')
def ietf_actn_sdn_ctrl(
device_service : DeviceService, # pylint: disable=redefined-outer-name
) -> Flask:
_rest_server = GenericRestServer(DEVICE_PORT, DEVICE_BASE_URL, bind_address=DEVICE_ADDRESS)
_rest_server.app.config['DEBUG' ] = True
_rest_server.app.config['ENV' ] = 'development'
_rest_server.app.config['SERVER_NAME'] = '{:s}:{:d}'.format(DEVICE_ADDRESS, DEVICE_PORT)
_rest_server.app.config['TESTING' ] = True
class Root(Resource):
def get(self):
return make_response(jsonify({}), 200)
add_rsrc = _rest_server.add_resource
add_rsrc(Root, '/')
add_rsrc(OsuTunnels, '/ietf-te:te/tunnels')
add_rsrc(OsuTunnel, '/ietf-te:te/tunnels/tunnel="<string:osu_tunnel_name>"')
add_rsrc(EthServices, '/ietf-eth-tran-service:etht-svc')
add_rsrc(EthService, '/ietf-eth-tran-service:etht-svc/etht-svc-instances="<string:etht_service_name>"')
_rest_server.start()
time.sleep(1) # bring time for the server to start
yield _rest_server
_rest_server.shutdown()
_rest_server.join()
def test_device_ietf_actn_add(
device_client : DeviceClient, # pylint: disable=redefined-outer-name
device_service : DeviceService, # pylint: disable=redefined-outer-name
ietf_actn_sdn_ctrl : GenericRestServer, # pylint: disable=redefined-outer-name
) -> None:
DEVICE_WITH_CONNECT_RULES = copy.deepcopy(DEVICE)
DEVICE_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_CONNECT_RULES)
device_client.AddDevice(Device(**DEVICE_WITH_CONNECT_RULES))
driver_instance_cache = device_service.device_servicer.driver_instance_cache
driver: _Driver = driver_instance_cache.get(DEVICE_UUID)
assert driver is not None
def test_device_ietf_actn_get(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
ietf_actn_sdn_ctrl : GenericRestServer, # pylint: disable=redefined-outer-name
) -> None:
initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_ID))
LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config)))
device_data = context_client.GetDevice(DeviceId(**DEVICE_ID))
LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))
def test_device_ietf_actn_configure(
device_client : DeviceClient, # pylint: disable=redefined-outer-name
device_service : DeviceService, # pylint: disable=redefined-outer-name
ietf_actn_sdn_ctrl : GenericRestServer, # pylint: disable=redefined-outer-name
) -> None:
ietf_actn_client = ietf_actn_sdn_ctrl.app.test_client()
driver_instance_cache = device_service.device_servicer.driver_instance_cache
driver : _Driver = driver_instance_cache.get(DEVICE_UUID)
assert driver is not None
retrieved_osu_tunnels = ietf_actn_client.get('/restconf/v2/data/ietf-te:te/tunnels')
retrieved_osu_tunnels = retrieved_osu_tunnels.json
LOGGER.info('osu_tunnels = {:s}'.format(str(retrieved_osu_tunnels)))
expected_osu_tunnels = {'ietf-te:tunnel': []}
osu_tunnels_diff = deepdiff.DeepDiff(expected_osu_tunnels, retrieved_osu_tunnels)
if len(osu_tunnels_diff) > 0:
LOGGER.error('PRE OSU TUNNELS - Differences:\n{:s}'.format(str(osu_tunnels_diff.pretty())))
assert len(osu_tunnels_diff) == 0
retrieved_etht_services = ietf_actn_client.get('/restconf/v2/data/ietf-eth-tran-service:etht-svc')
retrieved_etht_services = retrieved_etht_services.json
LOGGER.info('etht_services = {:s}'.format(str(retrieved_etht_services)))
expected_etht_services = {'ietf-eth-tran-service:etht-svc': {'etht-svc-instances': []}}
etht_services_diff = deepdiff.DeepDiff(expected_etht_services, retrieved_etht_services)
if len(etht_services_diff) > 0:
LOGGER.error('PRE ETHT SERVICES - Differences:\n{:s}'.format(str(etht_services_diff.pretty())))
assert len(etht_services_diff) == 0
retrieved_driver_config_rules = sorted(driver.GetConfig(), key=operator.itemgetter(0))
LOGGER.info('driver_config = {:s}'.format(str(retrieved_driver_config_rules)))
assert isinstance(retrieved_driver_config_rules, list)
if len(retrieved_driver_config_rules) > 0:
LOGGER.error('PRE DRIVER CONFIG RULES - Differences:\n{:s}'.format(str(retrieved_driver_config_rules)))
assert len(retrieved_driver_config_rules) == 0
DEVICE_WITH_CONFIG_RULES = copy.deepcopy(DEVICE)
with open(DATA_FILE_CONFIG_RULES, 'r', encoding='UTF-8') as f:
config_rules = format_custom_config_rules(json.load(f))
DEVICE_WITH_CONFIG_RULES['device_config']['config_rules'].extend(config_rules)
device_client.ConfigureDevice(Device(**DEVICE_WITH_CONFIG_RULES))
retrieved_osu_tunnels = ietf_actn_client.get('/restconf/v2/data/ietf-te:te/tunnels')
retrieved_osu_tunnels = retrieved_osu_tunnels.json
LOGGER.info('osu_tunnels = {:s}'.format(str(retrieved_osu_tunnels)))
with open(DATA_FILE_EXPECTED_OSU_TUNNELS, 'r', encoding='UTF-8') as f:
expected_osu_tunnels = json.load(f)
osu_tunnels_diff = deepdiff.DeepDiff(expected_osu_tunnels, retrieved_osu_tunnels)
if len(osu_tunnels_diff) > 0:
LOGGER.error('POST OSU TUNNELS - Differences:\n{:s}'.format(str(osu_tunnels_diff.pretty())))
assert len(osu_tunnels_diff) == 0
retrieved_etht_services = ietf_actn_client.get('/restconf/v2/data/ietf-eth-tran-service:etht-svc')
retrieved_etht_services = retrieved_etht_services.json
LOGGER.info('etht_services = {:s}'.format(str(retrieved_etht_services)))
with open(DATA_FILE_EXPECTED_ETHT_SERVICES, 'r', encoding='UTF-8') as f:
expected_etht_services = json.load(f)
etht_services_diff = deepdiff.DeepDiff(expected_etht_services, retrieved_etht_services)
if len(etht_services_diff) > 0:
LOGGER.error('POST ETHT SERVICES - Differences:\n{:s}'.format(str(etht_services_diff.pretty())))
assert len(etht_services_diff) == 0
retrieved_driver_config_rules = sorted(driver.GetConfig(), key=operator.itemgetter(0))
LOGGER.info('driver_config = {:s}'.format(str(retrieved_driver_config_rules)))
retrieved_driver_config_rules = [
{'action': 1, 'custom': {'resource_key': resource_key, 'resource_value': resource_value}}
for resource_key, resource_value in retrieved_driver_config_rules
]
with open(DATA_FILE_CONFIG_RULES, 'r', encoding='UTF-8') as f:
expected_driver_config_rules = sorted(json.load(f), key=lambda cr: cr['custom']['resource_key'])
driver_config_rules_diff = deepdiff.DeepDiff(expected_driver_config_rules, retrieved_driver_config_rules)
if len(driver_config_rules_diff) > 0:
LOGGER.error('POST DRIVER CONFIG RULES - Differences:\n{:s}'.format(str(driver_config_rules_diff.pretty())))
assert len(driver_config_rules_diff) == 0
def test_device_ietf_actn_deconfigure(
device_client : DeviceClient, # pylint: disable=redefined-outer-name
device_service : DeviceService, # pylint: disable=redefined-outer-name
ietf_actn_sdn_ctrl : GenericRestServer, # pylint: disable=redefined-outer-name
) -> None:
ietf_actn_client = ietf_actn_sdn_ctrl.app.test_client()
driver_instance_cache = device_service.device_servicer.driver_instance_cache
driver : _Driver = driver_instance_cache.get(DEVICE_UUID)
assert driver is not None
retrieved_osu_tunnels = ietf_actn_client.get('/restconf/v2/data/ietf-te:te/tunnels')
retrieved_osu_tunnels = retrieved_osu_tunnels.json
LOGGER.info('osu_tunnels = {:s}'.format(str(retrieved_osu_tunnels)))
with open(DATA_FILE_EXPECTED_OSU_TUNNELS, 'r', encoding='UTF-8') as f:
expected_osu_tunnels = json.load(f)
osu_tunnels_diff = deepdiff.DeepDiff(expected_osu_tunnels, retrieved_osu_tunnels)
if len(osu_tunnels_diff) > 0:
LOGGER.error('PRE OSU TUNNELS - Differences:\n{:s}'.format(str(osu_tunnels_diff.pretty())))
assert len(osu_tunnels_diff) == 0
retrieved_etht_services = ietf_actn_client.get('/restconf/v2/data/ietf-eth-tran-service:etht-svc')
retrieved_etht_services = retrieved_etht_services.json
LOGGER.info('etht_services = {:s}'.format(str(retrieved_etht_services)))
with open(DATA_FILE_EXPECTED_ETHT_SERVICES, 'r', encoding='UTF-8') as f:
expected_etht_services = json.load(f)
etht_services_diff = deepdiff.DeepDiff(expected_etht_services, retrieved_etht_services)
if len(etht_services_diff) > 0:
LOGGER.error('PRE ETHT SERVICES - Differences:\n{:s}'.format(str(etht_services_diff.pretty())))
assert len(etht_services_diff) == 0
retrieved_driver_config_rules = sorted(driver.GetConfig(), key=operator.itemgetter(0))
LOGGER.info('driver_config = {:s}'.format(str(retrieved_driver_config_rules)))
retrieved_driver_config_rules = [
{'action': 1, 'custom': {'resource_key': resource_key, 'resource_value': resource_value}}
for resource_key, resource_value in retrieved_driver_config_rules
]
with open(DATA_FILE_CONFIG_RULES, 'r', encoding='UTF-8') as f:
expected_driver_config_rules = sorted(json.load(f), key=lambda cr: cr['custom']['resource_key'])
driver_config_rules_diff = deepdiff.DeepDiff(expected_driver_config_rules, retrieved_driver_config_rules)
if len(driver_config_rules_diff) > 0:
LOGGER.error('PRE DRIVER CONFIG RULES - Differences:\n{:s}'.format(str(driver_config_rules_diff.pretty())))
assert len(driver_config_rules_diff) == 0
DEVICE_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE)
with open(DATA_FILE_DECONFIG_RULES, 'r', encoding='UTF-8') as f:
deconfig_rules = format_custom_config_rules(json.load(f))
DEVICE_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(deconfig_rules)
device_client.ConfigureDevice(Device(**DEVICE_WITH_DECONFIG_RULES))
retrieved_osu_tunnels = ietf_actn_client.get('/restconf/v2/data/ietf-te:te/tunnels')
retrieved_osu_tunnels = retrieved_osu_tunnels.json
LOGGER.info('osu_tunnels = {:s}'.format(str(retrieved_osu_tunnels)))
expected_osu_tunnels = {'ietf-te:tunnel': []}
osu_tunnels_diff = deepdiff.DeepDiff(expected_osu_tunnels, retrieved_osu_tunnels)
if len(osu_tunnels_diff) > 0:
LOGGER.error('POST OSU TUNNELS - Differences:\n{:s}'.format(str(osu_tunnels_diff.pretty())))
assert len(osu_tunnels_diff) == 0
retrieved_etht_services = ietf_actn_client.get('/restconf/v2/data/ietf-eth-tran-service:etht-svc')
retrieved_etht_services = retrieved_etht_services.json
LOGGER.info('etht_services = {:s}'.format(str(retrieved_etht_services)))
expected_etht_services = {'ietf-eth-tran-service:etht-svc': {'etht-svc-instances': []}}
etht_services_diff = deepdiff.DeepDiff(expected_etht_services, retrieved_etht_services)
if len(etht_services_diff) > 0:
LOGGER.error('POST ETHT SERVICES - Differences:\n{:s}'.format(str(etht_services_diff.pretty())))
assert len(etht_services_diff) == 0
retrieved_driver_config_rules = sorted(driver.GetConfig(), key=operator.itemgetter(0))
LOGGER.info('retrieved_driver_config_rules = {:s}'.format(str(retrieved_driver_config_rules)))
assert isinstance(retrieved_driver_config_rules, list)
if len(retrieved_driver_config_rules) > 0:
LOGGER.error('POST DRIVER CONFIG RULES - Differences:\n{:s}'.format(str(retrieved_driver_config_rules)))
assert len(retrieved_driver_config_rules) == 0
def test_device_ietf_actn_delete(
device_client : DeviceClient, # pylint: disable=redefined-outer-name
device_service : DeviceService, # pylint: disable=redefined-outer-name
ietf_actn_sdn_ctrl : GenericRestServer, # pylint: disable=redefined-outer-name
) -> None:
device_client.DeleteDevice(DeviceId(**DEVICE_ID))
driver_instance_cache = device_service.device_servicer.driver_instance_cache
driver : _Driver = driver_instance_cache.get(DEVICE_UUID, {})
assert driver is None
......@@ -66,6 +66,7 @@ unit_test nbi:
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_debug_api.py --junitxml=/opt/results/${IMAGE_NAME}_report_debug_api.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_l2vpn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_l2vpn.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_network.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_network.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_l3vpn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_l3vpn.xml"
......
......@@ -55,7 +55,7 @@ def process_vpn_service(
def update_service_endpoint(
service_uuid : str, site_id : str, device_uuid : str, endpoint_uuid : str,
vlan_tag : int, ipv4_address : str, ipv4_prefix_length : int,
vlan_tag : int, ipv4_address : str, neighbor_ipv4_address : str, ipv4_prefix_length : int,
capacity_gbps : Optional[float] = None, e2e_latency_ms : Optional[float] = None,
availability : Optional[float] = None, mtu : Optional[int] = None,
static_routing : Optional[Dict[Tuple[str, str], str]] = None,
......@@ -91,12 +91,15 @@ def update_service_endpoint(
for (ip_range, ip_prefix_len, lan_tag), next_hop in static_routing.items()
})
ENDPOINT_SETTINGS_KEY = '/device[{:s}]/endpoint[{:s}]/vlan[{:d}]/settings'
endpoint_settings_key = ENDPOINT_SETTINGS_KEY.format(device_uuid, endpoint_uuid, vlan_tag)
#ENDPOINT_SETTINGS_KEY = '/device[{:s}]/endpoint[{:s}]/vlan[{:d}]/settings'
#endpoint_settings_key = ENDPOINT_SETTINGS_KEY.format(device_uuid, endpoint_uuid, vlan_tag)
ENDPOINT_SETTINGS_KEY = '/device[{:s}]/endpoint[{:s}]/settings'
endpoint_settings_key = ENDPOINT_SETTINGS_KEY.format(device_uuid, endpoint_uuid)
field_updates = {}
if vlan_tag is not None: field_updates['vlan_tag' ] = (vlan_tag, True)
if ipv4_address is not None: field_updates['ip_address' ] = (ipv4_address, True)
if ipv4_prefix_length is not None: field_updates['prefix_length'] = (ipv4_prefix_length, True)
if vlan_tag is not None: field_updates['vlan_tag' ] = (vlan_tag, True)
if ipv4_address is not None: field_updates['ip_address' ] = (ipv4_address, True)
if neighbor_ipv4_address is not None: field_updates['neighbor_address'] = (neighbor_ipv4_address, True)
if ipv4_prefix_length is not None: field_updates['prefix_length' ] = (ipv4_prefix_length, True)
update_config_rule_custom(config_rules, endpoint_settings_key, field_updates)
try:
......@@ -131,7 +134,7 @@ def process_site_network_access(
raise NotImplementedError(MSG.format(str(ipv4_allocation['address-allocation-type'])))
ipv4_allocation_addresses = ipv4_allocation['addresses']
ipv4_provider_address = ipv4_allocation_addresses['provider-address']
#ipv4_customer_address = ipv4_allocation_addresses['customer-address']
ipv4_customer_address = ipv4_allocation_addresses['customer-address']
ipv4_prefix_length = ipv4_allocation_addresses['prefix-length' ]
vlan_tag = None
......@@ -176,7 +179,8 @@ def process_site_network_access(
availability = qos_profile_class['bandwidth']['guaranteed-bw-percent']
exc = update_service_endpoint(
service_uuid, site_id, device_uuid, endpoint_uuid, vlan_tag, ipv4_provider_address, ipv4_prefix_length,
service_uuid, site_id, device_uuid, endpoint_uuid,
vlan_tag, ipv4_customer_address, ipv4_provider_address, ipv4_prefix_length,
capacity_gbps=service_bandwidth_gbps, e2e_latency_ms=max_e2e_latency_ms, availability=availability,
mtu=service_mtu, static_routing=site_static_routing
)
......
......@@ -28,9 +28,11 @@ IGNORE_DEVICE_TYPES = {
DeviceTypeEnum.DATACENTER.value,
DeviceTypeEnum.EMULATED_CLIENT.value,
DeviceTypeEnum.EMULATED_DATACENTER.value,
DeviceTypeEnum.EMULATED_IP_SDN_CONTROLLER,
DeviceTypeEnum.EMULATED_MICROWAVE_RADIO_SYSTEM.value,
DeviceTypeEnum.EMULATED_OPEN_LINE_SYSTEM.value,
DeviceTypeEnum.EMULATED_XR_CONSTELLATION.value,
DeviceTypeEnum.IP_SDN_CONTROLLER,
DeviceTypeEnum.MICROWAVE_RADIO_SYSTEM.value,
DeviceTypeEnum.NETWORK.value,
DeviceTypeEnum.OPEN_LINE_SYSTEM.value,
......@@ -39,10 +41,10 @@ IGNORE_DEVICE_TYPES = {
IGNORE_DEVICE_NAMES = {
NetworkTypeEnum.TE_OTN_TOPOLOGY: {
'128.32.10.1', '128.32.33.5', '128.32.20.5', '128.32.20.1', '128.32.10.5', 'nce-t'
'nce-t', '128.32.10.1', '128.32.33.5', '128.32.20.5', '128.32.20.1', '128.32.10.5',
},
NetworkTypeEnum.TE_ETH_TRAN_TOPOLOGY: {
'nce-t',
},
}
......
This diff is collapsed.
......@@ -39,12 +39,12 @@
{
"lan": "128.32.10.1/24",
"lan-tag": "vlan21",
"next-hop": "128.32.33.5"
"next-hop": "128.32.33.2"
},
{
"lan": "128.32.20.1/24",
"lan-tag": "vlan21",
"next-hop": "128.32.33.5"
"next-hop": "128.32.33.2"
}
]
}
......@@ -82,7 +82,7 @@
{
"lan": "172.1.101.1/24",
"lan-tag": "vlan21",
"next-hop": "10.0.10.1"
"next-hop": "128.32.33.254"
}
]
}
......@@ -147,7 +147,7 @@
{
"lan": "172.1.101.1/24",
"lan-tag": "vlan101",
"next-hop": "172.10.33.5"
"next-hop": "172.10.33.2"
}
]
}
......@@ -185,12 +185,12 @@
{
"lan": "128.32.10.1/24",
"lan-tag": "vlan101",
"next-hop": "10.0.30.1"
"next-hop": "172.10.33.254"
},
{
"lan": "128.32.20.1/24",
"lan-tag": "vlan101",
"next-hop": "10.0.30.1"
"next-hop": "172.10.33.254"
}
]
}
......
......@@ -39,12 +39,12 @@
{
"lan": "128.32.10.1/24",
"lan-tag": "vlan31",
"next-hop": "128.32.33.5"
"next-hop": "128.32.33.2"
},
{
"lan": "128.32.20.1/24",
"lan-tag": "vlan31",
"next-hop": "128.32.33.5"
"next-hop": "128.32.33.2"
}
]
}
......@@ -80,9 +80,9 @@
"cascaded-lan-prefixes": {
"ipv4-lan-prefixes": [
{
"lan": "172.1.101.1/24",
"lan": "172.1.201.1/24",
"lan-tag": "vlan31",
"next-hop": "10.0.10.1"
"next-hop": "128.32.33.254"
}
]
}
......@@ -145,9 +145,9 @@
"cascaded-lan-prefixes": {
"ipv4-lan-prefixes": [
{
"lan": "172.1.101.1/24",
"lan": "172.1.201.1/24",
"lan-tag": "vlan201",
"next-hop": "172.10.33.1"
"next-hop": "172.10.33.2"
}
]
}
......@@ -185,12 +185,12 @@
{
"lan": "128.32.10.1/24",
"lan-tag": "vlan201",
"next-hop": "10.0.30.1"
"next-hop": "172.10.33.254"
},
{
"lan": "128.32.20.1/24",
"lan-tag": "vlan201",
"next-hop": "10.0.30.1"
"next-hop": "172.10.33.254"
}
]
}
......
......@@ -12,209 +12,206 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, os, pytest, time, urllib
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
get_service_port_grpc, get_service_port_http
import logging, urllib
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import ContextId
from common.tools.descriptor.Loader import (
DescriptorLoader, check_descriptor_load_results, validate_empty_scenario
)
from common.tools.object_factory.Context import json_context_id
from common.type_checkers.Assertions import (
validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
validate_topology_ids)
validate_connection, validate_connection_ids, validate_connections,
validate_context, validate_context_ids, validate_contexts,
validate_device, validate_device_ids, validate_devices,
validate_link, validate_link_ids, validate_links,
validate_service, validate_service_ids, validate_services,
validate_slice, validate_slice_ids, validate_slices,
validate_topologies, validate_topology, validate_topology_ids
)
from context.client.ContextClient import ContextClient
from nbi.tests.PrepareTestScenario import do_rest_get_request
from .MockService_Dependencies import MockService_Dependencies
from .Objects import (
CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, SLICE_R1_R3, TOPOLOGY,
TOPOLOGY_ID, POLICY_RULE, POLICY_RULE_ID, POLICY_RULE_UUID
from nbi.service.rest_server.RestServer import RestServer
from .PrepareTestScenario import ( # pylint: disable=unused-import
# be careful, order of symbols is important here!
mock_service, nbi_service_rest, context_client,
do_rest_get_request
)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
DESCRIPTOR_FILE = 'nbi/tests/data/debug_api_dummy.json'
@pytest.fixture(scope='session')
def mock_service():
_service = MockService_Dependencies(MOCKSERVICE_PORT)
_service.configure_env_vars()
_service.start()
yield _service
_service.stop()
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
# ----- Prepare Environment --------------------------------------------------------------------------------------------
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def test_prepare_environment(context_client : ContextClient) -> None: # pylint: disable=redefined-outer-name
validate_empty_scenario(context_client)
descriptor_loader = DescriptorLoader(descriptors_file=DESCRIPTOR_FILE, context_client=context_client)
results = descriptor_loader.process()
check_descriptor_load_results(results, descriptor_loader)
descriptor_loader.validate()
LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + int(get_service_port_grpc(ServiceNameEnum.CONTEXT)) # avoid privileged ports
HTTP_PORT = 10000 + int(get_service_port_http(ServiceNameEnum.CONTEXT)) # avoid privileged ports
MOCKSERVICE_PORT = 10000
DEVICE_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)
@pytest.fixture(scope='session')
def context_service_grpc():
_service = ContextService(context_s_mb[0], context_s_mb[1])
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def context_service_rest():
database = context_db_mb[0]
_rest_server = RestServer()
for endpoint_name, resource_class, resource_url in RESOURCES:
_rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
_rest_server.start()
time.sleep(1) # bring time for the server to start
yield _rest_server
_rest_server.shutdown()
_rest_server.join()
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
_client = ContextClient()
yield _client
_client.close()
def test_populate_database():
client = ContextClient(host=LOCAL_HOST, port=GRPC_PORT)
client.SetContext(Context(**CONTEXT))
client.SetTopology(Topology(**TOPOLOGY))
client.SetDevice(Device(**DEVICE_R1))
client.SetDevice(Device(**DEVICE_R2))
client.SetDevice(Device(**DEVICE_R3))
client.SetLink(Link(**LINK_R1_R2))
client.SetLink(Link(**LINK_R1_R3))
client.SetLink(Link(**LINK_R2_R3))
client.SetService(Service(**SERVICE_R1_R2))
client.SetService(Service(**SERVICE_R1_R3))
client.SetService(Service(**SERVICE_R2_R3))
client.SetSlice(Slice(**SLICE_R1_R3))
client.SetConnection(Connection(**CONNECTION_R1_R3))
def test_rest_get_context_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_get_request('/context_ids')
# Verify the scenario has no services/slices
response = context_client.GetContext(ADMIN_CONTEXT_ID)
assert len(response.topology_ids) == 1
assert len(response.service_ids ) == 3
assert len(response.slice_ids ) == 1
# ----- Context --------------------------------------------------------------------------------------------------------
def test_rest_get_context_ids(nbi_service_rest: RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/debug-api/context_ids')
validate_context_ids(reply)
def test_rest_get_contexts(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_get_request('/contexts')
def test_rest_get_contexts(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/debug-api/contexts')
validate_contexts(reply)
def test_rest_get_context(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_context(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request('/context/{:s}'.format(context_uuid))
reply = do_rest_get_request('/debug-api/context/{:s}'.format(context_uuid))
validate_context(reply)
def test_rest_get_topology_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
# ----- Topology -------------------------------------------------------------------------------------------------------
def test_rest_get_topology_ids(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request('/context/{:s}/topology_ids'.format(context_uuid))
reply = do_rest_get_request('/debug-api/context/{:s}/topology_ids'.format(context_uuid))
validate_topology_ids(reply)
def test_rest_get_topologies(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_topologies(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request('/context/{:s}/topologies'.format(context_uuid))
reply = do_rest_get_request('/debug-api/context/{:s}/topologies'.format(context_uuid))
validate_topologies(reply)
def test_rest_get_topology(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_topology(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
topology_uuid = urllib.parse.quote(DEFAULT_TOPOLOGY_NAME)
reply = do_rest_get_request('/context/{:s}/topology/{:s}'.format(context_uuid, topology_uuid))
validate_topology(reply, num_devices=3, num_links=3)
reply = do_rest_get_request('/debug-api/context/{:s}/topology/{:s}'.format(context_uuid, topology_uuid))
validate_topology(reply, num_devices=3, num_links=6)
# ----- Device ---------------------------------------------------------------------------------------------------------
def test_rest_get_device_ids(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/debug-api/device_ids')
validate_device_ids(reply)
def test_rest_get_devices(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/debug-api/devices')
validate_devices(reply)
def test_rest_get_device(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
device_uuid = urllib.parse.quote('R1', safe='')
reply = do_rest_get_request('/debug-api/device/{:s}'.format(device_uuid))
validate_device(reply)
# ----- Link -----------------------------------------------------------------------------------------------------------
def test_rest_get_link_ids(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/debug-api/link_ids')
validate_link_ids(reply)
def test_rest_get_service_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_links(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
reply = do_rest_get_request('/debug-api/links')
validate_links(reply)
def test_rest_get_link(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
link_uuid = urllib.parse.quote('R1/502==R2/501', safe='')
reply = do_rest_get_request('/debug-api/link/{:s}'.format(link_uuid))
validate_link(reply)
# ----- Service --------------------------------------------------------------------------------------------------------
def test_rest_get_service_ids(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request('/context/{:s}/service_ids'.format(context_uuid))
reply = do_rest_get_request('/debug-api/context/{:s}/service_ids'.format(context_uuid))
validate_service_ids(reply)
def test_rest_get_services(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_services(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request('/context/{:s}/services'.format(context_uuid))
reply = do_rest_get_request('/debug-api/context/{:s}/services'.format(context_uuid))
validate_services(reply)
def test_rest_get_service(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_service(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
service_uuid = urllib.parse.quote(SERVICE_R1_R2_UUID, safe='')
reply = do_rest_get_request('/context/{:s}/service/{:s}'.format(context_uuid, service_uuid))
service_uuid = urllib.parse.quote('SVC:R1/200==R2/200', safe='')
reply = do_rest_get_request('/debug-api/context/{:s}/service/{:s}'.format(context_uuid, service_uuid))
validate_service(reply)
def test_rest_get_slice_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request('/context/{:s}/slice_ids'.format(context_uuid))
#validate_slice_ids(reply)
def test_rest_get_slices(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request('/context/{:s}/slices'.format(context_uuid))
#validate_slices(reply)
# ----- Slice ----------------------------------------------------------------------------------------------------------
def test_rest_get_slice(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_slice_ids(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
slice_uuid = urllib.parse.quote(SLICE_R1_R3_UUID, safe='')
reply = do_rest_get_request('/context/{:s}/slice/{:s}'.format(context_uuid, slice_uuid))
#validate_slice(reply)
def test_rest_get_device_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_get_request('/device_ids')
validate_device_ids(reply)
def test_rest_get_devices(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_get_request('/devices')
validate_devices(reply)
reply = do_rest_get_request('/debug-api/context/{:s}/slice_ids'.format(context_uuid))
validate_slice_ids(reply)
def test_rest_get_device(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
device_uuid = urllib.parse.quote(DEVICE_R1_UUID, safe='')
reply = do_rest_get_request('/device/{:s}'.format(device_uuid))
validate_device(reply)
def test_rest_get_slices(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
reply = do_rest_get_request('/debug-api/context/{:s}/slices'.format(context_uuid))
validate_slices(reply)
def test_rest_get_link_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_get_request('/link_ids')
validate_link_ids(reply)
def test_rest_get_slice(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
slice_uuid = urllib.parse.quote('SLC:R1-R2-R3', safe='')
reply = do_rest_get_request('/debug-api/context/{:s}/slice/{:s}'.format(context_uuid, slice_uuid))
validate_slice(reply)
def test_rest_get_links(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_get_request('/links')
validate_links(reply)
def test_rest_get_link(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
link_uuid = urllib.parse.quote(LINK_R1_R2_UUID, safe='')
reply = do_rest_get_request('/link/{:s}'.format(link_uuid))
validate_link(reply)
# ----- Connection -----------------------------------------------------------------------------------------------------
def test_rest_get_connection_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_connection_ids(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
service_uuid = urllib.parse.quote(SERVICE_R1_R3_UUID, safe='')
reply = do_rest_get_request('/context/{:s}/service/{:s}/connection_ids'.format(context_uuid, service_uuid))
service_uuid = urllib.parse.quote('SVC:R1/200==R2/200', safe='')
reply = do_rest_get_request('/debug-api/context/{:s}/service/{:s}/connection_ids'.format(context_uuid, service_uuid))
validate_connection_ids(reply)
def test_rest_get_connections(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
def test_rest_get_connections(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
context_uuid = urllib.parse.quote(DEFAULT_CONTEXT_NAME)
service_uuid = urllib.parse.quote(SERVICE_R1_R3_UUID, safe='')
reply = do_rest_get_request('/context/{:s}/service/{:s}/connections'.format(context_uuid, service_uuid))
service_uuid = urllib.parse.quote('SVC:R1/200==R2/200', safe='')
reply = do_rest_get_request('/debug-api/context/{:s}/service/{:s}/connections'.format(context_uuid, service_uuid))
validate_connections(reply)
def test_rest_get_connection(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
connection_uuid = urllib.parse.quote(CONNECTION_R1_R3_UUID, safe='')
reply = do_rest_get_request('/connection/{:s}'.format(connection_uuid))
def test_rest_get_connection(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
connection_uuid = urllib.parse.quote('CON:R1/200==R2/200:1', safe='')
reply = do_rest_get_request('/debug-api/connection/{:s}'.format(connection_uuid))
validate_connection(reply)
def test_rest_get_policyrule_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_get_request('/policyrule_ids')
#validate_policyrule_ids(reply)
# ----- Policy ---------------------------------------------------------------------------------------------------------
#def test_rest_get_policyrule_ids(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
# reply = do_rest_get_request('/debug-api/policyrule_ids')
# validate_policyrule_ids(reply)
#def test_rest_get_policyrules(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
# reply = do_rest_get_request('/debug-api/policyrules')
# validate_policyrules(reply)
#def test_rest_get_policyrule(nbi_service_rest : RestServer): # pylint: disable=redefined-outer-name, unused-argument
# policyrule_uuid_quoted = urllib.parse.quote(policyrule_uuid, safe='')
# reply = do_rest_get_request('/debug-api/policyrule/{:s}'.format(policyrule_uuid_quoted))
# validate_policyrule(reply)
# ----- Cleanup Environment --------------------------------------------------------------------------------------------
def test_rest_get_policyrules(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
reply = do_rest_get_request('/policyrules')
#validate_policyrules(reply)
def test_cleanup_environment(context_client : ContextClient) -> None: # pylint: disable=redefined-outer-name
# Verify the scenario has no services/slices
response = context_client.GetContext(ADMIN_CONTEXT_ID)
assert len(response.topology_ids) == 1
assert len(response.service_ids ) == 3
assert len(response.slice_ids ) == 1
def test_rest_get_policyrule(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
policyrule_uuid = urllib.parse.quote(POLICYRULE_UUID, safe='')
reply = do_rest_get_request('/policyrule/{:s}'.format(policyrule_uuid))
#validate_policyrule(reply)
# Load descriptors and validate the base scenario
descriptor_loader = DescriptorLoader(descriptors_file=DESCRIPTOR_FILE, context_client=context_client)
descriptor_loader.validate()
descriptor_loader.unload()
validate_empty_scenario(context_client)
......@@ -21,7 +21,7 @@ from common.tools.descriptor.Loader import (
)
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
from nbi.service.rest_server import RestServer
from nbi.service.rest_server.RestServer import RestServer
from .PrepareTestScenario import ( # pylint: disable=unused-import
# be careful, order of symbols is important here!
do_rest_delete_request, do_rest_get_request, do_rest_post_request,
......
......@@ -15,12 +15,16 @@
import json, logging, requests, uuid
from typing import Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import (
Connection, Device, DeviceList, EndPointId, Link, LinkList, Service, ServiceStatusEnum, ServiceTypeEnum)
ConfigRule, Connection, Device, DeviceList, EndPointId, Link, LinkList, Service, ServiceStatusEnum, ServiceTypeEnum
)
from common.proto.pathcomp_pb2 import PathCompReply, PathCompRequest
from common.tools.grpc.Tools import grpc_message_list_to_json
from pathcomp.frontend.Config import BACKEND_URL
from .tools.EroPathToHops import eropath_to_hops
from .tools.ComposeConfigRules import (
compose_device_config_rules, compose_l2nm_config_rules, compose_l3nm_config_rules, compose_tapi_config_rules)
compose_device_config_rules, compose_l2nm_config_rules, compose_l3nm_config_rules, compose_tapi_config_rules,
generate_neighbor_endpoint_config_rules
)
from .tools.ComposeRequest import compose_device, compose_link, compose_service
from .tools.ComputeSubServices import (
convert_explicit_path_hops_to_connections, convert_explicit_path_hops_to_plain_connection)
......@@ -227,12 +231,25 @@ class _Algorithm:
continue
orig_config_rules = grpc_orig_service.service_config.config_rules
json_orig_config_rules = grpc_message_list_to_json(orig_config_rules)
for service_path_ero in response['path']:
self.logger.debug('service_path_ero["devices"] = {:s}'.format(str(service_path_ero['devices'])))
_endpoint_to_link_dict = {k:v[0] for k,v in self.endpoint_to_link_dict.items()}
self.logger.debug('self.endpoint_to_link_dict = {:s}'.format(str(_endpoint_to_link_dict)))
path_hops = eropath_to_hops(service_path_ero['devices'], self.endpoint_to_link_dict)
json_generated_config_rules = generate_neighbor_endpoint_config_rules(
json_orig_config_rules, path_hops, self.device_name_mapping, self.endpoint_name_mapping
)
json_extended_config_rules = list()
json_extended_config_rules.extend(json_orig_config_rules)
json_extended_config_rules.extend(json_generated_config_rules)
extended_config_rules = [
ConfigRule(**json_extended_config_rule)
for json_extended_config_rule in json_extended_config_rules
]
self.logger.debug('path_hops = {:s}'.format(str(path_hops)))
try:
_device_dict = {k:v[0] for k,v in self.device_dict.items()}
......@@ -256,7 +273,7 @@ class _Algorithm:
if service_key in grpc_services: continue
grpc_service = self.add_service_to_reply(
reply, context_uuid, service_uuid, service_type, path_hops=path_hops,
config_rules=orig_config_rules)
config_rules=extended_config_rules)
grpc_services[service_key] = grpc_service
for connection in connections:
......