Loading src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Chain.py +3 −1 Original line number Diff line number Diff line Loading @@ -30,7 +30,9 @@ class Chain: rules : List[Rule] = field(default_factory=list) def add_rule(self, entry : Dict) -> None: self.rules.append(Rule.from_nft_entry(self.family, self.table, self.chain, entry)) rule = Rule.from_nft_entry(self.family, self.table, self.chain, entry) if rule is None: return self.rules.append(rule) def to_openconfig(self) -> Tuple[Optional[Dict], Dict]: acl_set_name = f'{self.family.value}-{self.table.value}-{self.chain}' Loading src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/NFTablesParserTools.py +11 −1 Original line number Diff line number Diff line Loading @@ -38,20 +38,24 @@ def parse_nft_ip_addr(right : Union[str, Dict]) -> ipaddress.IPv4Interface: return ipaddress.IPv4Interface(f'{address}/{str(prefix_len)}') def parse_nft_match(rule : 'Rule', match : Dict) -> None: def parse_nft_match(rule : 'Rule', match : Dict) -> int: if 'op' not in match: raise MissingFieldException('rule.expr.match.op', match) if 'left' not in match: raise MissingFieldException('rule.expr.match.left', match) if 'right' not in match: raise MissingFieldException('rule.expr.match.right', match) if match['op'] != '==': raise UnsupportedElementException('rule.expr.match.op', match) num_fields_updated = 0 match_left = match['left'] match_right = match['right'] if 'meta' in match_left and 'key' in match_left['meta']: meta_key = match_left['meta']['key'] if 'iifname' in meta_key: rule.input_if_name = match_right num_fields_updated += 1 elif 'oifname' in meta_key: rule.output_if_name = match_right num_fields_updated += 1 else: raise UnsupportedElementException('rule.expr.match', match) elif 'payload' in match_left: Loading @@ -61,17 +65,23 @@ def parse_nft_match(rule : 'Rule', match : Dict) -> None: field_name = payload['field'] if protocol == 'ip' and field_name == 'saddr': rule.src_ip_addr = parse_nft_ip_addr(match_right) num_fields_updated += 1 elif protocol == 'ip' and field_name == 'daddr': rule.dst_ip_addr = parse_nft_ip_addr(match_right) num_fields_updated += 1 elif protocol in {'tcp', 'udp'} and field_name == 'sport': rule.ip_protocol = get_protocol_from_str(protocol) rule.src_port = match_right num_fields_updated += 1 elif protocol in {'tcp', 'udp'} and field_name == 'dport': rule.ip_protocol = get_protocol_from_str(protocol) rule.dst_port = match_right num_fields_updated += 1 else: raise UnsupportedElementException('rule.expr.match', match) else: raise UnsupportedElementException('rule.expr.match', match) else: raise UnsupportedElementException('rule.expr.match', match) return num_fields_updated No newline at end of file src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Rule.py +7 −1 Original line number Diff line number Diff line Loading @@ -57,19 +57,25 @@ class Rule: if 'expr' not in entry: raise MissingFieldException('rule.expr', entry) expr_list : List[Dict] = entry.pop('expr') num_fields_updated = 0 for expr_entry in expr_list: expr_entry_fields = set(expr_entry.keys()) expr_entry_type = expr_entry_fields.pop() if expr_entry_type == 'match': match = expr_entry['match'] parse_nft_match(rule, match) num_fields_updated += parse_nft_match(rule, match) elif expr_entry_type in {'accept', 'drop', 'reject'}: rule.action = get_action_from_str(expr_entry_type) num_fields_updated += 1 elif expr_entry_type in {'counter'}: pass # ignore these entry types else: raise UnsupportedElementException('expr_entry', expr_entry) if len(num_fields_updated) == 0: # Ignoring empty/unsupported rule... return None rule.comment = entry.pop('comment', None) rule.handle = entry['handle'] return rule Loading Loading
src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Chain.py +3 −1 Original line number Diff line number Diff line Loading @@ -30,7 +30,9 @@ class Chain: rules : List[Rule] = field(default_factory=list) def add_rule(self, entry : Dict) -> None: self.rules.append(Rule.from_nft_entry(self.family, self.table, self.chain, entry)) rule = Rule.from_nft_entry(self.family, self.table, self.chain, entry) if rule is None: return self.rules.append(rule) def to_openconfig(self) -> Tuple[Optional[Dict], Dict]: acl_set_name = f'{self.family.value}-{self.table.value}-{self.chain}' Loading
src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/NFTablesParserTools.py +11 −1 Original line number Diff line number Diff line Loading @@ -38,20 +38,24 @@ def parse_nft_ip_addr(right : Union[str, Dict]) -> ipaddress.IPv4Interface: return ipaddress.IPv4Interface(f'{address}/{str(prefix_len)}') def parse_nft_match(rule : 'Rule', match : Dict) -> None: def parse_nft_match(rule : 'Rule', match : Dict) -> int: if 'op' not in match: raise MissingFieldException('rule.expr.match.op', match) if 'left' not in match: raise MissingFieldException('rule.expr.match.left', match) if 'right' not in match: raise MissingFieldException('rule.expr.match.right', match) if match['op'] != '==': raise UnsupportedElementException('rule.expr.match.op', match) num_fields_updated = 0 match_left = match['left'] match_right = match['right'] if 'meta' in match_left and 'key' in match_left['meta']: meta_key = match_left['meta']['key'] if 'iifname' in meta_key: rule.input_if_name = match_right num_fields_updated += 1 elif 'oifname' in meta_key: rule.output_if_name = match_right num_fields_updated += 1 else: raise UnsupportedElementException('rule.expr.match', match) elif 'payload' in match_left: Loading @@ -61,17 +65,23 @@ def parse_nft_match(rule : 'Rule', match : Dict) -> None: field_name = payload['field'] if protocol == 'ip' and field_name == 'saddr': rule.src_ip_addr = parse_nft_ip_addr(match_right) num_fields_updated += 1 elif protocol == 'ip' and field_name == 'daddr': rule.dst_ip_addr = parse_nft_ip_addr(match_right) num_fields_updated += 1 elif protocol in {'tcp', 'udp'} and field_name == 'sport': rule.ip_protocol = get_protocol_from_str(protocol) rule.src_port = match_right num_fields_updated += 1 elif protocol in {'tcp', 'udp'} and field_name == 'dport': rule.ip_protocol = get_protocol_from_str(protocol) rule.dst_port = match_right num_fields_updated += 1 else: raise UnsupportedElementException('rule.expr.match', match) else: raise UnsupportedElementException('rule.expr.match', match) else: raise UnsupportedElementException('rule.expr.match', match) return num_fields_updated No newline at end of file
src/tests/tools/firewall_agent/firewall_agent/resources/nft_model/Rule.py +7 −1 Original line number Diff line number Diff line Loading @@ -57,19 +57,25 @@ class Rule: if 'expr' not in entry: raise MissingFieldException('rule.expr', entry) expr_list : List[Dict] = entry.pop('expr') num_fields_updated = 0 for expr_entry in expr_list: expr_entry_fields = set(expr_entry.keys()) expr_entry_type = expr_entry_fields.pop() if expr_entry_type == 'match': match = expr_entry['match'] parse_nft_match(rule, match) num_fields_updated += parse_nft_match(rule, match) elif expr_entry_type in {'accept', 'drop', 'reject'}: rule.action = get_action_from_str(expr_entry_type) num_fields_updated += 1 elif expr_entry_type in {'counter'}: pass # ignore these entry types else: raise UnsupportedElementException('expr_entry', expr_entry) if len(num_fields_updated) == 0: # Ignoring empty/unsupported rule... return None rule.comment = entry.pop('comment', None) rule.handle = entry['handle'] return rule Loading