Loading src/device/service/drivers/gnmi_openconfig/handlers/Acl.py 0 → 100644 +140 −0 Original line number Diff line number Diff line import json import logging from typing import Any, Dict, List, Tuple import libyang from ._Handler import _Handler from .YangHandler import YangHandler LOGGER = logging.getLogger(__name__) # ────────────────────────── enum translations ────────────────────────── _TFS_OC_RULE_TYPE = { 'ACLRULETYPE_IPV4': 'ACL_IPV4', 'ACLRULETYPE_IPV6': 'ACL_IPV6', } _TFS_OC_FWD_ACTION = { 'ACLFORWARDINGACTION_DROP': 'DROP', 'ACLFORWARDINGACTION_ACCEPT': 'ACCEPT', 'ACLFORWARDINGACTION_REJECT': 'REJECT', } _OC_TFS_RULE_TYPE = {v: k for k, v in _TFS_OC_RULE_TYPE.items()} _OC_TFS_FWD_ACTION = {v: k for k, v in _TFS_OC_FWD_ACTION.items()} # ───────────────────────────────────────────────────────────────────────── class AclHandler(_Handler): def get_resource_key(self) -> str: return '/device/endpoint/acl_ruleset' def get_path(self) -> str: return '/openconfig-acl:acl' def compose( # pylint: disable=too-many-locals self, resource_key: str, resource_value: Dict[str, Any], yang: YangHandler, delete: bool = False, ) -> Tuple[str, str]: rs = resource_value['rule_set'] rs_name = rs['name'] oc_type = _TFS_OC_RULE_TYPE[rs['type']] device = resource_value['endpoint_id']['device_id']['device_uuid']['uuid'] iface = resource_value['endpoint_id']['endpoint_uuid']['uuid'] if delete: path = f'/acl/acl-sets/acl-set[name={rs_name}][type={oc_type}]' return path, '' yang_acl: libyang.DContainer = yang.get_data_path('/openconfig-acl:acl') y_sets = yang_acl.create_path('acl-sets') y_set = y_sets.create_path(f'acl-set[name="{rs_name}"][type="{oc_type}"]') y_set.create_path('config/name', rs_name) y_set.create_path('config/type', oc_type) # Entries (ACEs) y_entries = y_set.create_path('acl-entries') for entry in rs.get('entries', []): seq = int(entry['sequence_id']) src = entry['match'].get('src_address', '0.0.0.0/0') dst = entry['match'].get('dst_address', '0.0.0.0/0') act = _TFS_OC_FWD_ACTION[entry['action']['forward_action']] y_e = y_entries.create_path(f'acl-entry[sequence-id="{seq}"]') y_e.create_path('config/sequence-id', seq) y_ipv4 = y_e.create_path('ipv4') y_ipv4.create_path('config/source-address', src) y_ipv4.create_path('config/destination-address', dst) y_act = y_e.create_path('actions') y_act.create_path('config/forwarding-action', act) # Interface binding y_intfs = yang_acl.create_path('interfaces') y_intf = y_intfs.create_path(f'interface[id="{iface}"]') y_ing = y_intf.create_path('ingress-acl-sets') y_ing_set = y_ing.create_path(f'ingress-acl-set[set-name="{rs_name}"][type="{oc_type}"]') y_ing_set.create_path('config/set-name', rs_name) y_ing_set.create_path('config/type', oc_type) json_data = yang_acl.print_mem('json') LOGGER.debug('JSON data: %s', json_data) json_obj = json.loads(json_data)['openconfig-acl:acl'] return '/acl', json.dumps(json_obj) def parse( # pylint: disable=too-many-locals self, json_data: Dict[str, Any], yang: YangHandler, ) -> List[Tuple[str, Dict[str, Any]]]: acl_tree = json_data.get('openconfig-acl:acl') or json_data results: List[Tuple[str, Dict[str, Any]]] = [] for acl_set in acl_tree.get('acl-sets', {}).get('acl-set', []): rs_name = acl_set['name'] oc_type = acl_set['config']['type'] rs_type = _OC_TFS_RULE_TYPE[oc_type] rule_set: Dict[str, Any] = { 'name': rs_name, 'type': rs_type, 'description': acl_set.get('config', {}).get('description', ''), 'entries': [], } for ace in acl_set.get('acl-entries', {}).get('acl-entry', []): seq = ace['sequence-id'] act = ace.get('actions', {}).get('config', {}).get('forwarding-action', 'DROP') fwd_tfs = _OC_TFS_FWD_ACTION[act] ipv4_cfg = ace.get('ipv4', {}).get('config', {}) rule_set['entries'].append( { 'sequence_id': seq, 'match': { 'src_address': ipv4_cfg.get('source-address', ''), 'dst_address': ipv4_cfg.get('destination-address', ''), }, 'action': {'forward_action': fwd_tfs}, } ) # find where that ACL is bound (first matching interface) iface = '' for intf in acl_tree.get('interfaces', {}).get('interface', []): for ing in intf.get('ingress-acl-sets', {}).get('ingress-acl-set', []): if ing['set-name'] == rs_name: iface = intf['id'] break results.append(('/acl', {'interface': iface, 'rule_set': rule_set})) return results src/device/service/drivers/gnmi_openconfig/handlers/YangHandler.py +2 −0 Original line number Diff line number Diff line Loading @@ -44,6 +44,7 @@ YANG_MODULES = [ 'openconfig-mpls-types', 'openconfig-network-instance-types', 'openconfig-network-instance', 'openconfig-acl', 'openconfig-platform', 'openconfig-platform-controller-card', Loading @@ -59,6 +60,7 @@ YANG_MODULES = [ 'openconfig-platform-software', 'openconfig-platform-transceiver', 'openconfig-platform-types', 'openconfig-platform-healthz', ] LOGGER = logging.getLogger(__name__) Loading src/device/service/drivers/gnmi_openconfig/handlers/__init__.py +8 −1 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ import logging from typing import Any, Dict, List, Optional, Tuple, Union from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ACL from ._Handler import _Handler from .Component import ComponentHandler from .Interface import InterfaceHandler Loading @@ -23,6 +23,7 @@ from .NetworkInstance import NetworkInstanceHandler from .NetworkInstanceInterface import NetworkInstanceInterfaceHandler from .NetworkInstanceProtocol import NetworkInstanceProtocolHandler from .NetworkInstanceStaticRoute import NetworkInstanceStaticRouteHandler from .Acl import AclHandler from .Tools import get_schema from .YangHandler import YangHandler Loading @@ -35,17 +36,20 @@ nih = NetworkInstanceHandler() niifh = NetworkInstanceInterfaceHandler() niph = NetworkInstanceProtocolHandler() nisrh = NetworkInstanceStaticRouteHandler() aclh = AclHandler() ALL_RESOURCE_KEYS = [ RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ACL, ] RESOURCE_KEY_MAPPER = { RESOURCE_ENDPOINTS : comph.get_resource_key(), RESOURCE_INTERFACES : ifaceh.get_resource_key(), RESOURCE_NETWORK_INSTANCES : nih.get_resource_key(), RESOURCE_ACL : aclh.get_resource_key(), } PATH_MAPPER = { Loading @@ -53,6 +57,7 @@ PATH_MAPPER = { '/components/component' : comph.get_path(), '/interfaces' : ifaceh.get_path(), '/network-instances' : nih.get_path(), '/acl' : aclh.get_path(), } RESOURCE_KEY_TO_HANDLER = { Loading @@ -63,6 +68,7 @@ RESOURCE_KEY_TO_HANDLER = { niifh.get_resource_key() : niifh, niph.get_resource_key() : niph, nisrh.get_resource_key() : nisrh, aclh.get_resource_key() : aclh, } PATH_TO_HANDLER = { Loading @@ -73,6 +79,7 @@ PATH_TO_HANDLER = { niifh.get_path() : niifh, niph.get_path() : niph, nisrh.get_path() : nisrh, aclh.get_path() : aclh, } def get_handler( Loading Loading
src/device/service/drivers/gnmi_openconfig/handlers/Acl.py 0 → 100644 +140 −0 Original line number Diff line number Diff line import json import logging from typing import Any, Dict, List, Tuple import libyang from ._Handler import _Handler from .YangHandler import YangHandler LOGGER = logging.getLogger(__name__) # ────────────────────────── enum translations ────────────────────────── _TFS_OC_RULE_TYPE = { 'ACLRULETYPE_IPV4': 'ACL_IPV4', 'ACLRULETYPE_IPV6': 'ACL_IPV6', } _TFS_OC_FWD_ACTION = { 'ACLFORWARDINGACTION_DROP': 'DROP', 'ACLFORWARDINGACTION_ACCEPT': 'ACCEPT', 'ACLFORWARDINGACTION_REJECT': 'REJECT', } _OC_TFS_RULE_TYPE = {v: k for k, v in _TFS_OC_RULE_TYPE.items()} _OC_TFS_FWD_ACTION = {v: k for k, v in _TFS_OC_FWD_ACTION.items()} # ───────────────────────────────────────────────────────────────────────── class AclHandler(_Handler): def get_resource_key(self) -> str: return '/device/endpoint/acl_ruleset' def get_path(self) -> str: return '/openconfig-acl:acl' def compose( # pylint: disable=too-many-locals self, resource_key: str, resource_value: Dict[str, Any], yang: YangHandler, delete: bool = False, ) -> Tuple[str, str]: rs = resource_value['rule_set'] rs_name = rs['name'] oc_type = _TFS_OC_RULE_TYPE[rs['type']] device = resource_value['endpoint_id']['device_id']['device_uuid']['uuid'] iface = resource_value['endpoint_id']['endpoint_uuid']['uuid'] if delete: path = f'/acl/acl-sets/acl-set[name={rs_name}][type={oc_type}]' return path, '' yang_acl: libyang.DContainer = yang.get_data_path('/openconfig-acl:acl') y_sets = yang_acl.create_path('acl-sets') y_set = y_sets.create_path(f'acl-set[name="{rs_name}"][type="{oc_type}"]') y_set.create_path('config/name', rs_name) y_set.create_path('config/type', oc_type) # Entries (ACEs) y_entries = y_set.create_path('acl-entries') for entry in rs.get('entries', []): seq = int(entry['sequence_id']) src = entry['match'].get('src_address', '0.0.0.0/0') dst = entry['match'].get('dst_address', '0.0.0.0/0') act = _TFS_OC_FWD_ACTION[entry['action']['forward_action']] y_e = y_entries.create_path(f'acl-entry[sequence-id="{seq}"]') y_e.create_path('config/sequence-id', seq) y_ipv4 = y_e.create_path('ipv4') y_ipv4.create_path('config/source-address', src) y_ipv4.create_path('config/destination-address', dst) y_act = y_e.create_path('actions') y_act.create_path('config/forwarding-action', act) # Interface binding y_intfs = yang_acl.create_path('interfaces') y_intf = y_intfs.create_path(f'interface[id="{iface}"]') y_ing = y_intf.create_path('ingress-acl-sets') y_ing_set = y_ing.create_path(f'ingress-acl-set[set-name="{rs_name}"][type="{oc_type}"]') y_ing_set.create_path('config/set-name', rs_name) y_ing_set.create_path('config/type', oc_type) json_data = yang_acl.print_mem('json') LOGGER.debug('JSON data: %s', json_data) json_obj = json.loads(json_data)['openconfig-acl:acl'] return '/acl', json.dumps(json_obj) def parse( # pylint: disable=too-many-locals self, json_data: Dict[str, Any], yang: YangHandler, ) -> List[Tuple[str, Dict[str, Any]]]: acl_tree = json_data.get('openconfig-acl:acl') or json_data results: List[Tuple[str, Dict[str, Any]]] = [] for acl_set in acl_tree.get('acl-sets', {}).get('acl-set', []): rs_name = acl_set['name'] oc_type = acl_set['config']['type'] rs_type = _OC_TFS_RULE_TYPE[oc_type] rule_set: Dict[str, Any] = { 'name': rs_name, 'type': rs_type, 'description': acl_set.get('config', {}).get('description', ''), 'entries': [], } for ace in acl_set.get('acl-entries', {}).get('acl-entry', []): seq = ace['sequence-id'] act = ace.get('actions', {}).get('config', {}).get('forwarding-action', 'DROP') fwd_tfs = _OC_TFS_FWD_ACTION[act] ipv4_cfg = ace.get('ipv4', {}).get('config', {}) rule_set['entries'].append( { 'sequence_id': seq, 'match': { 'src_address': ipv4_cfg.get('source-address', ''), 'dst_address': ipv4_cfg.get('destination-address', ''), }, 'action': {'forward_action': fwd_tfs}, } ) # find where that ACL is bound (first matching interface) iface = '' for intf in acl_tree.get('interfaces', {}).get('interface', []): for ing in intf.get('ingress-acl-sets', {}).get('ingress-acl-set', []): if ing['set-name'] == rs_name: iface = intf['id'] break results.append(('/acl', {'interface': iface, 'rule_set': rule_set})) return results
src/device/service/drivers/gnmi_openconfig/handlers/YangHandler.py +2 −0 Original line number Diff line number Diff line Loading @@ -44,6 +44,7 @@ YANG_MODULES = [ 'openconfig-mpls-types', 'openconfig-network-instance-types', 'openconfig-network-instance', 'openconfig-acl', 'openconfig-platform', 'openconfig-platform-controller-card', Loading @@ -59,6 +60,7 @@ YANG_MODULES = [ 'openconfig-platform-software', 'openconfig-platform-transceiver', 'openconfig-platform-types', 'openconfig-platform-healthz', ] LOGGER = logging.getLogger(__name__) Loading
src/device/service/drivers/gnmi_openconfig/handlers/__init__.py +8 −1 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ import logging from typing import Any, Dict, List, Optional, Tuple, Union from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ACL from ._Handler import _Handler from .Component import ComponentHandler from .Interface import InterfaceHandler Loading @@ -23,6 +23,7 @@ from .NetworkInstance import NetworkInstanceHandler from .NetworkInstanceInterface import NetworkInstanceInterfaceHandler from .NetworkInstanceProtocol import NetworkInstanceProtocolHandler from .NetworkInstanceStaticRoute import NetworkInstanceStaticRouteHandler from .Acl import AclHandler from .Tools import get_schema from .YangHandler import YangHandler Loading @@ -35,17 +36,20 @@ nih = NetworkInstanceHandler() niifh = NetworkInstanceInterfaceHandler() niph = NetworkInstanceProtocolHandler() nisrh = NetworkInstanceStaticRouteHandler() aclh = AclHandler() ALL_RESOURCE_KEYS = [ RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ACL, ] RESOURCE_KEY_MAPPER = { RESOURCE_ENDPOINTS : comph.get_resource_key(), RESOURCE_INTERFACES : ifaceh.get_resource_key(), RESOURCE_NETWORK_INSTANCES : nih.get_resource_key(), RESOURCE_ACL : aclh.get_resource_key(), } PATH_MAPPER = { Loading @@ -53,6 +57,7 @@ PATH_MAPPER = { '/components/component' : comph.get_path(), '/interfaces' : ifaceh.get_path(), '/network-instances' : nih.get_path(), '/acl' : aclh.get_path(), } RESOURCE_KEY_TO_HANDLER = { Loading @@ -63,6 +68,7 @@ RESOURCE_KEY_TO_HANDLER = { niifh.get_resource_key() : niifh, niph.get_resource_key() : niph, nisrh.get_resource_key() : nisrh, aclh.get_resource_key() : aclh, } PATH_TO_HANDLER = { Loading @@ -73,6 +79,7 @@ PATH_TO_HANDLER = { niifh.get_path() : niifh, niph.get_path() : niph, nisrh.get_path() : nisrh, aclh.get_path() : aclh, } def get_handler( Loading