Loading hackfest/mock_osm/__main__.py +1 −0 Original line number Original line Diff line number Diff line Loading @@ -15,6 +15,7 @@ import cmd, logging import cmd, logging from .MockOSM import MockOSM from .MockOSM import MockOSM logging.basicConfig(level=logging.DEBUG) LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) LOGGER.setLevel(logging.DEBUG) Loading src/device/service/driver_api/DriverFactory.py +15 −9 Original line number Original line Diff line number Diff line Loading @@ -14,8 +14,7 @@ import logging import logging from enum import Enum from enum import Enum from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Type from ._Driver import _Driver from .Exceptions import ( from .Exceptions import ( AmbiguousFilterException, EmptyFilterFieldException, AmbiguousFilterException, EmptyFilterFieldException, UnsatisfiedFilterException, UnsupportedDriverClassException, UnsatisfiedFilterException, UnsupportedDriverClassException, Loading @@ -23,12 +22,20 @@ from .Exceptions import ( ) ) from .FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum from .FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum if TYPE_CHECKING: from ._Driver import _Driver LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__) SUPPORTED_FILTER_FIELDS = set(FILTER_FIELD_ALLOWED_VALUES.keys()) SUPPORTED_FILTER_FIELDS = set(FILTER_FIELD_ALLOWED_VALUES.keys()) def check_is_class_valid(driver_class : Type['_Driver']) -> None: from ._Driver import _Driver if not issubclass(driver_class, _Driver): raise UnsupportedDriverClassException(str(driver_class)) def sanitize_filter_fields( def sanitize_filter_fields( filter_fields : Dict[FilterFieldEnum, Any], driver_name : Optional[str] = None filter_fields : Dict[FilterFieldEnum, Any], driver_name : Optional[str] = None ) -> Dict[FilterFieldEnum, Any]: ) -> Dict[FilterFieldEnum, Any]: Loading Loading @@ -67,14 +74,13 @@ def sanitize_filter_fields( class DriverFactory: class DriverFactory: def __init__( def __init__( self, drivers : List[Tuple[Type[_Driver], List[Dict[FilterFieldEnum, Any]]]] self, drivers : List[Tuple[Type['_Driver'], List[Dict[FilterFieldEnum, Any]]]] ) -> None: ) -> None: self.__drivers : List[Tuple[Type[_Driver], Dict[FilterFieldEnum, Any]]] = list() self.__drivers : List[Tuple[Type['_Driver'], Dict[FilterFieldEnum, Any]]] = list() for driver_class,filter_field_sets in drivers: for driver_class,filter_field_sets in drivers: #if not issubclass(driver_class, _Driver): check_is_class_valid(driver_class) # raise UnsupportedDriverClassException(str(driver_class)) driver_name = driver_class.__name__ driver_name = driver_class #.__name__ for filter_fields in filter_field_sets: for filter_fields in filter_field_sets: filter_fields = {k.value:v for k,v in filter_fields.items()} filter_fields = {k.value:v for k,v in filter_fields.items()} Loading Loading @@ -102,7 +108,7 @@ class DriverFactory: return True return True def get_driver_class(self, **selection_filter_fields) -> _Driver: def get_driver_class(self, **selection_filter_fields) -> '_Driver': sanitized_filter_fields = sanitize_filter_fields(selection_filter_fields) sanitized_filter_fields = sanitize_filter_fields(selection_filter_fields) compatible_drivers : List[Tuple[Type[_Driver], Dict[FilterFieldEnum, Any]]] = [ compatible_drivers : List[Tuple[Type[_Driver], Dict[FilterFieldEnum, Any]]] = [ Loading src/device/service/driver_api/Exceptions.py +12 −10 Original line number Original line Diff line number Diff line Loading @@ -13,22 +13,22 @@ # limitations under the License. # limitations under the License. class UnsatisfiedFilterException(Exception): class UnsatisfiedFilterException(Exception): def __init__(self, filter_fields): def __init__(self, filter_fields) -> None: msg = 'No Driver satisfies FilterFields({:s})' msg = 'No Driver satisfies FilterFields({:s})' super().__init__(msg.format(str(filter_fields))) super().__init__(msg.format(str(filter_fields))) class AmbiguousFilterException(Exception): class AmbiguousFilterException(Exception): def __init__(self, filter_fields, compatible_drivers): def __init__(self, filter_fields, compatible_drivers) -> None: msg = 'Multiple Drivers satisfy FilterFields({:s}): {:s}' msg = 'Multiple Drivers satisfy FilterFields({:s}): {:s}' super().__init__(msg.format(str(filter_fields), str(compatible_drivers))) super().__init__(msg.format(str(filter_fields), str(compatible_drivers))) class UnsupportedDriverClassException(Exception): class UnsupportedDriverClassException(Exception): def __init__(self, driver_class_name): def __init__(self, driver_class_name) -> None: msg = 'Class({:s}) is not a subclass of _Driver' msg = 'Class({:s}) is not a subclass of _Driver' super().__init__(msg.format(str(driver_class_name))) super().__init__(msg.format(str(driver_class_name))) class EmptyFilterFieldException(Exception): class EmptyFilterFieldException(Exception): def __init__(self, filter_fields, driver_class_name=None): def __init__(self, filter_fields, driver_class_name=None) -> None: if driver_class_name: if driver_class_name: msg = 'Empty FilterField({:s}) specified by Driver({:s}) is not supported' msg = 'Empty FilterField({:s}) specified by Driver({:s}) is not supported' msg = msg.format(str(filter_fields), str(driver_class_name)) msg = msg.format(str(filter_fields), str(driver_class_name)) Loading @@ -38,7 +38,7 @@ class EmptyFilterFieldException(Exception): super().__init__(msg) super().__init__(msg) class UnsupportedFilterFieldException(Exception): class UnsupportedFilterFieldException(Exception): def __init__(self, unsupported_filter_fields, driver_class_name=None): def __init__(self, unsupported_filter_fields, driver_class_name=None) -> None: if driver_class_name: if driver_class_name: msg = 'FilterFields({:s}) specified by Driver({:s}) are not supported' msg = 'FilterFields({:s}) specified by Driver({:s}) are not supported' msg = msg.format(str(unsupported_filter_fields), str(driver_class_name)) msg = msg.format(str(unsupported_filter_fields), str(driver_class_name)) Loading @@ -48,7 +48,9 @@ class UnsupportedFilterFieldException(Exception): super().__init__(msg) super().__init__(msg) class UnsupportedFilterFieldValueException(Exception): class UnsupportedFilterFieldValueException(Exception): def __init__(self, filter_field_name, filter_field_value, allowed_filter_field_values, driver_class_name=None): def __init__( self, filter_field_name, filter_field_value, allowed_filter_field_values, driver_class_name=None ) -> None: if driver_class_name: if driver_class_name: msg = 'FilterField({:s}={:s}) specified by Driver({:s}) is not supported. Allowed values are {:s}' msg = 'FilterField({:s}={:s}) specified by Driver({:s}) is not supported. Allowed values are {:s}' msg = msg.format( msg = msg.format( Loading @@ -60,24 +62,24 @@ class UnsupportedFilterFieldValueException(Exception): super().__init__(msg) super().__init__(msg) class DriverInstanceCacheTerminatedException(Exception): class DriverInstanceCacheTerminatedException(Exception): def __init__(self): def __init__(self) -> None: msg = 'DriverInstanceCache is terminated. No new instances can be processed.' msg = 'DriverInstanceCache is terminated. No new instances can be processed.' super().__init__(msg) super().__init__(msg) class UnsupportedResourceKeyException(Exception): class UnsupportedResourceKeyException(Exception): def __init__(self, resource_key): def __init__(self, resource_key) -> None: msg = 'ResourceKey({:s}) not supported' msg = 'ResourceKey({:s}) not supported' msg = msg.format(str(resource_key)) msg = msg.format(str(resource_key)) super().__init__(msg) super().__init__(msg) class ConfigFieldNotFoundException(Exception): class ConfigFieldNotFoundException(Exception): def __init__(self, config_field_name): def __init__(self, config_field_name) -> None: msg = 'ConfigField({:s}) not specified in resource' msg = 'ConfigField({:s}) not specified in resource' msg = msg.format(str(config_field_name)) msg = msg.format(str(config_field_name)) super().__init__(msg) super().__init__(msg) class ConfigFieldsNotSupportedException(Exception): class ConfigFieldsNotSupportedException(Exception): def __init__(self, config_fields): def __init__(self, config_fields) -> None: msg = 'ConfigFields({:s}) not supported in resource' msg = 'ConfigFields({:s}) not supported in resource' msg = msg.format(str(config_fields)) msg = msg.format(str(config_fields)) super().__init__(msg) super().__init__(msg) src/device/service/drivers/gnmi_openconfig/handlers/InterfaceSwitchedVlan.py 0 → 100644 +104 −0 Original line number Original line Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, logging, re from typing import Any, Dict, List, Tuple from ._Handler import _Handler from .Tools import get_str from .YangHandler import YangHandler LOGGER = logging.getLogger(__name__) RE_IF_SWITCHED_VLAN = re.compile(r'^/interface\[(?:name=)?([^\]]+)\]/ethernet/switched-vlan$') class InterfaceSwitchedVlanHandler(_Handler): def get_resource_key(self) -> str: return '/interface/ethernet/switched-vlan' def get_path(self) -> str: return '/openconfig-interfaces:interfaces/interface/ethernet/switched-vlan' def _get_interface_name(self, resource_key : str, resource_value : Dict) -> str: if 'name' in resource_value: return get_str(resource_value, 'name') if 'interface' in resource_value: return get_str(resource_value, 'interface') match = RE_IF_SWITCHED_VLAN.match(resource_key) if match is None: MSG = 'Interface name not found in resource_key={:s} resource_value={:s}' raise Exception(MSG.format(str(resource_key), str(resource_value))) return match.groups()[0] def _normalize_config(self, resource_value : Dict) -> Dict[str, Any]: config = resource_value.get('config') if isinstance(config, dict): return config interface_mode = resource_value.get('interface-mode', resource_value.get('interface_mode')) if interface_mode is None: raise Exception('interface-mode is required for switched-vlan config') interface_mode = str(interface_mode).upper() config = {'interface-mode': interface_mode} if interface_mode == 'ACCESS': access_vlan = resource_value.get('access-vlan', resource_value.get('access_vlan')) if access_vlan is None: raise Exception('access-vlan is required for ACCESS mode') config['access-vlan'] = int(access_vlan) elif interface_mode == 'TRUNK': native_vlan = resource_value.get('native-vlan', resource_value.get('native_vlan', 1)) config['native-vlan'] = int(native_vlan) trunk_vlans = resource_value.get('trunk-vlans', resource_value.get('trunk_vlans')) if trunk_vlans is None: trunk_vlan = resource_value.get('trunk-vlan', resource_value.get('trunk_vlan')) trunk_vlans = [trunk_vlan] if trunk_vlan is not None else [] if not isinstance(trunk_vlans, list): trunk_vlans = [trunk_vlans] config['trunk-vlans'] = [int(vlan) for vlan in trunk_vlans if vlan is not None] else: raise Exception('Unsupported interface-mode: {:s}'.format(str(interface_mode))) return config def compose( self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False ) -> Tuple[str, str]: if_name = self._get_interface_name(resource_key, resource_value) str_path = '/interfaces/interface[name={:s}]/ethernet/switched-vlan'.format(if_name) if delete: return str_path, json.dumps({}) config = self._normalize_config(resource_value) str_data = json.dumps({'config': config}) return str_path, str_data def parse( self, json_data : Dict, yang_handler : YangHandler ) -> List[Tuple[str, Dict[str, Any]]]: json_data_valid = yang_handler.parse_to_dict( '/openconfig-interfaces:interfaces', json_data, fmt='json', strict=False ) entries = [] for interface in json_data_valid.get('interfaces', {}).get('interface', []): interface_name = interface['name'] ethernet = interface.get('ethernet', {}) switched_vlan = ethernet.get('switched-vlan') if switched_vlan is None: continue entry_key = '/interface[{:s}]/ethernet/switched-vlan'.format(interface_name) entry_value = {} if 'config' in switched_vlan: entry_value['config'] = switched_vlan['config'] if 'state' in switched_vlan: entry_value['state'] = switched_vlan['state'] entries.append((entry_key, entry_value)) return entries src/device/service/drivers/gnmi_openconfig/handlers/Mpls.py 0 → 100644 +121 −0 Original line number Original line Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, logging, re from typing import Any, Dict, List, Tuple from ._Handler import _Handler from .Tools import get_int, get_str from .YangHandler import YangHandler LOGGER = logging.getLogger(__name__) RE_MPLS_INTERFACE = re.compile(r'^/mpls/interface\[([^\]]+)\]$') DEFAULT_NETWORK_INSTANCE = 'default' class MplsHandler(_Handler): def get_resource_key(self) -> str: return '/mpls' def get_path(self) -> str: return '/openconfig-network-instance:network-instances/network-instance/mpls' def compose( self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False ) -> Tuple[str, str]: """ Compose MPLS (global or per-interface) configuration. - Global: set LDP router-id (lsr-id) and optional hello timers. - Interface: set LDP interface-id and optional hello timers. """ ni_name = get_str(resource_value, 'network_instance', DEFAULT_NETWORK_INSTANCE) ni_type = get_str(resource_value, 'network_instance_type') if ni_type is None and ni_name == DEFAULT_NETWORK_INSTANCE: ni_type = 'openconfig-network-instance-types:DEFAULT_INSTANCE' yang_nis : Any = yang_handler.get_data_path('/openconfig-network-instance:network-instances') yang_ni : Any = yang_nis.create_path('network-instance[name="{:s}"]'.format(ni_name)) yang_ni.create_path('config/name', ni_name) if ni_type is not None: yang_ni.create_path('config/type', ni_type) match_if = RE_MPLS_INTERFACE.match(resource_key) if delete: if match_if: if_name = match_if.group(1) str_path = ( '/network-instances/network-instance[name={:s}]/mpls/signaling-protocols/ldp' '/interface-attributes/interfaces/interface[interface-id={:s}]' ).format(ni_name, if_name) else: str_path = '/network-instances/network-instance[name={:s}]/mpls'.format(ni_name) return str_path, json.dumps({}) if match_if: if_name = match_if.group(1) hello_interval = get_int(resource_value, 'hello_interval') hello_holdtime = get_int(resource_value, 'hello_holdtime') path_if_base = ( 'mpls/signaling-protocols/ldp/interface-attributes/interfaces' '/interface[interface-id="{:s}"]/config' ).format(if_name) yang_ni.create_path('{:s}/interface-id'.format(path_if_base), if_name) if hello_interval is not None: yang_ni.create_path('{:s}/hello-interval'.format(path_if_base), hello_interval) if hello_holdtime is not None: yang_ni.create_path('{:s}/hello-holdtime'.format(path_if_base), hello_holdtime) yang_if : Any = yang_ni.find_path( 'mpls/signaling-protocols/ldp/interface-attributes/interfaces' '/interface[interface-id="{:s}"]'.format(if_name) ) str_path = ( '/network-instances/network-instance[name={:s}]/mpls/signaling-protocols/ldp' '/interface-attributes/interfaces/interface[interface-id={:s}]' ).format(ni_name, if_name) json_data = json.loads(yang_if.print_mem('json')) json_data = json_data['openconfig-network-instance:interface'][0] str_data = json.dumps(json_data) return str_path, str_data # Global LDP configuration ldp_cfg = resource_value.get('ldp', resource_value) lsr_id = get_str(ldp_cfg, 'lsr_id') hello_interval = get_int(ldp_cfg, 'hello_interval') hello_holdtime = get_int(ldp_cfg, 'hello_holdtime') if lsr_id is not None: yang_ni.create_path('mpls/signaling-protocols/ldp/global/config/lsr-id', lsr_id) if hello_interval is not None: yang_ni.create_path( 'mpls/signaling-protocols/ldp/interface-attributes/config/hello-interval', hello_interval ) if hello_holdtime is not None: yang_ni.create_path( 'mpls/signaling-protocols/ldp/interface-attributes/config/hello-holdtime', hello_holdtime ) yang_ldp : Any = yang_ni.find_path('mpls/signaling-protocols/ldp') str_path = '/network-instances/network-instance[name={:s}]/mpls/signaling-protocols/ldp'.format(ni_name) json_data = json.loads(yang_ldp.print_mem('json')) json_data = json_data['openconfig-network-instance:ldp'] str_data = json.dumps(json_data) return str_path, str_data def parse( self, json_data : Dict, yang_handler : YangHandler ) -> List[Tuple[str, Dict[str, Any]]]: LOGGER.debug('[parse] json_data = %s', json.dumps(json_data)) # Not required for current tests (L2VPN validation focuses on SetConfig). return [] Loading
hackfest/mock_osm/__main__.py +1 −0 Original line number Original line Diff line number Diff line Loading @@ -15,6 +15,7 @@ import cmd, logging import cmd, logging from .MockOSM import MockOSM from .MockOSM import MockOSM logging.basicConfig(level=logging.DEBUG) LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) LOGGER.setLevel(logging.DEBUG) Loading
src/device/service/driver_api/DriverFactory.py +15 −9 Original line number Original line Diff line number Diff line Loading @@ -14,8 +14,7 @@ import logging import logging from enum import Enum from enum import Enum from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Type from ._Driver import _Driver from .Exceptions import ( from .Exceptions import ( AmbiguousFilterException, EmptyFilterFieldException, AmbiguousFilterException, EmptyFilterFieldException, UnsatisfiedFilterException, UnsupportedDriverClassException, UnsatisfiedFilterException, UnsupportedDriverClassException, Loading @@ -23,12 +22,20 @@ from .Exceptions import ( ) ) from .FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum from .FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum if TYPE_CHECKING: from ._Driver import _Driver LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__) SUPPORTED_FILTER_FIELDS = set(FILTER_FIELD_ALLOWED_VALUES.keys()) SUPPORTED_FILTER_FIELDS = set(FILTER_FIELD_ALLOWED_VALUES.keys()) def check_is_class_valid(driver_class : Type['_Driver']) -> None: from ._Driver import _Driver if not issubclass(driver_class, _Driver): raise UnsupportedDriverClassException(str(driver_class)) def sanitize_filter_fields( def sanitize_filter_fields( filter_fields : Dict[FilterFieldEnum, Any], driver_name : Optional[str] = None filter_fields : Dict[FilterFieldEnum, Any], driver_name : Optional[str] = None ) -> Dict[FilterFieldEnum, Any]: ) -> Dict[FilterFieldEnum, Any]: Loading Loading @@ -67,14 +74,13 @@ def sanitize_filter_fields( class DriverFactory: class DriverFactory: def __init__( def __init__( self, drivers : List[Tuple[Type[_Driver], List[Dict[FilterFieldEnum, Any]]]] self, drivers : List[Tuple[Type['_Driver'], List[Dict[FilterFieldEnum, Any]]]] ) -> None: ) -> None: self.__drivers : List[Tuple[Type[_Driver], Dict[FilterFieldEnum, Any]]] = list() self.__drivers : List[Tuple[Type['_Driver'], Dict[FilterFieldEnum, Any]]] = list() for driver_class,filter_field_sets in drivers: for driver_class,filter_field_sets in drivers: #if not issubclass(driver_class, _Driver): check_is_class_valid(driver_class) # raise UnsupportedDriverClassException(str(driver_class)) driver_name = driver_class.__name__ driver_name = driver_class #.__name__ for filter_fields in filter_field_sets: for filter_fields in filter_field_sets: filter_fields = {k.value:v for k,v in filter_fields.items()} filter_fields = {k.value:v for k,v in filter_fields.items()} Loading Loading @@ -102,7 +108,7 @@ class DriverFactory: return True return True def get_driver_class(self, **selection_filter_fields) -> _Driver: def get_driver_class(self, **selection_filter_fields) -> '_Driver': sanitized_filter_fields = sanitize_filter_fields(selection_filter_fields) sanitized_filter_fields = sanitize_filter_fields(selection_filter_fields) compatible_drivers : List[Tuple[Type[_Driver], Dict[FilterFieldEnum, Any]]] = [ compatible_drivers : List[Tuple[Type[_Driver], Dict[FilterFieldEnum, Any]]] = [ Loading
src/device/service/driver_api/Exceptions.py +12 −10 Original line number Original line Diff line number Diff line Loading @@ -13,22 +13,22 @@ # limitations under the License. # limitations under the License. class UnsatisfiedFilterException(Exception): class UnsatisfiedFilterException(Exception): def __init__(self, filter_fields): def __init__(self, filter_fields) -> None: msg = 'No Driver satisfies FilterFields({:s})' msg = 'No Driver satisfies FilterFields({:s})' super().__init__(msg.format(str(filter_fields))) super().__init__(msg.format(str(filter_fields))) class AmbiguousFilterException(Exception): class AmbiguousFilterException(Exception): def __init__(self, filter_fields, compatible_drivers): def __init__(self, filter_fields, compatible_drivers) -> None: msg = 'Multiple Drivers satisfy FilterFields({:s}): {:s}' msg = 'Multiple Drivers satisfy FilterFields({:s}): {:s}' super().__init__(msg.format(str(filter_fields), str(compatible_drivers))) super().__init__(msg.format(str(filter_fields), str(compatible_drivers))) class UnsupportedDriverClassException(Exception): class UnsupportedDriverClassException(Exception): def __init__(self, driver_class_name): def __init__(self, driver_class_name) -> None: msg = 'Class({:s}) is not a subclass of _Driver' msg = 'Class({:s}) is not a subclass of _Driver' super().__init__(msg.format(str(driver_class_name))) super().__init__(msg.format(str(driver_class_name))) class EmptyFilterFieldException(Exception): class EmptyFilterFieldException(Exception): def __init__(self, filter_fields, driver_class_name=None): def __init__(self, filter_fields, driver_class_name=None) -> None: if driver_class_name: if driver_class_name: msg = 'Empty FilterField({:s}) specified by Driver({:s}) is not supported' msg = 'Empty FilterField({:s}) specified by Driver({:s}) is not supported' msg = msg.format(str(filter_fields), str(driver_class_name)) msg = msg.format(str(filter_fields), str(driver_class_name)) Loading @@ -38,7 +38,7 @@ class EmptyFilterFieldException(Exception): super().__init__(msg) super().__init__(msg) class UnsupportedFilterFieldException(Exception): class UnsupportedFilterFieldException(Exception): def __init__(self, unsupported_filter_fields, driver_class_name=None): def __init__(self, unsupported_filter_fields, driver_class_name=None) -> None: if driver_class_name: if driver_class_name: msg = 'FilterFields({:s}) specified by Driver({:s}) are not supported' msg = 'FilterFields({:s}) specified by Driver({:s}) are not supported' msg = msg.format(str(unsupported_filter_fields), str(driver_class_name)) msg = msg.format(str(unsupported_filter_fields), str(driver_class_name)) Loading @@ -48,7 +48,9 @@ class UnsupportedFilterFieldException(Exception): super().__init__(msg) super().__init__(msg) class UnsupportedFilterFieldValueException(Exception): class UnsupportedFilterFieldValueException(Exception): def __init__(self, filter_field_name, filter_field_value, allowed_filter_field_values, driver_class_name=None): def __init__( self, filter_field_name, filter_field_value, allowed_filter_field_values, driver_class_name=None ) -> None: if driver_class_name: if driver_class_name: msg = 'FilterField({:s}={:s}) specified by Driver({:s}) is not supported. Allowed values are {:s}' msg = 'FilterField({:s}={:s}) specified by Driver({:s}) is not supported. Allowed values are {:s}' msg = msg.format( msg = msg.format( Loading @@ -60,24 +62,24 @@ class UnsupportedFilterFieldValueException(Exception): super().__init__(msg) super().__init__(msg) class DriverInstanceCacheTerminatedException(Exception): class DriverInstanceCacheTerminatedException(Exception): def __init__(self): def __init__(self) -> None: msg = 'DriverInstanceCache is terminated. No new instances can be processed.' msg = 'DriverInstanceCache is terminated. No new instances can be processed.' super().__init__(msg) super().__init__(msg) class UnsupportedResourceKeyException(Exception): class UnsupportedResourceKeyException(Exception): def __init__(self, resource_key): def __init__(self, resource_key) -> None: msg = 'ResourceKey({:s}) not supported' msg = 'ResourceKey({:s}) not supported' msg = msg.format(str(resource_key)) msg = msg.format(str(resource_key)) super().__init__(msg) super().__init__(msg) class ConfigFieldNotFoundException(Exception): class ConfigFieldNotFoundException(Exception): def __init__(self, config_field_name): def __init__(self, config_field_name) -> None: msg = 'ConfigField({:s}) not specified in resource' msg = 'ConfigField({:s}) not specified in resource' msg = msg.format(str(config_field_name)) msg = msg.format(str(config_field_name)) super().__init__(msg) super().__init__(msg) class ConfigFieldsNotSupportedException(Exception): class ConfigFieldsNotSupportedException(Exception): def __init__(self, config_fields): def __init__(self, config_fields) -> None: msg = 'ConfigFields({:s}) not supported in resource' msg = 'ConfigFields({:s}) not supported in resource' msg = msg.format(str(config_fields)) msg = msg.format(str(config_fields)) super().__init__(msg) super().__init__(msg)
src/device/service/drivers/gnmi_openconfig/handlers/InterfaceSwitchedVlan.py 0 → 100644 +104 −0 Original line number Original line Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, logging, re from typing import Any, Dict, List, Tuple from ._Handler import _Handler from .Tools import get_str from .YangHandler import YangHandler LOGGER = logging.getLogger(__name__) RE_IF_SWITCHED_VLAN = re.compile(r'^/interface\[(?:name=)?([^\]]+)\]/ethernet/switched-vlan$') class InterfaceSwitchedVlanHandler(_Handler): def get_resource_key(self) -> str: return '/interface/ethernet/switched-vlan' def get_path(self) -> str: return '/openconfig-interfaces:interfaces/interface/ethernet/switched-vlan' def _get_interface_name(self, resource_key : str, resource_value : Dict) -> str: if 'name' in resource_value: return get_str(resource_value, 'name') if 'interface' in resource_value: return get_str(resource_value, 'interface') match = RE_IF_SWITCHED_VLAN.match(resource_key) if match is None: MSG = 'Interface name not found in resource_key={:s} resource_value={:s}' raise Exception(MSG.format(str(resource_key), str(resource_value))) return match.groups()[0] def _normalize_config(self, resource_value : Dict) -> Dict[str, Any]: config = resource_value.get('config') if isinstance(config, dict): return config interface_mode = resource_value.get('interface-mode', resource_value.get('interface_mode')) if interface_mode is None: raise Exception('interface-mode is required for switched-vlan config') interface_mode = str(interface_mode).upper() config = {'interface-mode': interface_mode} if interface_mode == 'ACCESS': access_vlan = resource_value.get('access-vlan', resource_value.get('access_vlan')) if access_vlan is None: raise Exception('access-vlan is required for ACCESS mode') config['access-vlan'] = int(access_vlan) elif interface_mode == 'TRUNK': native_vlan = resource_value.get('native-vlan', resource_value.get('native_vlan', 1)) config['native-vlan'] = int(native_vlan) trunk_vlans = resource_value.get('trunk-vlans', resource_value.get('trunk_vlans')) if trunk_vlans is None: trunk_vlan = resource_value.get('trunk-vlan', resource_value.get('trunk_vlan')) trunk_vlans = [trunk_vlan] if trunk_vlan is not None else [] if not isinstance(trunk_vlans, list): trunk_vlans = [trunk_vlans] config['trunk-vlans'] = [int(vlan) for vlan in trunk_vlans if vlan is not None] else: raise Exception('Unsupported interface-mode: {:s}'.format(str(interface_mode))) return config def compose( self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False ) -> Tuple[str, str]: if_name = self._get_interface_name(resource_key, resource_value) str_path = '/interfaces/interface[name={:s}]/ethernet/switched-vlan'.format(if_name) if delete: return str_path, json.dumps({}) config = self._normalize_config(resource_value) str_data = json.dumps({'config': config}) return str_path, str_data def parse( self, json_data : Dict, yang_handler : YangHandler ) -> List[Tuple[str, Dict[str, Any]]]: json_data_valid = yang_handler.parse_to_dict( '/openconfig-interfaces:interfaces', json_data, fmt='json', strict=False ) entries = [] for interface in json_data_valid.get('interfaces', {}).get('interface', []): interface_name = interface['name'] ethernet = interface.get('ethernet', {}) switched_vlan = ethernet.get('switched-vlan') if switched_vlan is None: continue entry_key = '/interface[{:s}]/ethernet/switched-vlan'.format(interface_name) entry_value = {} if 'config' in switched_vlan: entry_value['config'] = switched_vlan['config'] if 'state' in switched_vlan: entry_value['state'] = switched_vlan['state'] entries.append((entry_key, entry_value)) return entries
src/device/service/drivers/gnmi_openconfig/handlers/Mpls.py 0 → 100644 +121 −0 Original line number Original line Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, logging, re from typing import Any, Dict, List, Tuple from ._Handler import _Handler from .Tools import get_int, get_str from .YangHandler import YangHandler LOGGER = logging.getLogger(__name__) RE_MPLS_INTERFACE = re.compile(r'^/mpls/interface\[([^\]]+)\]$') DEFAULT_NETWORK_INSTANCE = 'default' class MplsHandler(_Handler): def get_resource_key(self) -> str: return '/mpls' def get_path(self) -> str: return '/openconfig-network-instance:network-instances/network-instance/mpls' def compose( self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False ) -> Tuple[str, str]: """ Compose MPLS (global or per-interface) configuration. - Global: set LDP router-id (lsr-id) and optional hello timers. - Interface: set LDP interface-id and optional hello timers. """ ni_name = get_str(resource_value, 'network_instance', DEFAULT_NETWORK_INSTANCE) ni_type = get_str(resource_value, 'network_instance_type') if ni_type is None and ni_name == DEFAULT_NETWORK_INSTANCE: ni_type = 'openconfig-network-instance-types:DEFAULT_INSTANCE' yang_nis : Any = yang_handler.get_data_path('/openconfig-network-instance:network-instances') yang_ni : Any = yang_nis.create_path('network-instance[name="{:s}"]'.format(ni_name)) yang_ni.create_path('config/name', ni_name) if ni_type is not None: yang_ni.create_path('config/type', ni_type) match_if = RE_MPLS_INTERFACE.match(resource_key) if delete: if match_if: if_name = match_if.group(1) str_path = ( '/network-instances/network-instance[name={:s}]/mpls/signaling-protocols/ldp' '/interface-attributes/interfaces/interface[interface-id={:s}]' ).format(ni_name, if_name) else: str_path = '/network-instances/network-instance[name={:s}]/mpls'.format(ni_name) return str_path, json.dumps({}) if match_if: if_name = match_if.group(1) hello_interval = get_int(resource_value, 'hello_interval') hello_holdtime = get_int(resource_value, 'hello_holdtime') path_if_base = ( 'mpls/signaling-protocols/ldp/interface-attributes/interfaces' '/interface[interface-id="{:s}"]/config' ).format(if_name) yang_ni.create_path('{:s}/interface-id'.format(path_if_base), if_name) if hello_interval is not None: yang_ni.create_path('{:s}/hello-interval'.format(path_if_base), hello_interval) if hello_holdtime is not None: yang_ni.create_path('{:s}/hello-holdtime'.format(path_if_base), hello_holdtime) yang_if : Any = yang_ni.find_path( 'mpls/signaling-protocols/ldp/interface-attributes/interfaces' '/interface[interface-id="{:s}"]'.format(if_name) ) str_path = ( '/network-instances/network-instance[name={:s}]/mpls/signaling-protocols/ldp' '/interface-attributes/interfaces/interface[interface-id={:s}]' ).format(ni_name, if_name) json_data = json.loads(yang_if.print_mem('json')) json_data = json_data['openconfig-network-instance:interface'][0] str_data = json.dumps(json_data) return str_path, str_data # Global LDP configuration ldp_cfg = resource_value.get('ldp', resource_value) lsr_id = get_str(ldp_cfg, 'lsr_id') hello_interval = get_int(ldp_cfg, 'hello_interval') hello_holdtime = get_int(ldp_cfg, 'hello_holdtime') if lsr_id is not None: yang_ni.create_path('mpls/signaling-protocols/ldp/global/config/lsr-id', lsr_id) if hello_interval is not None: yang_ni.create_path( 'mpls/signaling-protocols/ldp/interface-attributes/config/hello-interval', hello_interval ) if hello_holdtime is not None: yang_ni.create_path( 'mpls/signaling-protocols/ldp/interface-attributes/config/hello-holdtime', hello_holdtime ) yang_ldp : Any = yang_ni.find_path('mpls/signaling-protocols/ldp') str_path = '/network-instances/network-instance[name={:s}]/mpls/signaling-protocols/ldp'.format(ni_name) json_data = json.loads(yang_ldp.print_mem('json')) json_data = json_data['openconfig-network-instance:ldp'] str_data = json.dumps(json_data) return str_path, str_data def parse( self, json_data : Dict, yang_handler : YangHandler ) -> List[Tuple[str, Dict[str, Any]]]: LOGGER.debug('[parse] json_data = %s', json.dumps(json_data)) # Not required for current tests (L2VPN validation focuses on SetConfig). return []