Skip to content
Snippets Groups Projects
Commit 03f748da authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Device component - GNMI OpenConfig:

- Updated unitary tests
parent 74a75434
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!172Resolve "(CTTC) Extend gNMI-OpenConfig SBI driver"
Showing
with 864 additions and 738 deletions
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest, re
from typing import Dict, List, Tuple
@pytest.fixture(scope='session')
def storage() -> Dict:
yield dict()
##### POPULATE INTERFACE STORAGE #######################################################################################
def populate_interfaces_storage(
storage : Dict, # pylint: disable=redefined-outer-name
resources : List[Tuple[str, Dict]],
) -> None:
interfaces_storage : Dict = storage.setdefault('interfaces', dict())
subinterfaces_storage : Dict = storage.setdefault('interface_subinterfaces', dict())
ipv4_addresses_storage : Dict = storage.setdefault('interface_subinterface_ipv4_addresses', dict())
for resource_key, resource_value in resources:
match = re.match(r'^\/interface\[([^\]]+)\]$', resource_key)
if match is not None:
if_name = match.group(1)
if_storage = interfaces_storage.setdefault(if_name, dict())
if_storage['name' ] = if_name
if_storage['type' ] = resource_value.get('type' )
if_storage['admin-status' ] = resource_value.get('admin-status' )
if_storage['oper-status' ] = resource_value.get('oper-status' )
if_storage['ifindex' ] = resource_value.get('ifindex' )
if_storage['mtu' ] = resource_value.get('mtu' )
if_storage['management' ] = resource_value.get('management' )
if_storage['hardware-port'] = resource_value.get('hardware-port')
if_storage['transceiver' ] = resource_value.get('transceiver' )
continue
match = re.match(r'^\/interface\[([^\]]+)\]\/ethernet$', resource_key)
if match is not None:
if_name = match.group(1)
if_storage = interfaces_storage.setdefault(if_name, dict())
if_storage['port-speed' ] = resource_value.get('port-speed' )
if_storage['negotiated-port-speed'] = resource_value.get('negotiated-port-speed')
if_storage['mac-address' ] = resource_value.get('mac-address' )
if_storage['hw-mac-address' ] = resource_value.get('hw-mac-address' )
continue
match = re.match(r'^\/interface\[([^\]]+)\]\/subinterface\[([^\]]+)\]$', resource_key)
if match is not None:
if_name = match.group(1)
subif_index = int(match.group(2))
subif_storage = subinterfaces_storage.setdefault((if_name, subif_index), dict())
subif_storage['index'] = subif_index
continue
match = re.match(r'^\/interface\[([^\]]+)\]\/subinterface\[([^\]]+)\]\/ipv4\[([^\]]+)\]$', resource_key)
if match is not None:
if_name = match.group(1)
subif_index = int(match.group(2))
ipv4_addr = match.group(3)
ipv4_address_storage = ipv4_addresses_storage.setdefault((if_name, subif_index, ipv4_addr), dict())
ipv4_address_storage['ip' ] = ipv4_addr
ipv4_address_storage['origin'] = resource_value.get('origin')
ipv4_address_storage['prefix'] = resource_value.get('prefix')
continue
##### POPULATE NETWORK INSTANCE STORAGE ################################################################################
def populate_network_instances_storage(
storage : Dict, # pylint: disable=redefined-outer-name
resources : List[Tuple[str, Dict]],
) -> None:
network_instances_storage : Dict = storage.setdefault('network_instances', dict())
network_instance_protocols_storage : Dict = storage.setdefault('network_instance_protocols', dict())
network_instance_protocol_static_storage : Dict = storage.setdefault('network_instance_protocol_static', dict())
network_instance_tables_storage : Dict = storage.setdefault('network_instance_tables', dict())
network_instance_vlans_storage : Dict = storage.setdefault('network_instance_vlans', dict())
for resource_key, resource_value in resources:
match = re.match(r'^\/network\_instance\[([^\]]+)\]$', resource_key)
if match is not None:
name = match.group(1)
ni_storage = network_instances_storage.setdefault(name, dict())
ni_storage['name'] = name
ni_storage['type'] = resource_value.get('type')
continue
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/protocol\[([^\]]+)\]$', resource_key)
if match is not None:
name = match.group(1)
protocol = match.group(2)
ni_p_storage = network_instance_protocols_storage.setdefault((name, protocol), dict())
ni_p_storage['id' ] = protocol
ni_p_storage['name'] = protocol
continue
pattern = r'^\/network\_instance\[([^\]]+)\]\/protocol\[([^\]]+)\]\/static\_routes\[([^\]]+)\]$'
match = re.match(pattern, resource_key)
if match is not None:
name = match.group(1)
protocol = match.group(2)
prefix = match.group(3)
ni_p_s_storage = network_instance_protocol_static_storage.setdefault((name, protocol, prefix), dict())
ni_p_s_storage['prefix' ] = prefix
ni_p_s_storage['next_hops'] = sorted(resource_value.get('next_hops'))
continue
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/table\[([^\,]+)\,([^\]]+)\]$', resource_key)
if match is not None:
name = match.group(1)
protocol = match.group(2)
address_family = match.group(3)
ni_t_storage = network_instance_tables_storage.setdefault((name, protocol, address_family), dict())
ni_t_storage['protocol' ] = protocol
ni_t_storage['address_family'] = address_family
continue
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/vlan\[([^\]]+)\]$', resource_key)
if match is not None:
name = match.group(1)
vlan_id = int(match.group(2))
ni_v_storage = network_instance_vlans_storage.setdefault((name, vlan_id), dict())
ni_v_storage['vlan_id'] = vlan_id
ni_v_storage['name' ] = resource_value.get('name')
ni_v_storage['members'] = sorted(resource_value.get('members'))
continue
##### GET EXPECTED INTERFACE CONFIG ####################################################################################
INTERFACE_CONFIG_STRUCTURE : List[Tuple[str, List[str]]] = [
('/interface[{if_name:s}]', [
'name', 'type', 'admin-status', 'oper-status', 'management', 'mtu', 'ifindex', 'hardware-port', 'transceiver'
]),
('/interface[{if_name:s}]/ethernet', [
'port-speed', 'negotiated-port-speed', 'mac-address', 'hw-mac-address'
]),
]
INTERFACE_SUBINTERFACE_CONFIG_STRUCTURE : List[Tuple[str, List[str]]] = [
('/interface[{if_name:s}]/subinterface[{subif_index:d}]', ['index']),
]
INTERFACE_SUBINTERFACE_IPV4_ADDRESS_CONFIG_STRUCTURE : List[Tuple[str, List[str]]] = [
('/interface[{if_name:s}]/subinterface[{subif_index:d}]/ipv4[{ipv4_addr:s}]', ['ip', 'origin', 'prefix']),
]
def get_expected_interface_config(
storage : Dict, # pylint: disable=redefined-outer-name
) -> List[Tuple[str, Dict]]:
interfaces_storage : Dict = storage.setdefault('interfaces', dict())
subinterfaces_storage : Dict = storage.setdefault('interface_subinterfaces', dict())
ipv4_addresses_storage : Dict = storage.setdefault('interface_subinterface_ipv4_addresses', dict())
expected_interface_config = list()
for if_name, if_storage in interfaces_storage.items():
for resource_key_template, resource_key_field_names in INTERFACE_CONFIG_STRUCTURE:
resource_key = resource_key_template.format(if_name=if_name)
resource_value = {
field_name : if_storage[field_name]
for field_name in resource_key_field_names
if field_name in if_storage and if_storage[field_name] is not None
}
expected_interface_config.append((resource_key, resource_value))
for (if_name, subif_index), subif_storage in subinterfaces_storage.items():
for resource_key_template, resource_key_field_names in INTERFACE_SUBINTERFACE_CONFIG_STRUCTURE:
resource_key = resource_key_template.format(if_name=if_name, subif_index=subif_index)
resource_value = {
field_name : subif_storage[field_name]
for field_name in resource_key_field_names
if field_name in subif_storage and subif_storage[field_name] is not None
}
expected_interface_config.append((resource_key, resource_value))
for (if_name, subif_index, ipv4_addr), ipv4_storage in ipv4_addresses_storage.items():
for resource_key_template, resource_key_field_names in INTERFACE_SUBINTERFACE_IPV4_ADDRESS_CONFIG_STRUCTURE:
resource_key = resource_key_template.format(if_name=if_name, subif_index=subif_index, ipv4_addr=ipv4_addr)
resource_value = {
field_name : ipv4_storage[field_name]
for field_name in resource_key_field_names
if field_name in ipv4_storage and ipv4_storage[field_name] is not None
}
expected_interface_config.append((resource_key, resource_value))
return expected_interface_config
##### GET EXPECTED NETWORK INSTANCE CONFIG #############################################################################
NETWORK_INSTANCE_CONFIG_STRUCTURE : List[Tuple[str, List[str]]] = [
('/network_instance[{ni_name:s}]', ['name', 'type']),
]
NETWORK_INSTANCE_PROTOCOL_CONFIG_STRUCTURE : List[Tuple[str, List[str]]] = [
('/network_instance[{ni_name:s}]/protocol[{protocol:s}]', ['id', 'name']),
]
NETWORK_INSTANCE_PROTOCOL_STATIC_CONFIG_STRUCTURE : List[Tuple[str, List[str]]] = [
('/network_instance[{ni_name:s}]/protocol[{protocol:s}]/static_routes[{prefix:s}]', ['prefix', 'next_hops']),
]
NETWORK_INSTANCE_TABLE_CONFIG_STRUCTURE : List[Tuple[str, List[str]]] = [
('/network_instance[{ni_name:s}]/table[{protocol:s},{address_family:s}]', ['protocol', 'address_family']),
]
NETWORK_INSTANCE_VLAN_CONFIG_STRUCTURE : List[Tuple[str, List[str]]] = [
('/network_instance[{ni_name:s}]/vlan[{vlan_id:d}]', ['vlan_id', 'name', 'members']),
]
def get_expected_network_instance_config(
storage : Dict, # pylint: disable=redefined-outer-name
) -> List[Tuple[str, Dict]]:
network_instances_storage : Dict = storage.setdefault('network_instances', dict())
network_instance_protocols_storage : Dict = storage.setdefault('network_instance_protocols', dict())
network_instance_protocol_static_storage : Dict = storage.setdefault('network_instance_protocol_static', dict())
network_instance_tables_storage : Dict = storage.setdefault('network_instance_tables', dict())
network_instance_vlans_storage : Dict = storage.setdefault('network_instance_vlans', dict())
expected_network_instance_config = list()
for ni_name, ni_storage in network_instances_storage.items():
for resource_key_template, resource_key_field_names in NETWORK_INSTANCE_CONFIG_STRUCTURE:
resource_key = resource_key_template.format(ni_name=ni_name)
resource_value = {
field_name : ni_storage[field_name]
for field_name in resource_key_field_names
if field_name in ni_storage and ni_storage[field_name] is not None
}
expected_network_instance_config.append((resource_key, resource_value))
for (ni_name, protocol), ni_p_storage in network_instance_protocols_storage.items():
for resource_key_template, resource_key_field_names in NETWORK_INSTANCE_PROTOCOL_CONFIG_STRUCTURE:
resource_key = resource_key_template.format(ni_name=ni_name, protocol=protocol)
resource_value = {
field_name : ni_p_storage[field_name]
for field_name in resource_key_field_names
if field_name in ni_p_storage and ni_p_storage[field_name] is not None
}
expected_network_instance_config.append((resource_key, resource_value))
for (ni_name, protocol, prefix), ni_p_s_storage in network_instance_protocol_static_storage.items():
for resource_key_template, resource_key_field_names in NETWORK_INSTANCE_PROTOCOL_STATIC_CONFIG_STRUCTURE:
resource_key = resource_key_template.format(ni_name=ni_name, protocol=protocol, prefix=prefix)
resource_value = {
field_name : ni_p_s_storage[field_name]
for field_name in resource_key_field_names
if field_name in ni_p_s_storage and ni_p_s_storage[field_name] is not None
}
expected_network_instance_config.append((resource_key, resource_value))
for (ni_name, protocol, address_family), ni_t_storage in network_instance_tables_storage.items():
for resource_key_template, resource_key_field_names in NETWORK_INSTANCE_TABLE_CONFIG_STRUCTURE:
resource_key = resource_key_template.format(
ni_name=ni_name, protocol=protocol, address_family=address_family
)
resource_value = {
field_name : ni_t_storage[field_name]
for field_name in resource_key_field_names
if field_name in ni_t_storage and ni_t_storage[field_name] is not None
}
expected_network_instance_config.append((resource_key, resource_value))
for (ni_name, vlan_id), ni_v_storage in network_instance_vlans_storage.items():
for resource_key_template, resource_key_field_names in NETWORK_INSTANCE_VLAN_CONFIG_STRUCTURE:
resource_key = resource_key_template.format(ni_name=ni_name, vlan_id=vlan_id)
resource_value = {
field_name : ni_v_storage[field_name]
for field_name in resource_key_field_names
if field_name in ni_v_storage and ni_v_storage[field_name] is not None
}
expected_network_instance_config.append((resource_key, resource_value))
return expected_network_instance_config
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .StorageEndpoints import StorageEndpoints
from .StorageInterface import StorageInterface
from .StorageNetworkInstance import StorageNetworkInstance
class Storage:
def __init__(self) -> None:
self.endpoints = StorageEndpoints()
self.interfaces = StorageInterface()
self.network_instances = StorageNetworkInstance()
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from typing import Dict, List, Tuple
from .Tools import compose_resources
RE_RESKEY_ENDPOINT = re.compile(r'^\/endpoints\/endpoint\[([^\]]+)\]$')
ENDPOINT_PACKET_SAMPLE_TYPES : Dict[int, str] = {
101: '/openconfig-interfaces:interfaces/interface[name={:s}]/state/counters/out-pkts',
102: '/openconfig-interfaces:interfaces/interface[name={:s}]/state/counters/in-pkts',
201: '/openconfig-interfaces:interfaces/interface[name={:s}]/state/counters/out-octets',
202: '/openconfig-interfaces:interfaces/interface[name={:s}]/state/counters/in-octets',
}
class Endpoints:
STRUCT : List[Tuple[str, List[str]]] = [
('/endpoints/endpoint[{:s}]', ['uuid', 'type', 'sample_types']),
]
def __init__(self) -> None:
self._items : Dict[str, Dict] = dict()
def add(self, ep_uuid : str, resource_value : Dict) -> None:
item = self._items.setdefault(ep_uuid, dict())
item['uuid'] = ep_uuid
for _, field_names in Endpoints.STRUCT:
field_names = set(field_names)
item.update({k:v for k,v in resource_value if k in field_names})
item['sample_types'] = {
sample_type_id : sample_type_path.format(ep_uuid)
for sample_type_id, sample_type_path in ENDPOINT_PACKET_SAMPLE_TYPES.items()
}
def remove(self, ep_uuid : str) -> None:
self._items.pop(ep_uuid, None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, Endpoints.STRUCT)
class StorageEndpoints:
def __init__(self) -> None:
self.endpoints = Endpoints()
def populate(self, resources : List[Tuple[str, Dict]]) -> None:
for resource_key, resource_value in resources:
match = RE_RESKEY_ENDPOINT.match(resource_key)
if match is not None:
self.endpoints.add(match.group(1), resource_value)
continue
MSG = 'Unhandled Resource Key: {:s} => {:s}'
raise Exception(MSG.format(str(resource_key), str(resource_value)))
def get_expected_config(self) -> List[Tuple[str, Dict]]:
expected_config = list()
expected_config.extend(self.endpoints.compose_resources())
return expected_config
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from typing import Dict, List, Tuple
from .Tools import compose_resources
PREFIX = r'^\/interface\[([^\]]+)\]'
RE_RESKEY_INTERFACE = re.compile(PREFIX + r'$')
RE_RESKEY_ETHERNET = re.compile(PREFIX + r'\/ethernet$')
RE_RESKEY_SUBINTERFACE = re.compile(PREFIX + r'\/subinterface\[([^\]]+)\]$')
RE_RESKEY_IPV4_ADDRESS = re.compile(PREFIX + r'\/subinterface\[([^\]]+)\]\/ipv4\[([^\]]+)\]$')
class Interfaces:
STRUCT : List[Tuple[str, List[str]]] = [
('/interface[{:s}]', ['name', 'type', 'admin-status', 'oper-status', 'management', 'mtu', 'ifindex',
'hardware-port', 'transceiver']),
('/interface[{:s}]/ethernet', ['port-speed', 'negotiated-port-speed', 'mac-address', 'hw-mac-address']),
]
def __init__(self) -> None:
self._items : Dict[str, Dict] = dict()
def add(self, if_name : str, resource_value : Dict) -> None:
item = self._items.setdefault(if_name, dict())
item['name'] = if_name
for _, field_names in Interfaces.STRUCT:
field_names = set(field_names)
item.update({k:v for k,v in resource_value if k in field_names})
def remove(self, if_name : str) -> None:
self._items.pop(if_name, None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, Interfaces.STRUCT)
class SubInterfaces:
STRUCT : List[Tuple[str, List[str]]] = [
('/interface[{:s}]/subinterface[{:d}]', ['index']),
]
def __init__(self) -> None:
self._items : Dict[Tuple[str, int], Dict] = dict()
def add(self, if_name : str, subif_index : int) -> None:
item = self._items.setdefault((if_name, subif_index), dict())
item['index'] = subif_index
def remove(self, if_name : str, subif_index : int) -> None:
self._items.pop((if_name, subif_index), None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, SubInterfaces.STRUCT)
class IPv4Addresses:
STRUCT : List[Tuple[str, List[str]]] = [
('/interface[{:s}]/subinterface[{:d}]/ipv4[{:s}]', ['ip', 'origin', 'prefix']),
]
def __init__(self) -> None:
self._items : Dict[Tuple[str, int, str], Dict] = dict()
def add(self, if_name : str, subif_index : int, ipv4_address : str, resource_value : Dict) -> None:
item = self._items.setdefault((if_name, subif_index, ipv4_address), dict())
item['ip' ] = ipv4_address
item['origin'] = resource_value.get('origin')
item['prefix'] = resource_value.get('prefix')
def remove(self, if_name : str, subif_index : int, ipv4_address : str) -> None:
self._items.pop((if_name, subif_index, ipv4_address), None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, IPv4Addresses.STRUCT)
class StorageInterface:
def __init__(self) -> None:
self.interfaces = Interfaces()
self.subinterfaces = SubInterfaces()
self.ipv4_addresses = IPv4Addresses()
def populate(self, resources : List[Tuple[str, Dict]]) -> None:
for resource_key, resource_value in resources:
match = RE_RESKEY_INTERFACE.match(resource_key)
if match is not None:
self.interfaces.add(match.group(1), resource_value)
continue
match = RE_RESKEY_ETHERNET.match(resource_key)
if match is not None:
self.interfaces.add(match.group(1), resource_value)
continue
match = RE_RESKEY_SUBINTERFACE.match(resource_key)
if match is not None:
self.subinterfaces.add(match.group(1), int(match.group(2)))
continue
match = RE_RESKEY_IPV4_ADDRESS.match(resource_key)
if match is not None:
self.ipv4_addresses.add(match.group(1), int(match.group(2)), match.group(3), resource_value)
continue
MSG = 'Unhandled Resource Key: {:s} => {:s}'
raise Exception(MSG.format(str(resource_key), str(resource_value)))
def get_expected_config(self) -> List[Tuple[str, Dict]]:
expected_config = list()
expected_config.extend(self.interfaces.compose_resources())
expected_config.extend(self.subinterfaces.compose_resources())
expected_config.extend(self.ipv4_addresses.compose_resources())
return expected_config
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from typing import Dict, List, Tuple
from .Tools import compose_resources
PREFIX = r'^\/network\_instance\[([^\]]+)\]'
RE_RESKEY_NET_INST = re.compile(PREFIX + r'$')
RE_RESKEY_INTERFACE = re.compile(PREFIX + r'\/interface\[([^\]]+)\]$')
RE_RESKEY_PROTOCOL = re.compile(PREFIX + r'\/protocol\[([^\]]+)\]$')
RE_RESKEY_PROTO_STATIC = re.compile(PREFIX + r'\/protocol\[([^\]]+)\]\/static\_routes\[([^\]]+)\]$')
RE_RESKEY_TABLE = re.compile(PREFIX + r'\/table\[([^\,]+)\,([^\]]+)\]$')
RE_RESKEY_VLAN = re.compile(PREFIX + r'\/vlan\[([^\]]+)\]$')
class NetworkInstances:
STRUCT : List[Tuple[str, List[str]]] = [
('/network_instance[{:s}]', ['name', 'type']),
]
def __init__(self) -> None:
self._items : Dict[str, Dict] = dict()
def add(self, ni_name : str, resource_value : Dict) -> None:
item = self._items.setdefault(ni_name, dict())
item['name'] = ni_name
item['type'] = resource_value.get('type')
def remove(self, ni_name : str) -> None:
self._items.pop(ni_name, None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, NetworkInstances.STRUCT)
class Interfaces:
STRUCT : List[Tuple[str, List[str]]] = [
('/network_instance[{:s}]/interface[{:s}]', ['ni_name', 'if_name']),
]
def __init__(self) -> None:
self._items : Dict[Tuple[str, str], Dict] = dict()
def add(self, ni_name : str, if_name : str) -> None:
item = self._items.setdefault((ni_name, if_name), dict())
item['ni_name'] = ni_name
item['if_name'] = if_name
def remove(self, ni_name : str, if_name : str) -> None:
self._items.pop((ni_name, if_name), None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, Interfaces.STRUCT)
class Protocols:
STRUCT : List[Tuple[str, List[str]]] = [
('/network_instance[{:s}]/protocol[{:s}]', ['id', 'name']),
]
def __init__(self) -> None:
self._items : Dict[Tuple[str, str], Dict] = dict()
def add(self, ni_name : str, protocol : str) -> None:
item = self._items.setdefault((ni_name, protocol), dict())
item['id' ] = protocol
item['name'] = protocol
def remove(self, ni_name : str, protocol : str) -> None:
self._items.pop((ni_name, protocol), None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, Protocols.STRUCT)
class StaticRoutes:
STRUCT : List[Tuple[str, List[str]]] = [
('/network_instance[{:s}]/protocol[{:s}]/static_routes[{:s}]', ['prefix', 'next_hops']),
]
def __init__(self) -> None:
self._items : Dict[Tuple[str, str, str], Dict] = dict()
def add(self, ni_name : str, protocol : str, prefix : str, resource_value : Dict) -> None:
item = self._items.setdefault((ni_name, protocol, prefix), dict())
item['prefix' ] = prefix
item['next_hops'] = sorted(resource_value.get('next_hops'))
def remove(self, ni_name : str, protocol : str, prefix : str) -> None:
self._items.pop((ni_name, protocol, prefix), None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, StaticRoutes.STRUCT)
class Tables:
STRUCT : List[Tuple[str, List[str]]] = [
('/network_instance[{:s}]/table[{:s},{:s}]', ['protocol', 'address_family']),
]
def __init__(self) -> None:
self._items : Dict[Tuple[str, str, str], Dict] = dict()
def add(self, ni_name : str, protocol : str, address_family : str) -> None:
item = self._items.setdefault((ni_name, protocol, address_family), dict())
item['protocol' ] = protocol
item['address_family'] = address_family
def remove(self, ni_name : str, protocol : str, address_family : str) -> None:
self._items.pop((ni_name, protocol, address_family), None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, Tables.STRUCT)
class Vlans:
STRUCT : List[Tuple[str, List[str]]] = [
('/network_instance[{:s}]/vlan[{:d}]', ['vlan_id', 'name', 'members']),
]
def __init__(self) -> None:
self._items : Dict[Tuple[str, int], Dict] = dict()
def add(self, ni_name : str, vlan_id : int, resource_value : Dict) -> None:
item = self._items.setdefault((ni_name, vlan_id), dict())
item['vlan_id'] = vlan_id
item['name' ] = resource_value.get('name')
item['members'] = sorted(resource_value.get('members'))
def remove(self, ni_name : str, vlan_id : int) -> None:
self._items.pop((ni_name, vlan_id), None)
def compose_resources(self) -> List[Dict]:
return compose_resources(self._items, Vlans.STRUCT)
class StorageNetworkInstance:
def __init__(self) -> None:
self.network_instances = NetworkInstances()
self.interfaces = Interfaces()
self.protocols = Protocols()
self.protocol_static = StaticRoutes()
self.tables = Tables()
self.vlans = Vlans()
def populate(self, resources : List[Tuple[str, Dict]]) -> None:
for resource_key, resource_value in resources:
match = RE_RESKEY_NET_INST.match(resource_key)
if match is not None:
self.network_instances.add(match.group(1), resource_value)
continue
match = RE_RESKEY_INTERFACE.match(resource_key)
if match is not None:
self.interfaces.add(match.group(1), match.group(2))
continue
match = RE_RESKEY_PROTOCOL.match(resource_key)
if match is not None:
self.protocols.add(match.group(1), match.group(2))
continue
match = RE_RESKEY_PROTO_STATIC.match(resource_key)
if match is not None:
self.protocol_static.add(match.group(1), match.group(2), match.group(3), resource_value)
continue
match = RE_RESKEY_TABLE.match(resource_key)
if match is not None:
self.tables.add(match.group(1), match.group(2), match.group(3))
continue
match = RE_RESKEY_VLAN.match(resource_key)
if match is not None:
self.vlans.add(match.group(1), int(match.group(2)), resource_value)
continue
MSG = 'Unhandled Resource Key: {:s} => {:s}'
raise Exception(MSG.format(str(resource_key), str(resource_value)))
def get_expected_config(self) -> List[Tuple[str, Dict]]:
expected_config = list()
expected_config.extend(self.network_instances.compose_resources())
expected_config.extend(self.interfaces.compose_resources())
expected_config.extend(self.protocols.compose_resources())
expected_config.extend(self.protocol_static.compose_resources())
expected_config.extend(self.tables.compose_resources())
expected_config.extend(self.vlans.compose_resources())
return expected_config
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Tuple
def compose_resources(
storage : Dict[Tuple, Dict], config_struct : List[Tuple[str, List[str]]]
) -> List[Dict]:
expected_config = list()
for resource_key_fields, resource_value_data in storage.items():
for resource_key_template, resource_key_field_names in config_struct:
resource_key = resource_key_template.format(*resource_key_fields)
resource_value = {
field_name : resource_value_data[field_name]
for field_name in resource_key_field_names
if field_name in resource_value_data and resource_value_data[field_name] is not None
}
expected_config.append((resource_key, resource_value))
return expected_config
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -12,22 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import deepdiff, logging, os, pytest, re, time
from typing import Dict
import os
os.environ['DEVICE_EMULATED_ONLY'] = 'YES'
# pylint: disable=wrong-import-position
import itertools, logging, pytest, time
from typing import Dict
from device.service.driver_api._Driver import (
RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ROUTING_POLICIES, RESOURCE_SERVICES
)
from device.service.drivers.gnmi_openconfig.GnmiOpenConfigDriver import GnmiOpenConfigDriver
from .request_composers import interface, network_instance, network_instance_interface, network_instance_static_route
from .storage import ( # pylint: disable=unused-import
storage, # be careful, order of symbols is important here!; storage should be the first one
get_expected_interface_config, get_expected_network_instance_config, populate_interfaces_storage,
populate_network_instances_storage
from .tools.check_config import check_config_endpoints, check_config_interfaces, check_config_network_instances
from .tools.check_updates import check_updates
from .tools.expected_config_composers import (
compose_expected_config__interface, compose_expected_config__network_instance
)
from .tools.request_composers import (
interface, network_instance, network_instance_interface, network_instance_static_route
)
from .storage.Storage import Storage
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)
......@@ -56,19 +59,40 @@ def driver() -> GnmiOpenConfigDriver:
_driver.Disconnect()
##### STORAGE FIXTURE ##################################################################################################
@pytest.fixture(scope='session')
def storage() -> Dict:
yield Storage()
##### NETWORK INSTANCE DETAILS #########################################################################################
NI_NAME = 'test-l3-svc'
NI_TYPE = 'L3VRF'
NI_INTERFACES = [
# interface_name, subinterface_index, ipv4 address, ipv4 prefix, enabled
('Ethernet1', 0, '192.168.1.1', 24, True),
('Ethernet10', 0, '192.168.10.1', 24, True),
]
NI_STATIC_ROUTES = [
# prefix, gateway, metric
('172.0.0.0/24', '172.16.0.2', 1),
('172.2.0.0/24', '172.16.0.3', 1),
NETWORK_INSTANCES = [
{
'name': 'test-l3-svc',
'type': 'L3VRF',
'interfaces': [
{'name': 'Ethernet1', 'subif_index': 0, 'ipv4_addr': '192.168.1.1', 'ipv4_prefix': 24, 'enabled': True},
{'name': 'Ethernet10', 'subif_index': 0, 'ipv4_addr': '192.168.10.1', 'ipv4_prefix': 24, 'enabled': True},
],
'static_routes': [
{'prefix': '172.0.0.0/24', 'gateway': '172.16.0.2', 'metric': 1},
{'prefix': '172.2.0.0/24', 'gateway': '172.16.0.3', 'metric': 1},
]
},
{
'name': 'test-l2-svc',
'type': 'L2VSI',
'interfaces': [
{'name': 'Ethernet2', 'subif_index': 0, 'ipv4_addr': '192.168.1.1', 'ipv4_prefix': 24, 'enabled': True},
{'name': 'Ethernet4', 'subif_index': 0, 'ipv4_addr': '192.168.10.1', 'ipv4_prefix': 24, 'enabled': True},
],
'static_routes': [
{'prefix': '172.0.0.0/24', 'gateway': '172.16.0.2', 'metric': 1},
{'prefix': '172.2.0.0/24', 'gateway': '172.16.0.3', 'metric': 1},
]
}
]
......@@ -76,523 +100,249 @@ NI_STATIC_ROUTES = [
def test_get_endpoints(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_ENDPOINTS]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
expected_getconfig = [
('/endpoints/endpoint[ethernet1]', {'uuid': 'ethernet1', 'type': '-', 'sample_types': {
202: '/openconfig-interfaces:interfaces/interface[name=ethernet1]/state/counters/in-octets',
201: '/openconfig-interfaces:interfaces/interface[name=ethernet1]/state/counters/out-octets',
102: '/openconfig-interfaces:interfaces/interface[name=ethernet1]/state/counters/in-pkts',
101: '/openconfig-interfaces:interfaces/interface[name=ethernet1]/state/counters/out-pkts'
}}),
('/endpoints/endpoint[ethernet10]', {'uuid': 'ethernet10', 'type': '-', 'sample_types': {
202: '/openconfig-interfaces:interfaces/interface[name=ethernet10]/state/counters/in-octets',
201: '/openconfig-interfaces:interfaces/interface[name=ethernet10]/state/counters/out-octets',
102: '/openconfig-interfaces:interfaces/interface[name=ethernet10]/state/counters/in-pkts',
101: '/openconfig-interfaces:interfaces/interface[name=ethernet10]/state/counters/out-pkts'
}})
]
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
results_getconfig = check_config_endpoints(driver, storage)
storage.endpoints.populate(results_getconfig)
check_config_endpoints(driver, storage)
def test_get_interfaces(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
populate_interfaces_storage(storage, results_getconfig)
expected_getconfig = get_expected_interface_config(storage)
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
results_getconfig = check_config_interfaces(driver, storage)
storage.interfaces.populate(results_getconfig)
check_config_interfaces(driver, storage)
def test_get_network_instances(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
populate_network_instances_storage(storage, results_getconfig)
expected_getconfig = get_expected_network_instance_config(storage)
for resource_key, resource_value in results_getconfig:
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/vlan\[([^\]]+)\]$', resource_key)
if match is None: continue
members = resource_value.get('members')
if len(members) > 0: resource_value['members'] = sorted(members)
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
results_getconfig = check_config_network_instances(driver, storage)
storage.network_instances.populate(results_getconfig)
check_config_network_instances(driver, storage)
def test_set_network_instances(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_set = [
network_instance(NI_NAME, NI_TYPE),
]
check_config_interfaces(driver, storage)
check_config_network_instances(driver, storage)
resources_to_set = list()
ni_names = list()
for ni in NETWORK_INSTANCES:
ni_name = ni['name']
ni_type = ni['type']
resources_to_set.append(network_instance(ni_name, ni_type))
ni_names.append(ni_name)
storage.network_instances.network_instances.add(ni_name, {'type': ni_type})
LOGGER.info('resources_to_set = {:s}'.format(str(resources_to_set)))
results_setconfig = driver.SetConfig(resources_to_set)
LOGGER.info('results_setconfig = {:s}'.format(str(results_setconfig)))
check_updates(results_setconfig, '/network_instance[{:s}]', ni_names)
network_instances = sorted([NI_NAME])
results = set(results_setconfig)
assert len(results) == len(network_instances)
for ni_name in network_instances:
assert ('/network_instance[{:s}]'.format(ni_name), True) in results
expected_getconfig = get_expected_network_instance_config(storage)
expected_getconfig.extend([
('/network_instance[{:s}]'.format(NI_NAME), {
'name': NI_NAME, 'type': NI_TYPE
}),
('/network_instance[{:s}]/protocol[DIRECTLY_CONNECTED]'.format(NI_NAME), {
'id': 'DIRECTLY_CONNECTED', 'name': 'DIRECTLY_CONNECTED'
}),
('/network_instance[{:s}]/table[DIRECTLY_CONNECTED,IPV4]'.format(NI_NAME), {
'protocol': 'DIRECTLY_CONNECTED', 'address_family': 'IPV4'
}),
('/network_instance[{:s}]/table[DIRECTLY_CONNECTED,IPV6]'.format(NI_NAME), {
'protocol': 'DIRECTLY_CONNECTED', 'address_family': 'IPV6'
})
])
#for resource_key, resource_value in expected_getconfig:
# if resource_key == '/network_instance[default]/vlan[1]':
# resource_value['members'] = list()
LOGGER.info('expected_getconfig = {:s}'.format(str(sorted(expected_getconfig))))
permitted_retries = 5
while permitted_retries > 0:
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
for resource_key, resource_value in results_getconfig:
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/vlan\[([^\]]+)\]$', resource_key)
if match is None: continue
members = resource_value.get('members')
if len(members) > 0: resource_value['members'] = sorted(members)
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs == 0: break
# let the device take some time to reconfigure
time.sleep(0.5)
permitted_retries -= 1
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
check_config_interfaces(driver, storage, max_retries=5)
check_config_network_instances(driver, storage, max_retries=5)
def test_set_interfaces(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_set = [
interface(if_name, sif_index, ipv4_addr, ipv4_prefix, enabled)
for if_name, sif_index, ipv4_addr, ipv4_prefix, enabled in NI_INTERFACES
]
check_config_interfaces(driver, storage)
check_config_network_instances(driver, storage)
resources_to_set = list()
if_names = list()
for ni in NETWORK_INSTANCES:
ni_name = ni['name']
for ni_if in ni.get('interfaces', list()):
if_name = ni_if['if_name']
subif_index = ni_if['sif_index']
ipv4_address = ni_if['ipv4_addr']
ipv4_prefix = ni_if['ipv4_prefix']
enabled = ni_if['enabled']
resources_to_set.append(interface(
if_name, subif_index, ipv4_address, ipv4_prefix, enabled
))
if_names.append(ni_name)
storage.interfaces.ipv4_addresses.add(if_name, subif_index, ipv4_address, {
'origin' : 'STATIC', 'prefix': ipv4_prefix
})
LOGGER.info('resources_to_set = {:s}'.format(str(resources_to_set)))
results_setconfig = driver.SetConfig(resources_to_set)
LOGGER.info('results_setconfig = {:s}'.format(str(results_setconfig)))
check_updates(results_setconfig, '/interface[{:s}]', if_names)
interfaces = sorted([
if_name
for if_name, _, _, _, _ in NI_INTERFACES
])
results = set(results_setconfig)
assert len(results) == len(interfaces)
for if_name in interfaces:
assert ('/interface[{:s}]'.format(if_name), True) in results
expected_getconfig = get_expected_interface_config(storage)
expected_getconfig.extend([
('/interface[{:s}]/subinterface[{:d}]/ipv4[{:s}]'.format(if_name, sif_index, ipv4_addr), {
'ip': ipv4_addr, 'origin': 'STATIC', 'prefix': ipv4_prefix
})
for if_name, sif_index, ipv4_addr, ipv4_prefix, _ in NI_INTERFACES
])
permitted_retries = 5
while permitted_retries > 0:
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs == 0: break
# let the device take some time to reconfigure
time.sleep(0.5)
permitted_retries -= 1
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
check_config_interfaces(driver, storage, max_retries=5)
check_config_network_instances(driver, storage, max_retries=5)
def test_add_interfaces_to_network_instance(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_set = [
network_instance_interface(NI_NAME, if_name, sif_index)
for if_name, sif_index, _, _, _ in NI_INTERFACES
]
check_config_interfaces(driver, storage)
check_config_network_instances(driver, storage)
resources_to_set = list()
ni_if_names = list()
for ni in NETWORK_INSTANCES:
ni_name = ni['name']
for ni_if in ni.get('interfaces', list()):
if_name = ni_if['if_name']
subif_index = ni_if['sif_index']
resources_to_set.append(network_instance_interface(ni_name, if_name, subif_index))
ni_if_names.append((ni_name, '{:s}.{:d}'.format(if_name, subif_index)))
storage.network_instances.interfaces.add(ni_name, if_name, subif_index)
LOGGER.info('resources_to_set = {:s}'.format(str(resources_to_set)))
results_setconfig = driver.SetConfig(resources_to_set)
LOGGER.info('results_setconfig = {:s}'.format(str(results_setconfig)))
check_updates(results_setconfig, '/network_instance[{:s}]/interface[{:s}]', ni_if_names)
interfaces = sorted([
'{:s}.{:d}'.format(if_name, sif_index)
for if_name, sif_index, _, _, _ in NI_INTERFACES
])
results = set(results_setconfig)
assert len(results) == len(interfaces)
for if_name in interfaces:
assert ('/network_instance[{:s}]/interface[{:s}]'.format(NI_NAME, if_name), True) in results
expected_getconfig = get_expected_interface_config(storage)
expected_getconfig.extend([
('/interface[{:s}]/subinterface[{:d}]/ipv4[{:s}]'.format(if_name, sif_index, ipv4_addr), {
'ip': ipv4_addr, 'origin': 'STATIC', 'prefix': ipv4_prefix
})
for if_name, sif_index, ipv4_addr, ipv4_prefix, _ in NI_INTERFACES
])
LOGGER.info('expected_getconfig = {:s}'.format(str(sorted(expected_getconfig))))
permitted_retries = 5
while permitted_retries > 0:
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs == 0: break
# let the device take some time to reconfigure
time.sleep(0.5)
permitted_retries -= 1
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
expected_getconfig = get_expected_network_instance_config(storage)
expected_getconfig.extend([
('/network_instance[{:s}]'.format(NI_NAME), {
'name': NI_NAME, 'type': NI_TYPE
}),
('/network_instance[{:s}]/protocol[DIRECTLY_CONNECTED]'.format(NI_NAME), {
'id': 'DIRECTLY_CONNECTED', 'name': 'DIRECTLY_CONNECTED'
}),
('/network_instance[{:s}]/table[DIRECTLY_CONNECTED,IPV4]'.format(NI_NAME), {
'protocol': 'DIRECTLY_CONNECTED', 'address_family': 'IPV4'
}),
('/network_instance[{:s}]/table[DIRECTLY_CONNECTED,IPV6]'.format(NI_NAME), {
'protocol': 'DIRECTLY_CONNECTED', 'address_family': 'IPV6'
})
])
LOGGER.info('expected_getconfig = {:s}'.format(str(sorted(expected_getconfig))))
permitted_retries = 5
while permitted_retries > 0:
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
for resource_key, resource_value in results_getconfig:
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/vlan\[([^\]]+)\]$', resource_key)
if match is None: continue
members = resource_value.get('members')
if len(members) > 0: resource_value['members'] = sorted(members)
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs == 0: break
# let the device take some time to reconfigure
time.sleep(0.5)
permitted_retries -= 1
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
check_config_interfaces(driver, storage, max_retries=5)
check_config_network_instances(driver, storage, max_retries=5)
def test_set_network_instance_static_routes(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_set = [
network_instance_static_route(NI_NAME, prefix, gateway, metric=metric)
for prefix, gateway, metric in NI_STATIC_ROUTES
]
check_config_interfaces(driver, storage)
check_config_network_instances(driver, storage)
# TODO: update structure
resources_to_set = list(itertools.chain(*[
[
network_instance_static_route(ni['name'], ni_sr['prefix'], ni_sr['gateway'], metric=ni_sr['metric'])
for ni_sr in ni.get('static_routes', list())
]
for ni in NETWORK_INSTANCES
]))
LOGGER.info('resources_to_set = {:s}'.format(str(resources_to_set)))
results_setconfig = driver.SetConfig(resources_to_set)
LOGGER.info('results_setconfig = {:s}'.format(str(results_setconfig)))
check_updates(results_setconfig, '/network_instance[{:s}]/static_route[{:s}]', list(itertools.chain(*[
[(ni['name'], ni_sr['prefix']) for ni_sr in ni.get('static_routes', list())]
for ni in NETWORK_INSTANCES
])))
prefixes = sorted([
prefix
for prefix, _, _ in NI_STATIC_ROUTES
])
results = set(results_setconfig)
assert len(results) == len(prefixes)
for prefix in prefixes:
assert ('/network_instance[{:s}]/static_route[{:s}]'.format(NI_NAME, prefix), True) in results
expected_getconfig = get_expected_network_instance_config(storage)
expected_getconfig.extend([
('/network_instance[{:s}]/static_route[{:s}]'.format(NI_NAME, prefix), {
'name': NI_NAME, 'prefix': prefix, 'next_hop': gateway, 'next_hop_index': 0, 'metric': metric
})
for prefix, gateway, metric in NI_STATIC_ROUTES
])
permitted_retries = 5
while permitted_retries > 0:
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
for resource_key, resource_value in results_getconfig:
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/vlan\[([^\]]+)\]$', resource_key)
if match is None: continue
members = resource_value.get('members')
if len(members) > 0: resource_value['members'] = sorted(members)
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs == 0: break
# let the device take some time to reconfigure
time.sleep(0.5)
permitted_retries -= 1
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
check_config_interfaces(driver, storage, max_retries=5)
check_config_network_instances(driver, storage, max_retries=5)
def test_del_network_instance_static_routes(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_delete = [
network_instance_static_route(NI_NAME, '172.0.0.0/24', '172.16.0.2'),
network_instance_static_route(NI_NAME, '172.2.0.0/24', '172.16.0.3'),
]
check_config_interfaces(driver, storage)
check_config_network_instances(driver, storage)
# TODO: update structure
resources_to_delete = list(itertools.chain(*[
[
network_instance_static_route(ni['name'], ni_sr['prefix'], ni_sr['gateway'], metric=ni_sr['metric'])
for ni_sr in ni.get('static_routes', list())
]
for ni in NETWORK_INSTANCES
]))
LOGGER.info('resources_to_delete = {:s}'.format(str(resources_to_delete)))
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
LOGGER.info('results_deleteconfig = {:s}'.format(str(results_deleteconfig)))
check_updates(results_deleteconfig, '/network_instance[{:s}]/static_route[{:s}]', list(itertools.chain(*[
[(ni['name'], ni_sr['prefix']) for ni_sr in ni.get('static_routes', list())]
for ni in NETWORK_INSTANCES
])))
#interfaces = sorted(['Ethernet1', 'Ethernet10'])
#results = set(results_deleteconfig)
#assert len(results) == len(interfaces)
#for if_name in interfaces:
# assert ('/interface[{:s}]'.format(if_name), True) in results
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
#expected_getconfig = get_expected_interface_config(storage)
#diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
#num_diffs = len(diff_data)
#if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
#assert num_diffs == 0
raise Exception()
check_config_interfaces(driver, storage, max_retries=5)
check_config_network_instances(driver, storage, max_retries=5)
def test_del_interfaces_from_network_instance(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_delete = [
network_instance_interface(NI_NAME, if_name, sif_index)
for if_name, sif_index, _, _, _ in NI_INTERFACES
]
check_config_interfaces(driver, storage)
check_config_network_instances(driver, storage)
# TODO: update structure
resources_to_delete = list(itertools.chain(*[
[
network_instance_interface(ni['name'], ni_if['if_name'], ni_if['subif_index'])
for ni_if in ni.get('interfaces', list())
]
for ni in NETWORK_INSTANCES
]))
LOGGER.info('resources_to_delete = {:s}'.format(str(resources_to_delete)))
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
LOGGER.info('results_deleteconfig = {:s}'.format(str(results_deleteconfig)))
check_updates(results_deleteconfig, '/network_instance[{:s}]/interface[{:s}]', list(itertools.chain(*[
[
(ni['name'], '{:s}.{:d}'.format(ni_if['if_name'], ni_if['subif_index']))
for ni_if in ni.get('interfaces', list())
]
for ni in NETWORK_INSTANCES
])))
interface_ids = sorted([
'{:s}.{:d}'.format(if_name, sif_index)
for if_name, sif_index, _, _, _ in NI_INTERFACES
])
results = set(results_deleteconfig)
assert len(results) == len(interface_ids)
for interface_id in interface_ids:
assert ('/network_instance[{:s}]/interface[{:s}]'.format(NI_NAME, interface_id), True) in results
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
expected_getconfig = get_expected_interface_config(storage)
expected_getconfig.extend([
('/interface[Ethernet1]/subinterface[0]/ipv4[192.168.1.1]', {
'ip': '192.168.1.1', 'origin': 'STATIC', 'prefix': 24
}),
('/interface[Ethernet10]/subinterface[0]/ipv4[192.168.10.1]', {
'ip': '192.168.10.1', 'origin': 'STATIC', 'prefix': 24
})
])
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
expected_getconfig = get_expected_network_instance_config(storage)
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
raise Exception()
check_config_interfaces(driver, storage, max_retries=5)
check_config_network_instances(driver, storage, max_retries=5)
def test_del_interfaces(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
resources_to_delete = [
interface('Ethernet1', 0, '192.168.1.1', 24, True),
interface('Ethernet10', 0, '192.168.10.1', 24, True),
]
check_config_interfaces(driver, storage)
check_config_network_instances(driver, storage)
# TODO: update structure
resources_to_delete = list(itertools.chain(*[
[
interface(ni_if['if_name'], ni_if['sif_index'], ni_if['ipv4_addr'], ni_if['ipv4_prefix'], ni_if['enabled'])
for ni_if in ni.get('interfaces', list())
]
for ni in NETWORK_INSTANCES
]))
LOGGER.info('resources_to_delete = {:s}'.format(str(resources_to_delete)))
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
LOGGER.info('results_deleteconfig = {:s}'.format(str(results_deleteconfig)))
check_updates(results_deleteconfig, '/interface[{:s}]', list(itertools.chain(*[
[ni_if['name'] for ni_if in ni.get('interfaces', list())]
for ni in NETWORK_INSTANCES
])))
interfaces = sorted(['Ethernet1', 'Ethernet10'])
results = set(results_deleteconfig)
assert len(results) == len(interfaces)
for if_name in interfaces:
assert ('/interface[{:s}]'.format(if_name), True) in results
resources_to_get = [RESOURCE_INTERFACES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
expected_getconfig = get_expected_interface_config(storage)
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
check_config_interfaces(driver, storage, max_retries=5)
check_config_network_instances(driver, storage, max_retries=5)
def test_del_network_instances(
driver : GnmiOpenConfigDriver, # pylint: disable=redefined-outer-name
storage : Dict, # pylint: disable=redefined-outer-name
storage : Storage, # pylint: disable=redefined-outer-name
) -> None:
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
check_config_interfaces(driver, storage)
check_config_network_instances(driver, storage)
# TODO: update structure
resources_to_delete = [
network_instance(NI_NAME, 'L3VRF'),
network_instance(ni['name'], ni['type'])
for ni in NETWORK_INSTANCES
]
LOGGER.info('resources_to_delete = {:s}'.format(str(resources_to_delete)))
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
LOGGER.info('results_deleteconfig = {:s}'.format(str(results_deleteconfig)))
check_updates(results_deleteconfig, '/network_instance[{:s}]', [ni['name'] for ni in NETWORK_INSTANCES])
network_instances = sorted([NI_NAME])
results = set(results_deleteconfig)
assert len(results) == len(network_instances)
for ni_name in network_instances:
assert ('/network_instance[{:s}]'.format(ni_name), True) in results
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
for resource_key, resource_value in results_getconfig:
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/vlan\[([^\]]+)\]$', resource_key)
if match is None: continue
members = resource_value.get('members')
if len(members) > 0: resource_value['members'] = sorted(members)
expected_getconfig = get_expected_network_instance_config(storage)
diff_data = deepdiff.DeepDiff(sorted(expected_getconfig), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
check_config_interfaces(driver, storage, max_retries=5)
check_config_network_instances(driver, storage, max_retries=5)
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, deepdiff, logging, time
from typing import Callable, Dict, List, Tuple
from device.service.driver_api._Driver import (
RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES,
RESOURCE_ROUTING_POLICIES, RESOURCE_SERVICES
)
from device.service.drivers.gnmi_openconfig.GnmiOpenConfigDriver import GnmiOpenConfigDriver
from device.tests.gnmi_openconfig.storage.Storage import Storage
from .result_config_adapters import adapt_endpoint, adapt_interface, adapt_network_instance
LOGGER = logging.getLogger(__name__)
def check_expected_config(
driver : GnmiOpenConfigDriver, resources_to_get : Dict[str], expected_config : List[Dict],
func_adapt_returned_config : Callable[[Tuple[str, Dict]], Tuple[str, Dict]] = lambda x: x,
max_retries : int = 1, retry_delay : float = 0.5
) -> List[Dict]:
num_retry = 0
return_data = None
while num_retry < max_retries:
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
return_data = copy.deepcopy(results_getconfig)
results_getconfig = [
func_adapt_returned_config(resource_key, resource_value)
for resource_key, resource_value in results_getconfig
]
diff_data = deepdiff.DeepDiff(sorted(expected_config), sorted(results_getconfig))
num_diffs = len(diff_data)
if num_diffs == 0: break
# let the device take some time to reconfigure
time.sleep(retry_delay)
num_retry -= 1
if num_diffs > 0: LOGGER.error('Differences[{:d}]:\n{:s}'.format(num_diffs, str(diff_data.pretty())))
assert num_diffs == 0
return return_data
def check_config_endpoints(
driver : GnmiOpenConfigDriver, storage : Storage,
max_retries : int = 1, retry_delay : float = 0.5
) -> List[Dict]:
return check_expected_config(
driver, [RESOURCE_ENDPOINTS], storage.endpoints.get_expected_config(),
adapt_endpoint, max_retries=max_retries, retry_delay=retry_delay
)
def check_config_interfaces(
driver : GnmiOpenConfigDriver, storage : Storage,
max_retries : int = 1, retry_delay : float = 0.5
) -> List[Dict]:
return check_expected_config(
driver, [RESOURCE_INTERFACES], storage.interfaces.get_expected_config(),
adapt_interface, max_retries=max_retries, retry_delay=retry_delay
)
def check_config_network_instances(
driver : GnmiOpenConfigDriver, storage : Storage,
max_retries : int = 1, retry_delay : float = 0.5
) -> List[Dict]:
expected_config =
return check_expected_config(
driver, [RESOURCE_NETWORK_INSTANCES], storage.network_instances.get_expected_config(),
adapt_network_instance, max_retries=max_retries, retry_delay=retry_delay
)
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable, List, Tuple
def check_updates(results : Iterable[Tuple[str, bool]], format_str : str, item_ids : List[Tuple]) -> None:
results = set(results)
assert len(results) == len(item_ids)
for item_id in item_ids:
assert (format_str.format(*item_id), True) in results
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List
def compose_expected_config__network_instance(
network_instances : List[Dict], include_interfaces : bool = False, include_static_routes : bool = False
) -> List[Dict]:
expected_config = list()
for network_instance in network_instances:
ni_name = network_instance['name']
ni_type = network_instance['type']
expected_config.extend([
('/network_instance[{:s}]'.format(ni_name), {
'name': ni_name, 'type': ni_type
}),
('/network_instance[{:s}]/protocol[DIRECTLY_CONNECTED]'.format(ni_name), {
'id': 'DIRECTLY_CONNECTED', 'name': 'DIRECTLY_CONNECTED'
}),
('/network_instance[{:s}]/table[DIRECTLY_CONNECTED,IPV4]'.format(ni_name), {
'protocol': 'DIRECTLY_CONNECTED', 'address_family': 'IPV4'
}),
('/network_instance[{:s}]/table[DIRECTLY_CONNECTED,IPV6]'.format(ni_name), {
'protocol': 'DIRECTLY_CONNECTED', 'address_family': 'IPV6'
})
])
if include_interfaces:
expected_config.extend([
('/network_instance[{:s}]/interface[{:s}]'.format(ni_name, interface['name']), {
})
for interface in network_instance.get('interfaces', list())
])
if include_static_routes:
expected_config.extend([
('/network_instance[{:s}]/static_route[{:s}]'.format(ni_name, static_route['prefix']), {
'name': ni_name, 'prefix': static_route['prefix'], 'next_hop': static_route['gateway'],
'next_hop_index': 0, 'metric': static_route['metric']
})
for static_route in network_instance.get('static_routes', list())
])
return expected_config
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from typing import Dict, Tuple
def adapt_endpoint(resource_key : str, resource_value : Dict) -> Tuple[str, Dict]:
return resource_key, resource_value
def adapt_interface(resource_key : str, resource_value : Dict) -> Tuple[str, Dict]:
return resource_key, resource_value
def adapt_network_instance(resource_key : str, resource_value : Dict) -> Tuple[str, Dict]:
match = re.match(r'^\/network\_instance\[([^\]]+)\]\/vlan\[([^\]]+)\]$', resource_key)
if match is not None:
members = resource_value.get('members')
if len(members) > 0: resource_value['members'] = sorted(members)
return resource_key, resource_value
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment