Loading src/tests/tools/firewall_agent/docs/Docs-and-Commands.md +27 −2 Original line number Diff line number Diff line Loading @@ -9,9 +9,34 @@ ## Example Commands: ```bash sudo nft --interactive --handle # WORKS to block traffic: sudo nft insert rule ip filter FORWARD iifname "enp0s3" tcp dport 85 drop # WORKS to block traffic, but weird as external facing port is 30435, not 85 insert rule ip filter FORWARD iifname "enp0s3" tcp dport 85 drop # WORKS to block/allow traffic by external facing port 30435 add table ip filter add chain ip filter PREROUTING { type filter hook prerouting priority raw; policy accept; } add rule ip filter PREROUTING tcp dport 30435 reject insert rule ip filter PREROUTING ip saddr 10.0.2.2/32 tcp dport 30435 accept insert rule ip filter PREROUTING ip saddr 10.0.2.10/32 tcp dport 30435 accept list chain ip filter PREROUTING table ip filter { chain PREROUTING { # handle 30 type filter hook prerouting priority raw; policy accept; ip saddr 10.0.2.10 tcp dport 30435 accept # handle 34 ip saddr 10.0.2.2 tcp dport 30435 accept # handle 33 tcp dport 30435 reject # handle 31 } } delete rule ip filter PREROUTING handle 34 delete rule ip filter PREROUTING handle 33 delete rule ip filter PREROUTING handle 31 # ============================= sudo nft add table ip filter sudo nft add chain ip filter input {type filter hook input priority filter ; policy accept; } Loading src/tests/tools/firewall_agent/firewall_agent/resources/ACLs.py +5 −66 Original line number Diff line number Diff line Loading @@ -16,12 +16,16 @@ import logging from flask import request from flask_restful import Api, Resource, abort from typing import Dict, List, Set, Tuple from typing import Set, Tuple from .nft_model.DirectionEnum import DirectionEnum from .nft_model.FamilyEnum import FamilyEnum from .nft_model.NFTables import NFTables from .nft_model.Rule import Rule from .nft_model.TableEnum import TableEnum from .AclRuleToInterfaceDirection import ( CHAINS_INPUT, CHAINS_OUTPUT, AclRuleToInterfaceDirection, get_family_from_acl_set_type ) LOGGER = logging.getLogger(__name__) Loading @@ -30,71 +34,6 @@ LOGGER = logging.getLogger(__name__) BASE_URL_ROOT = '/restconf/data/openconfig-acl:acl' BASE_URL_ITEM = '/restconf/data/openconfig-acl:acl/acl-sets/acl-set=<name>' CHAIN_NAME_INPUT = 'INPUT' CHAIN_NAME_FORWARD = 'FORWARD' CHAIN_NAME_OUTPUT = 'OUTPUT' CHAINS_INPUT = [CHAIN_NAME_INPUT, CHAIN_NAME_FORWARD] CHAINS_OUTPUT = [CHAIN_NAME_FORWARD, CHAIN_NAME_OUTPUT] CHAINS_ALL = [CHAIN_NAME_INPUT, CHAIN_NAME_FORWARD, CHAIN_NAME_OUTPUT] TYPE_ACL_RULE_SEQ_ID = Tuple[str, int] TYPE_IFACE_DIRECTION = Tuple[str, DirectionEnum] TYPE_IFACE_DIRECTIONS = List[TYPE_IFACE_DIRECTION] TYPE_ACL_RULE_TO_IF_DIR = Dict[TYPE_ACL_RULE_SEQ_ID, TYPE_IFACE_DIRECTIONS] def get_family_from_acl_set_type(acl_set_type : str) -> FamilyEnum: return { 'ACL_IPV4' : FamilyEnum.IPV4, 'ACL_IPV6' : FamilyEnum.IPV6, }[acl_set_type] class AclRuleToInterfaceDirection: def __init__(self, nft : NFTables): self._nft = nft self._acl_rule_to_iface_direction : TYPE_ACL_RULE_TO_IF_DIR = dict() def create_nft_chains_in_table(self, acl_set_type : str, chain_names : List[str]) -> None: family = get_family_from_acl_set_type(acl_set_type) table = self._nft.get_or_create_table(family, TableEnum.FILTER) for chain_name in chain_names: table.get_or_create_chain(chain_name) def add_acl_set(self, if_name : str, acl_set : Dict, direction : DirectionEnum) -> None: acl_set_name = acl_set['config']['set-name'] acl_set_type = acl_set['config']['type'] if direction == DirectionEnum.INGRESS: self.create_nft_chains_in_table(acl_set_type, CHAINS_INPUT) elif direction == DirectionEnum.EGRESS: self.create_nft_chains_in_table(acl_set_type, CHAINS_OUTPUT) else: self.create_nft_chains_in_table(acl_set_type, CHAINS_ALL) for acl_set_entry in acl_set['acl-entries']['acl-entry']: sequence_id = int(acl_set_entry['sequence-id']) key = (acl_set_name, sequence_id) if_dir_list = self._acl_rule_to_iface_direction.setdefault(key, list()) if_dir_list.append((if_name, direction)) def add_interface(self, interface : Dict) -> None: if_name = interface['config']['id'] for direction in [DirectionEnum.INGRESS, DirectionEnum.EGRESS]: direction_value = direction.value acl_sets_obj = interface.get(f'{direction_value}-acl-sets', dict()) acl_sets_lst = acl_sets_obj.get(f'{direction_value}-acl-set', list()) for acl_set in acl_sets_lst: self.add_acl_set(if_name, acl_set, DirectionEnum.INGRESS) def add_interfaces(self, interfaces : List[Dict]) -> None: for interface in interfaces: self.add_interface(interface) def get_interfaces_directions( self, acl_set_name : str, sequence_id : int ) -> TYPE_IFACE_DIRECTIONS: return self._acl_rule_to_iface_direction.get((acl_set_name, sequence_id), []) class ACLs(Resource): def get(self): Loading src/tests/tools/firewall_agent/firewall_agent/resources/AclRuleToInterfaceDirection.py 0 → 100644 +98 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import Dict, List, Tuple from .nft_model.DirectionEnum import DirectionEnum from .nft_model.FamilyEnum import FamilyEnum from .nft_model.NFTables import NFTables from .nft_model.TableEnum import TableEnum TYPE_ACL_RULE_SEQ_ID = Tuple[str, int] TYPE_IFACE_DIRECTION = Tuple[str, DirectionEnum] TYPE_IFACE_DIRECTIONS = List[TYPE_IFACE_DIRECTION] TYPE_ACL_RULE_TO_IF_DIR = Dict[TYPE_ACL_RULE_SEQ_ID, TYPE_IFACE_DIRECTIONS] CHAIN_NAME_PREROUTING = 'PREROUTING' CHAIN_NAME_INPUT = 'INPUT' CHAIN_NAME_FORWARD = 'FORWARD' CHAIN_NAME_OUTPUT = 'OUTPUT' CHAIN_NAME_POSTROUTING = 'POSTROUTING' CHAINS_INPUT = [ CHAIN_NAME_PREROUTING, CHAIN_NAME_INPUT, CHAIN_NAME_FORWARD ] CHAINS_OUTPUT = [ CHAIN_NAME_FORWARD, CHAIN_NAME_OUTPUT, CHAIN_NAME_POSTROUTING ] CHAINS_ALL = [ CHAIN_NAME_PREROUTING, CHAIN_NAME_INPUT, CHAIN_NAME_FORWARD, CHAIN_NAME_OUTPUT, CHAIN_NAME_POSTROUTING ] def get_family_from_acl_set_type(acl_set_type : str) -> FamilyEnum: return { 'ACL_IPV4' : FamilyEnum.IPV4, 'ACL_IPV6' : FamilyEnum.IPV6, }[acl_set_type] class AclRuleToInterfaceDirection: def __init__(self, nft : NFTables): self._nft = nft self._acl_rule_to_iface_direction : TYPE_ACL_RULE_TO_IF_DIR = dict() def create_nft_chains_in_table(self, acl_set_type : str, chain_names : List[str]) -> None: family = get_family_from_acl_set_type(acl_set_type) table = self._nft.get_or_create_table(family, TableEnum.FILTER) for chain_name in chain_names: table.get_or_create_chain(chain_name) def add_acl_set(self, if_name : str, acl_set : Dict, direction : DirectionEnum) -> None: acl_set_name = acl_set['config']['set-name'] acl_set_type = acl_set['config']['type'] if direction == DirectionEnum.INGRESS: self.create_nft_chains_in_table(acl_set_type, CHAINS_INPUT) elif direction == DirectionEnum.EGRESS: self.create_nft_chains_in_table(acl_set_type, CHAINS_OUTPUT) else: self.create_nft_chains_in_table(acl_set_type, CHAINS_ALL) for acl_set_entry in acl_set['acl-entries']['acl-entry']: sequence_id = int(acl_set_entry['sequence-id']) key = (acl_set_name, sequence_id) if_dir_list = self._acl_rule_to_iface_direction.setdefault(key, list()) if_dir_list.append((if_name, direction)) def add_interface(self, interface : Dict) -> None: if_name = interface['config']['id'] for direction in [DirectionEnum.INGRESS, DirectionEnum.EGRESS]: direction_value = direction.value acl_sets_obj = interface.get(f'{direction_value}-acl-sets', dict()) acl_sets_lst = acl_sets_obj.get(f'{direction_value}-acl-set', list()) for acl_set in acl_sets_lst: self.add_acl_set(if_name, acl_set, DirectionEnum.INGRESS) def add_interfaces(self, interfaces : List[Dict]) -> None: for interface in interfaces: self.add_interface(interface) def get_interfaces_directions( self, acl_set_name : str, sequence_id : int ) -> TYPE_IFACE_DIRECTIONS: return self._acl_rule_to_iface_direction.get((acl_set_name, sequence_id), []) src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Chain.py +2 −1 Original line number Diff line number Diff line Loading @@ -81,10 +81,11 @@ class Chain: # chains and their removal might cause side effects on NFTables. pass else: #chain_hook = str(self.chain).lower() parts = [ 'add', 'chain', self.family.value, self.table.value, self.chain, '{', 'type', self.table.value, 'hook', self.chain, 'priority', self.table.value, ';', 'type', self.table.value, 'hook', self.chain, 'priority', 'raw', ';', 'policy', 'accept', ';', '}' ] Loading src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Rule.py +3 −1 Original line number Diff line number Diff line Loading @@ -225,7 +225,9 @@ class Rule: 'handle', str(self.handle) ] else: verb = 'add' if self.sequence_id < 1000 else 'insert' # NOTE: if sequence_id < 10000: insert the rules to the top; # otherwise, append to the bottom. Anyways, sort rules by sequence_id. verb = 'insert' if self.sequence_id < 10000 else 'add' parts = [verb, 'rule', self.family.value, self.table.value, self.chain] if self.input_if_name is not None: parts.extend(['iifname', self.input_if_name]) if self.output_if_name is not None: parts.extend(['oifname', self.output_if_name]) Loading Loading
src/tests/tools/firewall_agent/docs/Docs-and-Commands.md +27 −2 Original line number Diff line number Diff line Loading @@ -9,9 +9,34 @@ ## Example Commands: ```bash sudo nft --interactive --handle # WORKS to block traffic: sudo nft insert rule ip filter FORWARD iifname "enp0s3" tcp dport 85 drop # WORKS to block traffic, but weird as external facing port is 30435, not 85 insert rule ip filter FORWARD iifname "enp0s3" tcp dport 85 drop # WORKS to block/allow traffic by external facing port 30435 add table ip filter add chain ip filter PREROUTING { type filter hook prerouting priority raw; policy accept; } add rule ip filter PREROUTING tcp dport 30435 reject insert rule ip filter PREROUTING ip saddr 10.0.2.2/32 tcp dport 30435 accept insert rule ip filter PREROUTING ip saddr 10.0.2.10/32 tcp dport 30435 accept list chain ip filter PREROUTING table ip filter { chain PREROUTING { # handle 30 type filter hook prerouting priority raw; policy accept; ip saddr 10.0.2.10 tcp dport 30435 accept # handle 34 ip saddr 10.0.2.2 tcp dport 30435 accept # handle 33 tcp dport 30435 reject # handle 31 } } delete rule ip filter PREROUTING handle 34 delete rule ip filter PREROUTING handle 33 delete rule ip filter PREROUTING handle 31 # ============================= sudo nft add table ip filter sudo nft add chain ip filter input {type filter hook input priority filter ; policy accept; } Loading
src/tests/tools/firewall_agent/firewall_agent/resources/ACLs.py +5 −66 Original line number Diff line number Diff line Loading @@ -16,12 +16,16 @@ import logging from flask import request from flask_restful import Api, Resource, abort from typing import Dict, List, Set, Tuple from typing import Set, Tuple from .nft_model.DirectionEnum import DirectionEnum from .nft_model.FamilyEnum import FamilyEnum from .nft_model.NFTables import NFTables from .nft_model.Rule import Rule from .nft_model.TableEnum import TableEnum from .AclRuleToInterfaceDirection import ( CHAINS_INPUT, CHAINS_OUTPUT, AclRuleToInterfaceDirection, get_family_from_acl_set_type ) LOGGER = logging.getLogger(__name__) Loading @@ -30,71 +34,6 @@ LOGGER = logging.getLogger(__name__) BASE_URL_ROOT = '/restconf/data/openconfig-acl:acl' BASE_URL_ITEM = '/restconf/data/openconfig-acl:acl/acl-sets/acl-set=<name>' CHAIN_NAME_INPUT = 'INPUT' CHAIN_NAME_FORWARD = 'FORWARD' CHAIN_NAME_OUTPUT = 'OUTPUT' CHAINS_INPUT = [CHAIN_NAME_INPUT, CHAIN_NAME_FORWARD] CHAINS_OUTPUT = [CHAIN_NAME_FORWARD, CHAIN_NAME_OUTPUT] CHAINS_ALL = [CHAIN_NAME_INPUT, CHAIN_NAME_FORWARD, CHAIN_NAME_OUTPUT] TYPE_ACL_RULE_SEQ_ID = Tuple[str, int] TYPE_IFACE_DIRECTION = Tuple[str, DirectionEnum] TYPE_IFACE_DIRECTIONS = List[TYPE_IFACE_DIRECTION] TYPE_ACL_RULE_TO_IF_DIR = Dict[TYPE_ACL_RULE_SEQ_ID, TYPE_IFACE_DIRECTIONS] def get_family_from_acl_set_type(acl_set_type : str) -> FamilyEnum: return { 'ACL_IPV4' : FamilyEnum.IPV4, 'ACL_IPV6' : FamilyEnum.IPV6, }[acl_set_type] class AclRuleToInterfaceDirection: def __init__(self, nft : NFTables): self._nft = nft self._acl_rule_to_iface_direction : TYPE_ACL_RULE_TO_IF_DIR = dict() def create_nft_chains_in_table(self, acl_set_type : str, chain_names : List[str]) -> None: family = get_family_from_acl_set_type(acl_set_type) table = self._nft.get_or_create_table(family, TableEnum.FILTER) for chain_name in chain_names: table.get_or_create_chain(chain_name) def add_acl_set(self, if_name : str, acl_set : Dict, direction : DirectionEnum) -> None: acl_set_name = acl_set['config']['set-name'] acl_set_type = acl_set['config']['type'] if direction == DirectionEnum.INGRESS: self.create_nft_chains_in_table(acl_set_type, CHAINS_INPUT) elif direction == DirectionEnum.EGRESS: self.create_nft_chains_in_table(acl_set_type, CHAINS_OUTPUT) else: self.create_nft_chains_in_table(acl_set_type, CHAINS_ALL) for acl_set_entry in acl_set['acl-entries']['acl-entry']: sequence_id = int(acl_set_entry['sequence-id']) key = (acl_set_name, sequence_id) if_dir_list = self._acl_rule_to_iface_direction.setdefault(key, list()) if_dir_list.append((if_name, direction)) def add_interface(self, interface : Dict) -> None: if_name = interface['config']['id'] for direction in [DirectionEnum.INGRESS, DirectionEnum.EGRESS]: direction_value = direction.value acl_sets_obj = interface.get(f'{direction_value}-acl-sets', dict()) acl_sets_lst = acl_sets_obj.get(f'{direction_value}-acl-set', list()) for acl_set in acl_sets_lst: self.add_acl_set(if_name, acl_set, DirectionEnum.INGRESS) def add_interfaces(self, interfaces : List[Dict]) -> None: for interface in interfaces: self.add_interface(interface) def get_interfaces_directions( self, acl_set_name : str, sequence_id : int ) -> TYPE_IFACE_DIRECTIONS: return self._acl_rule_to_iface_direction.get((acl_set_name, sequence_id), []) class ACLs(Resource): def get(self): Loading
src/tests/tools/firewall_agent/firewall_agent/resources/AclRuleToInterfaceDirection.py 0 → 100644 +98 −0 Original line number Diff line number Diff line # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import Dict, List, Tuple from .nft_model.DirectionEnum import DirectionEnum from .nft_model.FamilyEnum import FamilyEnum from .nft_model.NFTables import NFTables from .nft_model.TableEnum import TableEnum TYPE_ACL_RULE_SEQ_ID = Tuple[str, int] TYPE_IFACE_DIRECTION = Tuple[str, DirectionEnum] TYPE_IFACE_DIRECTIONS = List[TYPE_IFACE_DIRECTION] TYPE_ACL_RULE_TO_IF_DIR = Dict[TYPE_ACL_RULE_SEQ_ID, TYPE_IFACE_DIRECTIONS] CHAIN_NAME_PREROUTING = 'PREROUTING' CHAIN_NAME_INPUT = 'INPUT' CHAIN_NAME_FORWARD = 'FORWARD' CHAIN_NAME_OUTPUT = 'OUTPUT' CHAIN_NAME_POSTROUTING = 'POSTROUTING' CHAINS_INPUT = [ CHAIN_NAME_PREROUTING, CHAIN_NAME_INPUT, CHAIN_NAME_FORWARD ] CHAINS_OUTPUT = [ CHAIN_NAME_FORWARD, CHAIN_NAME_OUTPUT, CHAIN_NAME_POSTROUTING ] CHAINS_ALL = [ CHAIN_NAME_PREROUTING, CHAIN_NAME_INPUT, CHAIN_NAME_FORWARD, CHAIN_NAME_OUTPUT, CHAIN_NAME_POSTROUTING ] def get_family_from_acl_set_type(acl_set_type : str) -> FamilyEnum: return { 'ACL_IPV4' : FamilyEnum.IPV4, 'ACL_IPV6' : FamilyEnum.IPV6, }[acl_set_type] class AclRuleToInterfaceDirection: def __init__(self, nft : NFTables): self._nft = nft self._acl_rule_to_iface_direction : TYPE_ACL_RULE_TO_IF_DIR = dict() def create_nft_chains_in_table(self, acl_set_type : str, chain_names : List[str]) -> None: family = get_family_from_acl_set_type(acl_set_type) table = self._nft.get_or_create_table(family, TableEnum.FILTER) for chain_name in chain_names: table.get_or_create_chain(chain_name) def add_acl_set(self, if_name : str, acl_set : Dict, direction : DirectionEnum) -> None: acl_set_name = acl_set['config']['set-name'] acl_set_type = acl_set['config']['type'] if direction == DirectionEnum.INGRESS: self.create_nft_chains_in_table(acl_set_type, CHAINS_INPUT) elif direction == DirectionEnum.EGRESS: self.create_nft_chains_in_table(acl_set_type, CHAINS_OUTPUT) else: self.create_nft_chains_in_table(acl_set_type, CHAINS_ALL) for acl_set_entry in acl_set['acl-entries']['acl-entry']: sequence_id = int(acl_set_entry['sequence-id']) key = (acl_set_name, sequence_id) if_dir_list = self._acl_rule_to_iface_direction.setdefault(key, list()) if_dir_list.append((if_name, direction)) def add_interface(self, interface : Dict) -> None: if_name = interface['config']['id'] for direction in [DirectionEnum.INGRESS, DirectionEnum.EGRESS]: direction_value = direction.value acl_sets_obj = interface.get(f'{direction_value}-acl-sets', dict()) acl_sets_lst = acl_sets_obj.get(f'{direction_value}-acl-set', list()) for acl_set in acl_sets_lst: self.add_acl_set(if_name, acl_set, DirectionEnum.INGRESS) def add_interfaces(self, interfaces : List[Dict]) -> None: for interface in interfaces: self.add_interface(interface) def get_interfaces_directions( self, acl_set_name : str, sequence_id : int ) -> TYPE_IFACE_DIRECTIONS: return self._acl_rule_to_iface_direction.get((acl_set_name, sequence_id), [])
src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Chain.py +2 −1 Original line number Diff line number Diff line Loading @@ -81,10 +81,11 @@ class Chain: # chains and their removal might cause side effects on NFTables. pass else: #chain_hook = str(self.chain).lower() parts = [ 'add', 'chain', self.family.value, self.table.value, self.chain, '{', 'type', self.table.value, 'hook', self.chain, 'priority', self.table.value, ';', 'type', self.table.value, 'hook', self.chain, 'priority', 'raw', ';', 'policy', 'accept', ';', '}' ] Loading
src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Rule.py +3 −1 Original line number Diff line number Diff line Loading @@ -225,7 +225,9 @@ class Rule: 'handle', str(self.handle) ] else: verb = 'add' if self.sequence_id < 1000 else 'insert' # NOTE: if sequence_id < 10000: insert the rules to the top; # otherwise, append to the bottom. Anyways, sort rules by sequence_id. verb = 'insert' if self.sequence_id < 10000 else 'add' parts = [verb, 'rule', self.family.value, self.table.value, self.chain] if self.input_if_name is not None: parts.extend(['iifname', self.input_if_name]) if self.output_if_name is not None: parts.extend(['oifname', self.output_if_name]) Loading