Loading scripts/run_tests_locally-device-restconf-openconfig.sh 0 → 100755 +25 −0 Original line number Diff line number Diff line #!/bin/bash # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc # Run unitary tests and analyze coverage of code at same time # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 coverage run --rcfile=$RCFILE --append -m pytest --log-level=DEBUG --verbose -o log_cli=true \ device/tests/restconf_openconfig/test_unitary_restconf_openconfig.py src/device/service/drivers/restconf_openconfig/RestConfOpenConfigDriver.py 0 → 100644 +173 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy, json, logging, re, requests, threading from typing import Any, Iterator, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.tools.rest_conf.client.RestConfClient import RestConfClient from common.type_checkers.Checkers import chk_string, chk_type from device.service.driver_api._Driver import ( _Driver, RESOURCE_ACL, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES ) from .handlers.ComponentsHandler import ComponentsHandler from .handlers.AclRuleSetHandler import AclRuleSetHandler ALL_RESOURCE_KEYS = [ RESOURCE_ACL, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, ] RE_ACL_RULESET = re.compile( r'^\/device\[([^\]]+)\]\/endpoint\[([^\]]+)\]\/acl\_ruleset\[([^\]]+)\]$' ) def parse_resource_key(resource_key : str) -> Optional[Tuple[str, str, str]]: re_match_acl_ruleset = RE_ACL_RULESET.match(resource_key) if re_match_acl_ruleset is None: return None device_key, endpoint_key, acl_ruleset_name = re_match_acl_ruleset.groups() return device_key, endpoint_key, acl_ruleset_name DRIVER_NAME = 'restconf_openconfig' METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) class RestConfOpenConfigDriver(_Driver): def __init__(self, address : str, port : int, **settings) -> None: super().__init__(DRIVER_NAME, address, port, **settings) logger_prefix = '{:s}:[{:s}:{:s}]'.format( str(__name__), str(self.address), str(self.port) ) self.__logger = logging.getLogger(logger_prefix) self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() restconf_settings = copy.deepcopy(settings) restconf_settings['logger'] = logging.getLogger(logger_prefix + '.RestConfClient_v1') self._rest_conf_client = RestConfClient(address, port=port, **restconf_settings) self._handler_components = ComponentsHandler(self._rest_conf_client) self._handler_acl_ruleset = AclRuleSetHandler(self._rest_conf_client) def Connect(self) -> bool: with self.__lock: if self.__started.is_set(): return True try: self._rest_conf_client._discover_base_url() except requests.exceptions.Timeout: self.__logger.exception('Timeout exception checking connectivity') return False except Exception: # pylint: disable=broad-except self.__logger.exception('Unhandled exception checking connectivity') return False else: self.__started.set() return True def Disconnect(self) -> bool: with self.__lock: self.__terminate.set() if not self.__started.is_set(): return True return True @metered_subclass_method(METRICS_POOL) def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return [] @metered_subclass_method(METRICS_POOL) def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type('resources', resource_keys, list) results = list() with self.__lock: if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS for i, resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) try: chk_string(str_resource_name, resource_key, allow_empty=False) if resource_key == RESOURCE_ENDPOINTS: results.extend(self._handler_components.get()) elif resource_key == RESOURCE_ACL: results.extend(self._handler_acl_ruleset.get()) else: parts = parse_resource_key(resource_key) if parts is None: continue device_key, endpoint_key, acl_ruleset_name = parts results.extend(self._handler_acl_ruleset.get(acl_ruleset_name=acl_ruleset_name)) except Exception as e: MSG = 'Error processing resource_key({:s}, {:s})' self.__logger.exception(MSG.format(str_resource_name, str(resource_key))) results.append((resource_key, e)) # if processing fails, store the exception return results @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] with self.__lock: for resource_key, resource_value in resources: self.__logger.info('resource: key({:s}) => value({:s})'.format(str(resource_key), str(resource_value))) try: if isinstance(resource_value, str): resource_value = json.loads(resource_value) if parse_resource_key(resource_key) is None: continue results.append(self._handler_acl_ruleset.update(resource_value)) except Exception as e: results.append(e) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] with self.__lock: for resource_key, resource_value in resources: self.__logger.info('resource: key({:s}) => value({:s})'.format(str(resource_key), str(resource_value))) try: #if isinstance(resource_value, str): resource_value = json.loads(resource_value) resource_key_parts = parse_resource_key(resource_key) if resource_key_parts is None: continue _, _, acl_ruleset_name = resource_key_parts results.append(self._handler_acl_ruleset.delete(acl_ruleset_name)) except Exception as e: results.append(e) return results @metered_subclass_method(METRICS_POOL) def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: # TODO: RESTCONF OPENCONFIG does not support monitoring by now return [False for _ in subscriptions] @metered_subclass_method(METRICS_POOL) def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: # TODO: RESTCONF OPENCONFIG does not support monitoring by now return [False for _ in subscriptions] def GetState( self, blocking=False, terminate : Optional[threading.Event] = None ) -> Iterator[Tuple[float, str, Any]]: # TODO: RESTCONF OPENCONFIG does not support monitoring by now return [] src/device/service/drivers/restconf_openconfig/__init__.py 0 → 100644 +14 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. src/device/service/drivers/restconf_openconfig/handlers/AclRuleSetHandler.py 0 → 100644 +248 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from typing import Any, Dict, List, Optional, Tuple, Union from common.proto.context_pb2 import AclDirectionEnum from common.tools.rest_conf.client.RestConfClient import RestConfClient LOGGER = logging.getLogger(__name__) _TFS_2_OC_RULE_TYPE = { 'ACLRULETYPE_IPV4': 'ACL_IPV4', 'ACLRULETYPE_IPV6': 'ACL_IPV6', } _OC_2_TFS_RULE_TYPE = {v: k for k, v in _TFS_2_OC_RULE_TYPE.items() } _TFS_2_OC_PROTOCOL = { 1 : 'IP_ICMP', 6 : 'IP_TCP', 17 : 'IP_UDP', } _OC_2_TFS_PROTOCOL = {v: k for k, v in _TFS_2_OC_PROTOCOL.items() } _TFS_2_OC_FWD_ACTION = { 'ACLFORWARDINGACTION_DROP' : 'DROP', 'ACLFORWARDINGACTION_ACCEPT': 'ACCEPT', 'ACLFORWARDINGACTION_REJECT': 'REJECT', } _OC_2_TFS_FWD_ACTION = {v: k for k, v in _TFS_2_OC_FWD_ACTION.items()} DIRECTION_INGRESS = { AclDirectionEnum.ACLDIRECTION_BOTH, AclDirectionEnum.Name(AclDirectionEnum.ACLDIRECTION_BOTH), AclDirectionEnum.ACLDIRECTION_INGRESS, AclDirectionEnum.Name(AclDirectionEnum.ACLDIRECTION_INGRESS), } DIRECTION_EGRESS = { AclDirectionEnum.ACLDIRECTION_BOTH, AclDirectionEnum.Name(AclDirectionEnum.ACLDIRECTION_BOTH), AclDirectionEnum.ACLDIRECTION_EGRESS, AclDirectionEnum.Name(AclDirectionEnum.ACLDIRECTION_EGRESS), } class AclRuleSetHandler: def __init__(self, rest_conf_client : RestConfClient) -> None: self._rest_conf_client = rest_conf_client self._subpath_root = '/openconfig-acl:acl' self._subpath_item = self._subpath_root + '/acl-sets/acl-set={acl_ruleset_name:s}' def get(self, acl_ruleset_name : Optional[str] = None) -> Union[Dict, List]: if acl_ruleset_name is None: subpath_url = self._subpath_root else: subpath_url = self._subpath_item.format(acl_ruleset_name=acl_ruleset_name) reply = self._rest_conf_client.get(subpath_url) if 'openconfig-acl:acl' not in reply: raise Exception('Malformed reply. "openconfig-acl:acl" missing') acls = reply['openconfig-acl:acl'] if 'acl-sets' not in acls: raise Exception('Malformed reply. "openconfig-acl:acl/acl-sets" missing') aclsets = acls['acl-sets'] if 'acl-set' not in aclsets: raise Exception('Malformed reply. "openconfig-acl:acl/acl-sets/acl-set" missing') aclset_lst = aclsets['acl-set'] if len(aclset_lst) == 0: MSG = '[get] No ACL-Sets are reported' LOGGER.debug(MSG) return list() results : List[Tuple[str, Dict[str, Any]]] = list() for acl_set in aclset_lst: acl_set_name = acl_set['name'] oc_acl_set_type = acl_set['config']['type'] tfs_acl_set_type = _OC_2_TFS_RULE_TYPE[oc_acl_set_type] rule_set: Dict[str, Any] = { 'name' : acl_set_name, 'type' : tfs_acl_set_type, 'entries' : [], } acl_set_config : Dict = acl_set.get('config', {}) acl_set_description = acl_set_config.get('description') if acl_set_description is not None: rule_set['description'] = acl_set_description for ace in acl_set.get('acl-entries', {}).get('acl-entry', []): seq = ace['sequence-id'] ipv4_cfg = ace.get('ipv4', {}).get('config', {}) match = dict() if 'source-address' in ipv4_cfg: match['src_address'] = ipv4_cfg['source-address'] if 'destination-address' in ipv4_cfg: match['dst_address'] = ipv4_cfg['destination-address'] if 'protocol' in ipv4_cfg: match['protocol'] = _OC_2_TFS_PROTOCOL[ipv4_cfg['protocol']] transp_cfg = ace.get('transport', {}).get('config', {}) if 'source-port' in transp_cfg: match['src_port'] = transp_cfg['source-port'] if 'destination-port' in transp_cfg: match['dst_port'] = transp_cfg['destination-port'] act = ace.get('actions', {}).get('config', {}).get('forwarding-action', 'DROP') fwd_tfs = _OC_2_TFS_FWD_ACTION[act] rule_set['entries'].append({ 'sequence_id': seq, 'match': match, 'action': {'forward_action': fwd_tfs}, }) # find where that ACL is bound (first matching interface) if_name = '' for intf in acls.get('interfaces', {}).get('interface', []): for ing in intf.get('ingress-acl-sets', {}).get('ingress-acl-set', []): if ing['set-name'] == acl_set_name: if_name = intf['id'] break path = '/device[]/endpoint[{:s}]/acl_ruleset[{:s}]'.format( if_name, acl_set_name ) tfs_acl_data = { 'endpoint_id': {'endpoint_uuid': {'uuid': if_name}}, 'direction': 'ACLDIRECTION_INGRESS', 'rule_set': rule_set, } results.append((path, tfs_acl_data)) return results def update(self, acl_data : Dict) -> bool: if_name = acl_data['endpoint_id']['endpoint_uuid']['uuid'] direction = acl_data['direction'] rule_set = acl_data['rule_set'] if direction in DIRECTION_INGRESS: acl_set_name = 'ip-filter-input' elif direction in DIRECTION_EGRESS: acl_set_name = 'ip-filter-output' else: MSG = 'Unsupported direction: {:s}' raise Exception(MSG.format(str(direction))) acl_entry_desc = rule_set['name'] acl_set_type = _TFS_2_OC_RULE_TYPE[rule_set['type']] oc_acl_entries = list() sequence_ids : List[int] = list() for entry in rule_set.get('entries', []): sequence_id = int(entry['sequence_id']) oc_action = _TFS_2_OC_FWD_ACTION[entry['action']['forward_action']] oc_acl_entry = { 'sequence-id': sequence_id, 'config': {'sequence-id': sequence_id, 'description' : acl_entry_desc}, 'actions': {'config': {'forwarding-action': oc_action}} } entry_match = entry.get('match', dict()) ipv4_config = dict() if 'protocol' in entry_match: ipv4_config['protocol'] = _TFS_2_OC_PROTOCOL[entry_match['protocol']] if 'src_address' in entry_match: ipv4_config['source-address'] = entry_match['src_address'] if 'dst_address' in entry_match: ipv4_config['destination-address'] = entry_match['dst_address'] if len(ipv4_config) > 0: oc_acl_entry.setdefault('ipv4', dict())['config'] = ipv4_config transport_config = dict() if 'src_port' in entry_match: transport_config['source-port'] = entry_match['src_port'] if 'dst_port' in entry_match: transport_config['destination-port'] = entry_match['dst_port'] if len(transport_config) > 0: oc_acl_entry.setdefault('transport', dict())['config'] = transport_config oc_acl_entries.append(oc_acl_entry) sequence_ids.append(sequence_id) oc_interface = { 'id': if_name, 'config': {'id': if_name}, 'interface-ref': {'config': {'interface': if_name, 'subinterface': 1}}, } if direction in DIRECTION_INGRESS: oc_interface['ingress-acl-sets'] = {'ingress-acl-set': [{ 'set-name': acl_set_name, 'type': acl_set_type, 'config': {'set-name': acl_set_name, 'type': acl_set_type}, 'acl-entries': {'acl-entry': [ {'sequence-id': sequence_id} for sequence_id in sequence_ids ]} }]} if direction in DIRECTION_EGRESS: oc_interface['egress-acl-sets'] = {'egress-acl-set': [{ 'set-name': acl_set_name, 'type': acl_set_type, 'config': {'set-name': acl_set_name, 'type': acl_set_type}, 'acl-entries': {'acl-entry': [ {'sequence-id': sequence_id} for sequence_id in sequence_ids ]} }]} oc_acl_data = {'openconfig-acl:acl': { 'acl-sets': {'acl-set': [{ 'name': acl_set_name, 'type': acl_set_type, 'config': {'name': acl_set_name, 'type': acl_set_type}, 'acl-entries': {'acl-entry': oc_acl_entries}, }]}, 'interfaces': {'interface': [oc_interface]}, }} return self._rest_conf_client.post(self._subpath_root, body=oc_acl_data) is not None def delete(self, acl_ruleset_name : str) -> bool: if acl_ruleset_name is None: raise Exception('acl_ruleset_name is None') subpath_url = self._subpath_item.format(acl_ruleset_name=acl_ruleset_name) return self._rest_conf_client.delete(subpath_url) src/device/service/drivers/restconf_openconfig/handlers/ComponentsHandler.py 0 → 100644 +71 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from typing import Dict, List, Tuple from common.tools.rest_conf.client.RestConfClient import RestConfClient LOGGER = logging.getLogger(__name__) class ComponentsHandler: def __init__(self, rest_conf_client : RestConfClient) -> None: self._rest_conf_client = rest_conf_client self._subpath_root = '/openconfig-platform:components' def get(self) -> List[Tuple[str, Dict]]: reply = self._rest_conf_client.get(self._subpath_root) if 'openconfig-platform:components' not in reply: raise Exception('Malformed reply. "openconfig-platform:components" missing') components = reply['openconfig-platform:components'] if 'component' not in components: raise Exception('Malformed reply. "openconfig-platform:components/component" missing') component_lst = components['component'] if len(component_lst) == 0: MSG = '[get] No components are reported' LOGGER.debug(MSG) return list() entries : List[Tuple[str, Dict]] = list() for component in component_lst: if 'state' not in component: MSG = 'Malformed component. "state" missing: {:s}' raise Exception(MSG.format(str(component))) comp_state = component['state'] if 'type' not in comp_state: MSG = 'Malformed component. "state/type" missing: {:s}' raise Exception(MSG.format(str(component))) comp_type : str = comp_state['type'] comp_type = comp_type.split(':')[-1] if comp_type != 'PORT': continue if 'name' not in component: MSG = 'Malformed component. "name" missing: {:s}' raise Exception(MSG.format(str(component))) comp_name = component['name'] if comp_name.startswith('cali'): continue # calico port if comp_name.startswith('vxlan'): continue # vxlan.calico port if comp_name.startswith('docker'): continue # docker port if comp_name in {'lo', 'loop', 'loopback'}: continue # loopback port endpoint = {'uuid': comp_name, 'type': '-'} entries.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint)) return entries Loading
scripts/run_tests_locally-device-restconf-openconfig.sh 0 → 100755 +25 −0 Original line number Diff line number Diff line #!/bin/bash # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc # Run unitary tests and analyze coverage of code at same time # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 coverage run --rcfile=$RCFILE --append -m pytest --log-level=DEBUG --verbose -o log_cli=true \ device/tests/restconf_openconfig/test_unitary_restconf_openconfig.py
src/device/service/drivers/restconf_openconfig/RestConfOpenConfigDriver.py 0 → 100644 +173 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy, json, logging, re, requests, threading from typing import Any, Iterator, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.tools.rest_conf.client.RestConfClient import RestConfClient from common.type_checkers.Checkers import chk_string, chk_type from device.service.driver_api._Driver import ( _Driver, RESOURCE_ACL, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES ) from .handlers.ComponentsHandler import ComponentsHandler from .handlers.AclRuleSetHandler import AclRuleSetHandler ALL_RESOURCE_KEYS = [ RESOURCE_ACL, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, ] RE_ACL_RULESET = re.compile( r'^\/device\[([^\]]+)\]\/endpoint\[([^\]]+)\]\/acl\_ruleset\[([^\]]+)\]$' ) def parse_resource_key(resource_key : str) -> Optional[Tuple[str, str, str]]: re_match_acl_ruleset = RE_ACL_RULESET.match(resource_key) if re_match_acl_ruleset is None: return None device_key, endpoint_key, acl_ruleset_name = re_match_acl_ruleset.groups() return device_key, endpoint_key, acl_ruleset_name DRIVER_NAME = 'restconf_openconfig' METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) class RestConfOpenConfigDriver(_Driver): def __init__(self, address : str, port : int, **settings) -> None: super().__init__(DRIVER_NAME, address, port, **settings) logger_prefix = '{:s}:[{:s}:{:s}]'.format( str(__name__), str(self.address), str(self.port) ) self.__logger = logging.getLogger(logger_prefix) self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() restconf_settings = copy.deepcopy(settings) restconf_settings['logger'] = logging.getLogger(logger_prefix + '.RestConfClient_v1') self._rest_conf_client = RestConfClient(address, port=port, **restconf_settings) self._handler_components = ComponentsHandler(self._rest_conf_client) self._handler_acl_ruleset = AclRuleSetHandler(self._rest_conf_client) def Connect(self) -> bool: with self.__lock: if self.__started.is_set(): return True try: self._rest_conf_client._discover_base_url() except requests.exceptions.Timeout: self.__logger.exception('Timeout exception checking connectivity') return False except Exception: # pylint: disable=broad-except self.__logger.exception('Unhandled exception checking connectivity') return False else: self.__started.set() return True def Disconnect(self) -> bool: with self.__lock: self.__terminate.set() if not self.__started.is_set(): return True return True @metered_subclass_method(METRICS_POOL) def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return [] @metered_subclass_method(METRICS_POOL) def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type('resources', resource_keys, list) results = list() with self.__lock: if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS for i, resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) try: chk_string(str_resource_name, resource_key, allow_empty=False) if resource_key == RESOURCE_ENDPOINTS: results.extend(self._handler_components.get()) elif resource_key == RESOURCE_ACL: results.extend(self._handler_acl_ruleset.get()) else: parts = parse_resource_key(resource_key) if parts is None: continue device_key, endpoint_key, acl_ruleset_name = parts results.extend(self._handler_acl_ruleset.get(acl_ruleset_name=acl_ruleset_name)) except Exception as e: MSG = 'Error processing resource_key({:s}, {:s})' self.__logger.exception(MSG.format(str_resource_name, str(resource_key))) results.append((resource_key, e)) # if processing fails, store the exception return results @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] with self.__lock: for resource_key, resource_value in resources: self.__logger.info('resource: key({:s}) => value({:s})'.format(str(resource_key), str(resource_value))) try: if isinstance(resource_value, str): resource_value = json.loads(resource_value) if parse_resource_key(resource_key) is None: continue results.append(self._handler_acl_ruleset.update(resource_value)) except Exception as e: results.append(e) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] with self.__lock: for resource_key, resource_value in resources: self.__logger.info('resource: key({:s}) => value({:s})'.format(str(resource_key), str(resource_value))) try: #if isinstance(resource_value, str): resource_value = json.loads(resource_value) resource_key_parts = parse_resource_key(resource_key) if resource_key_parts is None: continue _, _, acl_ruleset_name = resource_key_parts results.append(self._handler_acl_ruleset.delete(acl_ruleset_name)) except Exception as e: results.append(e) return results @metered_subclass_method(METRICS_POOL) def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: # TODO: RESTCONF OPENCONFIG does not support monitoring by now return [False for _ in subscriptions] @metered_subclass_method(METRICS_POOL) def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: # TODO: RESTCONF OPENCONFIG does not support monitoring by now return [False for _ in subscriptions] def GetState( self, blocking=False, terminate : Optional[threading.Event] = None ) -> Iterator[Tuple[float, str, Any]]: # TODO: RESTCONF OPENCONFIG does not support monitoring by now return []
src/device/service/drivers/restconf_openconfig/__init__.py 0 → 100644 +14 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
src/device/service/drivers/restconf_openconfig/handlers/AclRuleSetHandler.py 0 → 100644 +248 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from typing import Any, Dict, List, Optional, Tuple, Union from common.proto.context_pb2 import AclDirectionEnum from common.tools.rest_conf.client.RestConfClient import RestConfClient LOGGER = logging.getLogger(__name__) _TFS_2_OC_RULE_TYPE = { 'ACLRULETYPE_IPV4': 'ACL_IPV4', 'ACLRULETYPE_IPV6': 'ACL_IPV6', } _OC_2_TFS_RULE_TYPE = {v: k for k, v in _TFS_2_OC_RULE_TYPE.items() } _TFS_2_OC_PROTOCOL = { 1 : 'IP_ICMP', 6 : 'IP_TCP', 17 : 'IP_UDP', } _OC_2_TFS_PROTOCOL = {v: k for k, v in _TFS_2_OC_PROTOCOL.items() } _TFS_2_OC_FWD_ACTION = { 'ACLFORWARDINGACTION_DROP' : 'DROP', 'ACLFORWARDINGACTION_ACCEPT': 'ACCEPT', 'ACLFORWARDINGACTION_REJECT': 'REJECT', } _OC_2_TFS_FWD_ACTION = {v: k for k, v in _TFS_2_OC_FWD_ACTION.items()} DIRECTION_INGRESS = { AclDirectionEnum.ACLDIRECTION_BOTH, AclDirectionEnum.Name(AclDirectionEnum.ACLDIRECTION_BOTH), AclDirectionEnum.ACLDIRECTION_INGRESS, AclDirectionEnum.Name(AclDirectionEnum.ACLDIRECTION_INGRESS), } DIRECTION_EGRESS = { AclDirectionEnum.ACLDIRECTION_BOTH, AclDirectionEnum.Name(AclDirectionEnum.ACLDIRECTION_BOTH), AclDirectionEnum.ACLDIRECTION_EGRESS, AclDirectionEnum.Name(AclDirectionEnum.ACLDIRECTION_EGRESS), } class AclRuleSetHandler: def __init__(self, rest_conf_client : RestConfClient) -> None: self._rest_conf_client = rest_conf_client self._subpath_root = '/openconfig-acl:acl' self._subpath_item = self._subpath_root + '/acl-sets/acl-set={acl_ruleset_name:s}' def get(self, acl_ruleset_name : Optional[str] = None) -> Union[Dict, List]: if acl_ruleset_name is None: subpath_url = self._subpath_root else: subpath_url = self._subpath_item.format(acl_ruleset_name=acl_ruleset_name) reply = self._rest_conf_client.get(subpath_url) if 'openconfig-acl:acl' not in reply: raise Exception('Malformed reply. "openconfig-acl:acl" missing') acls = reply['openconfig-acl:acl'] if 'acl-sets' not in acls: raise Exception('Malformed reply. "openconfig-acl:acl/acl-sets" missing') aclsets = acls['acl-sets'] if 'acl-set' not in aclsets: raise Exception('Malformed reply. "openconfig-acl:acl/acl-sets/acl-set" missing') aclset_lst = aclsets['acl-set'] if len(aclset_lst) == 0: MSG = '[get] No ACL-Sets are reported' LOGGER.debug(MSG) return list() results : List[Tuple[str, Dict[str, Any]]] = list() for acl_set in aclset_lst: acl_set_name = acl_set['name'] oc_acl_set_type = acl_set['config']['type'] tfs_acl_set_type = _OC_2_TFS_RULE_TYPE[oc_acl_set_type] rule_set: Dict[str, Any] = { 'name' : acl_set_name, 'type' : tfs_acl_set_type, 'entries' : [], } acl_set_config : Dict = acl_set.get('config', {}) acl_set_description = acl_set_config.get('description') if acl_set_description is not None: rule_set['description'] = acl_set_description for ace in acl_set.get('acl-entries', {}).get('acl-entry', []): seq = ace['sequence-id'] ipv4_cfg = ace.get('ipv4', {}).get('config', {}) match = dict() if 'source-address' in ipv4_cfg: match['src_address'] = ipv4_cfg['source-address'] if 'destination-address' in ipv4_cfg: match['dst_address'] = ipv4_cfg['destination-address'] if 'protocol' in ipv4_cfg: match['protocol'] = _OC_2_TFS_PROTOCOL[ipv4_cfg['protocol']] transp_cfg = ace.get('transport', {}).get('config', {}) if 'source-port' in transp_cfg: match['src_port'] = transp_cfg['source-port'] if 'destination-port' in transp_cfg: match['dst_port'] = transp_cfg['destination-port'] act = ace.get('actions', {}).get('config', {}).get('forwarding-action', 'DROP') fwd_tfs = _OC_2_TFS_FWD_ACTION[act] rule_set['entries'].append({ 'sequence_id': seq, 'match': match, 'action': {'forward_action': fwd_tfs}, }) # find where that ACL is bound (first matching interface) if_name = '' for intf in acls.get('interfaces', {}).get('interface', []): for ing in intf.get('ingress-acl-sets', {}).get('ingress-acl-set', []): if ing['set-name'] == acl_set_name: if_name = intf['id'] break path = '/device[]/endpoint[{:s}]/acl_ruleset[{:s}]'.format( if_name, acl_set_name ) tfs_acl_data = { 'endpoint_id': {'endpoint_uuid': {'uuid': if_name}}, 'direction': 'ACLDIRECTION_INGRESS', 'rule_set': rule_set, } results.append((path, tfs_acl_data)) return results def update(self, acl_data : Dict) -> bool: if_name = acl_data['endpoint_id']['endpoint_uuid']['uuid'] direction = acl_data['direction'] rule_set = acl_data['rule_set'] if direction in DIRECTION_INGRESS: acl_set_name = 'ip-filter-input' elif direction in DIRECTION_EGRESS: acl_set_name = 'ip-filter-output' else: MSG = 'Unsupported direction: {:s}' raise Exception(MSG.format(str(direction))) acl_entry_desc = rule_set['name'] acl_set_type = _TFS_2_OC_RULE_TYPE[rule_set['type']] oc_acl_entries = list() sequence_ids : List[int] = list() for entry in rule_set.get('entries', []): sequence_id = int(entry['sequence_id']) oc_action = _TFS_2_OC_FWD_ACTION[entry['action']['forward_action']] oc_acl_entry = { 'sequence-id': sequence_id, 'config': {'sequence-id': sequence_id, 'description' : acl_entry_desc}, 'actions': {'config': {'forwarding-action': oc_action}} } entry_match = entry.get('match', dict()) ipv4_config = dict() if 'protocol' in entry_match: ipv4_config['protocol'] = _TFS_2_OC_PROTOCOL[entry_match['protocol']] if 'src_address' in entry_match: ipv4_config['source-address'] = entry_match['src_address'] if 'dst_address' in entry_match: ipv4_config['destination-address'] = entry_match['dst_address'] if len(ipv4_config) > 0: oc_acl_entry.setdefault('ipv4', dict())['config'] = ipv4_config transport_config = dict() if 'src_port' in entry_match: transport_config['source-port'] = entry_match['src_port'] if 'dst_port' in entry_match: transport_config['destination-port'] = entry_match['dst_port'] if len(transport_config) > 0: oc_acl_entry.setdefault('transport', dict())['config'] = transport_config oc_acl_entries.append(oc_acl_entry) sequence_ids.append(sequence_id) oc_interface = { 'id': if_name, 'config': {'id': if_name}, 'interface-ref': {'config': {'interface': if_name, 'subinterface': 1}}, } if direction in DIRECTION_INGRESS: oc_interface['ingress-acl-sets'] = {'ingress-acl-set': [{ 'set-name': acl_set_name, 'type': acl_set_type, 'config': {'set-name': acl_set_name, 'type': acl_set_type}, 'acl-entries': {'acl-entry': [ {'sequence-id': sequence_id} for sequence_id in sequence_ids ]} }]} if direction in DIRECTION_EGRESS: oc_interface['egress-acl-sets'] = {'egress-acl-set': [{ 'set-name': acl_set_name, 'type': acl_set_type, 'config': {'set-name': acl_set_name, 'type': acl_set_type}, 'acl-entries': {'acl-entry': [ {'sequence-id': sequence_id} for sequence_id in sequence_ids ]} }]} oc_acl_data = {'openconfig-acl:acl': { 'acl-sets': {'acl-set': [{ 'name': acl_set_name, 'type': acl_set_type, 'config': {'name': acl_set_name, 'type': acl_set_type}, 'acl-entries': {'acl-entry': oc_acl_entries}, }]}, 'interfaces': {'interface': [oc_interface]}, }} return self._rest_conf_client.post(self._subpath_root, body=oc_acl_data) is not None def delete(self, acl_ruleset_name : str) -> bool: if acl_ruleset_name is None: raise Exception('acl_ruleset_name is None') subpath_url = self._subpath_item.format(acl_ruleset_name=acl_ruleset_name) return self._rest_conf_client.delete(subpath_url)
src/device/service/drivers/restconf_openconfig/handlers/ComponentsHandler.py 0 → 100644 +71 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from typing import Dict, List, Tuple from common.tools.rest_conf.client.RestConfClient import RestConfClient LOGGER = logging.getLogger(__name__) class ComponentsHandler: def __init__(self, rest_conf_client : RestConfClient) -> None: self._rest_conf_client = rest_conf_client self._subpath_root = '/openconfig-platform:components' def get(self) -> List[Tuple[str, Dict]]: reply = self._rest_conf_client.get(self._subpath_root) if 'openconfig-platform:components' not in reply: raise Exception('Malformed reply. "openconfig-platform:components" missing') components = reply['openconfig-platform:components'] if 'component' not in components: raise Exception('Malformed reply. "openconfig-platform:components/component" missing') component_lst = components['component'] if len(component_lst) == 0: MSG = '[get] No components are reported' LOGGER.debug(MSG) return list() entries : List[Tuple[str, Dict]] = list() for component in component_lst: if 'state' not in component: MSG = 'Malformed component. "state" missing: {:s}' raise Exception(MSG.format(str(component))) comp_state = component['state'] if 'type' not in comp_state: MSG = 'Malformed component. "state/type" missing: {:s}' raise Exception(MSG.format(str(component))) comp_type : str = comp_state['type'] comp_type = comp_type.split(':')[-1] if comp_type != 'PORT': continue if 'name' not in component: MSG = 'Malformed component. "name" missing: {:s}' raise Exception(MSG.format(str(component))) comp_name = component['name'] if comp_name.startswith('cali'): continue # calico port if comp_name.startswith('vxlan'): continue # vxlan.calico port if comp_name.startswith('docker'): continue # docker port if comp_name in {'lo', 'loop', 'loopback'}: continue # loopback port endpoint = {'uuid': comp_name, 'type': '-'} entries.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint)) return entries