Loading src/tests/tools/firewall_agent/firewall_agent/resources/ACLs.py +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ class ACLs(Resource): abort(400, message='invalid interfaces') nft = NFTables() nft.load(skip_rules=True) arid = AclRuleToInterfaceDirection(nft) arid.add_interfaces(interfaces) Loading src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Chain.py +40 −1 Original line number Diff line number Diff line Loading @@ -13,22 +13,61 @@ # limitations under the License. import enum from dataclasses import dataclass, field from typing import Dict, List, Optional, Set, Tuple from .ActionEnum import ActionEnum from .DirectionEnum import DirectionEnum from .FamilyEnum import FamilyEnum from .TableEnum import TableEnum from .Rule import Rule class ChainPriorityEnum(enum.IntEnum): RAW = -300 MANGLE = -150 FILTER = 0 @dataclass class Chain: family : FamilyEnum table : TableEnum chain : str handle : Optional[int ] = None type : Optional[str ] = None hook : Optional[str ] = None prio : Optional[int ] = None policy : Optional[ActionEnum] = None rules : List[Rule] = field(default_factory=list) @classmethod def from_manual( cls, family : FamilyEnum, table : TableEnum, name : str, handle : Optional[int] = None, type_ : Optional[str] = None, hook : Optional[str] = None, prio : int = ChainPriorityEnum.RAW.value, policy : ActionEnum = ActionEnum.ACCEPT ) -> 'Chain': chain : 'Chain' = cls(family, table, name) chain.handle = handle if type_ is None: type_ = str(table.value).lower() if hook is None: hook = str(name).lower() chain.prio = prio chain.policy = policy return chain @classmethod def from_nft_entry( cls, family : FamilyEnum, table : TableEnum, entry : Dict ) -> 'Chain': name : str = entry['name'] chain : 'Chain' = cls(family, table, name) chain.handle = entry['handle'] chain.type = entry.get('type', table.value.lower()) chain.hook = entry.get('hook', name.lower()) chain.prio = entry.get('prio', ChainPriorityEnum.RAW.value) chain.policy = entry.get('policy', ActionEnum.ACCEPT.value) return chain def add_rule(self, entry : Dict) -> None: rule = Rule.from_nft_entry(self.family, self.table, self.chain, entry) if rule is None: return Loading src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/NFTables.py +4 −3 Original line number Diff line number Diff line Loading @@ -35,15 +35,15 @@ class NFTables: def load( self, family : Optional[FamilyEnum] = None, table : Optional[TableEnum] = None, chain : Optional[str] = None chain : Optional[str] = None, skip_rules : bool = False ) -> None: entries = NFTablesCommand.list(family=family, table=table, chain=chain) for entry in entries: self.parse_entry(entry) for entry in entries: self.parse_entry(entry, skip_rules=skip_rules) def get_or_create_table(self, family : FamilyEnum, table : TableEnum) -> Table: return self.tables.setdefault((family, table), Table(family, table)) def parse_entry(self, entry : Dict) -> None: def parse_entry(self, entry : Dict, skip_rules : bool = False) -> None: entry_fields = set(entry.keys()) if len(entry_fields) != 1: raise UnsupportedElementException('entry', entry) entry_type = entry_fields.pop() Loading @@ -54,6 +54,7 @@ class NFTables: elif entry_type in {'chain'}: self.parse_entry_chain(entry['chain']) elif entry_type in {'rule'}: if skip_rules: return self.parse_entry_rule(entry['rule']) else: raise UnsupportedElementException('entry', entry) Loading src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Table.py +10 −6 Original line number Diff line number Diff line Loading @@ -28,20 +28,24 @@ class Table: handle : Optional[int] = None chains : Dict[str, Chain] = field(default_factory=dict) def get_chain(self, name : str) -> Chain: return self.chains[name] def get_or_create_chain(self, name : str) -> Chain: return self.chains.setdefault(name, Chain(self.family, self.table, name)) return self.chains.setdefault(name, Chain.from_manual(self.family, self.table, name)) def add_chain_by_entry(self, entry : Dict) -> Chain: name : str = entry.pop('name') name : str = entry['name'] if name.lower() not in {'input', 'output', 'forward', 'prerouting'}: return None chain_obj = self.get_or_create_chain(name) chain_obj.handle = entry['handle'] return chain_obj if name in self.chains: return self.chains[name] chain = Chain.from_nft_entry(self.family, self.table, entry) self.chains[name] = chain return chain def add_rule_by_entry(self, entry : Dict) -> None: chain : str = entry.pop('chain') if chain.lower() not in {'input', 'output', 'forward', 'prerouting'}: return self.get_or_create_chain(chain).add_rule(entry) self.get_chain(chain).add_rule(entry) def to_openconfig(self) -> Tuple[List[Dict], Dict]: acl_sets : List[Dict] = list() Loading Loading
src/tests/tools/firewall_agent/firewall_agent/resources/ACLs.py +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ class ACLs(Resource): abort(400, message='invalid interfaces') nft = NFTables() nft.load(skip_rules=True) arid = AclRuleToInterfaceDirection(nft) arid.add_interfaces(interfaces) Loading
src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Chain.py +40 −1 Original line number Diff line number Diff line Loading @@ -13,22 +13,61 @@ # limitations under the License. import enum from dataclasses import dataclass, field from typing import Dict, List, Optional, Set, Tuple from .ActionEnum import ActionEnum from .DirectionEnum import DirectionEnum from .FamilyEnum import FamilyEnum from .TableEnum import TableEnum from .Rule import Rule class ChainPriorityEnum(enum.IntEnum): RAW = -300 MANGLE = -150 FILTER = 0 @dataclass class Chain: family : FamilyEnum table : TableEnum chain : str handle : Optional[int ] = None type : Optional[str ] = None hook : Optional[str ] = None prio : Optional[int ] = None policy : Optional[ActionEnum] = None rules : List[Rule] = field(default_factory=list) @classmethod def from_manual( cls, family : FamilyEnum, table : TableEnum, name : str, handle : Optional[int] = None, type_ : Optional[str] = None, hook : Optional[str] = None, prio : int = ChainPriorityEnum.RAW.value, policy : ActionEnum = ActionEnum.ACCEPT ) -> 'Chain': chain : 'Chain' = cls(family, table, name) chain.handle = handle if type_ is None: type_ = str(table.value).lower() if hook is None: hook = str(name).lower() chain.prio = prio chain.policy = policy return chain @classmethod def from_nft_entry( cls, family : FamilyEnum, table : TableEnum, entry : Dict ) -> 'Chain': name : str = entry['name'] chain : 'Chain' = cls(family, table, name) chain.handle = entry['handle'] chain.type = entry.get('type', table.value.lower()) chain.hook = entry.get('hook', name.lower()) chain.prio = entry.get('prio', ChainPriorityEnum.RAW.value) chain.policy = entry.get('policy', ActionEnum.ACCEPT.value) return chain def add_rule(self, entry : Dict) -> None: rule = Rule.from_nft_entry(self.family, self.table, self.chain, entry) if rule is None: return Loading
src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/NFTables.py +4 −3 Original line number Diff line number Diff line Loading @@ -35,15 +35,15 @@ class NFTables: def load( self, family : Optional[FamilyEnum] = None, table : Optional[TableEnum] = None, chain : Optional[str] = None chain : Optional[str] = None, skip_rules : bool = False ) -> None: entries = NFTablesCommand.list(family=family, table=table, chain=chain) for entry in entries: self.parse_entry(entry) for entry in entries: self.parse_entry(entry, skip_rules=skip_rules) def get_or_create_table(self, family : FamilyEnum, table : TableEnum) -> Table: return self.tables.setdefault((family, table), Table(family, table)) def parse_entry(self, entry : Dict) -> None: def parse_entry(self, entry : Dict, skip_rules : bool = False) -> None: entry_fields = set(entry.keys()) if len(entry_fields) != 1: raise UnsupportedElementException('entry', entry) entry_type = entry_fields.pop() Loading @@ -54,6 +54,7 @@ class NFTables: elif entry_type in {'chain'}: self.parse_entry_chain(entry['chain']) elif entry_type in {'rule'}: if skip_rules: return self.parse_entry_rule(entry['rule']) else: raise UnsupportedElementException('entry', entry) Loading
src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Table.py +10 −6 Original line number Diff line number Diff line Loading @@ -28,20 +28,24 @@ class Table: handle : Optional[int] = None chains : Dict[str, Chain] = field(default_factory=dict) def get_chain(self, name : str) -> Chain: return self.chains[name] def get_or_create_chain(self, name : str) -> Chain: return self.chains.setdefault(name, Chain(self.family, self.table, name)) return self.chains.setdefault(name, Chain.from_manual(self.family, self.table, name)) def add_chain_by_entry(self, entry : Dict) -> Chain: name : str = entry.pop('name') name : str = entry['name'] if name.lower() not in {'input', 'output', 'forward', 'prerouting'}: return None chain_obj = self.get_or_create_chain(name) chain_obj.handle = entry['handle'] return chain_obj if name in self.chains: return self.chains[name] chain = Chain.from_nft_entry(self.family, self.table, entry) self.chains[name] = chain return chain def add_rule_by_entry(self, entry : Dict) -> None: chain : str = entry.pop('chain') if chain.lower() not in {'input', 'output', 'forward', 'prerouting'}: return self.get_or_create_chain(chain).add_rule(entry) self.get_chain(chain).add_rule(entry) def to_openconfig(self) -> Tuple[List[Dict], Dict]: acl_sets : List[Dict] = list() Loading