Loading src/nbi/service/ietf_acl/Acl.py +2 −2 Original line number Diff line number Diff line Loading @@ -64,8 +64,8 @@ class Acl(Resource): _config_rule.CopyFrom(config_rule) _config_rule.action = ConfigActionEnum.CONFIGACTION_DELETE delete_config_rules.append(_config_rule) if len(delete_config_rules) == 0: break else: raise NotFound('Acl({:s}) not found in Device({:s})'.format(str(acl_name), str(device_uuid))) device_client = DeviceClient() Loading src/nbi/service/ietf_acl/ietf_acl_parser.py +105 −89 Original line number Diff line number Diff line Loading @@ -13,104 +13,126 @@ # limitations under the License. from enum import Enum from typing import List, Dict from typing import List, Dict, Optional from pydantic import BaseModel, Field from werkzeug.exceptions import NotImplemented from common.proto.acl_pb2 import AclForwardActionEnum, AclRuleTypeEnum, AclEntry from common.proto.context_pb2 import ConfigActionEnum, ConfigRule class AclDirectionEnum(Enum): INGRESS = 'ingress' EGRESS = 'egress' class Ipv4(BaseModel): dscp: int = 0 source_ipv4_network: str = Field(serialization_alias="source-ipv4-network", default="") destination_ipv4_network: str = Field(serialization_alias="destination-ipv4-network", default="") source_ipv4_network: str = Field(serialization_alias='source-ipv4-network', default='') destination_ipv4_network: str = Field( serialization_alias='destination-ipv4-network', default='' ) class Port(BaseModel): port: int = 0 operator: str = "eq" operator: str = 'eq' class Tcp(BaseModel): flags: str = "" source_port: Port = Field(serialization_alias="source-port", default_factory=lambda: Port()) destination_port: Port = Field(serialization_alias="destination-port", default_factory=lambda: Port()) flags: str = '' source_port: Port = Field(serialization_alias='source-port', default_factory=lambda: Port()) destination_port: Port = Field( serialization_alias='destination-port', default_factory=lambda: Port() ) class Matches(BaseModel): ipv4: Ipv4 = Ipv4() tcp: Tcp = Tcp() tcp: Optional[Tcp] = None class Action(BaseModel): forwarding: str = "" forwarding: str = '' class Ace(BaseModel): name: str = "custom_rule" name: str = '' matches: Matches = Matches() actions: Action = Action() class Aces(BaseModel): ace: List[Ace] = [Ace()] class Acl(BaseModel): name: str = "" type: str = "" name: str = '' type: str = '' aces: Aces = Aces() class Name(BaseModel): name: str = "" name: str = '' class AclSet(BaseModel): acl_set: List[Name] = Field(serialization_alias="acl-set", default=[Name()]) acl_set: List[Name] = Field(serialization_alias='acl-set', default=[Name()]) class AclSets(BaseModel): acl_sets: AclSet = Field(serialization_alias="acl-sets", default=AclSet()) acl_sets: AclSet = Field(serialization_alias='acl-sets', default=AclSet()) class Ingress(BaseModel): ingress: AclSets = AclSets() class Egress(BaseModel): egress: AclSets = AclSets() class Interface(BaseModel): interface_id: str = Field(serialization_alias="interface-id", default="") ingress : Ingress = Ingress() egress : Egress = Egress() interface_id: str = Field(serialization_alias='interface-id', default='') ingress: Optional[AclSets] = None egress: Optional[AclSets] = None class Interfaces(BaseModel): interface: List[Interface] = [Interface()] class AttachmentPoints(BaseModel): attachment_points: Interfaces = Field(serialization_alias="attachment-points", default=Interfaces()) class Acls(BaseModel): acl: List[Acl] = [Acl()] attachment_points: AttachmentPoints = Field(serialization_alias="attachment-points", default=AttachmentPoints()) attachment_points: Optional[Interfaces] = Field( serialization_alias='attachment-points', default=None ) class IETF_ACL(BaseModel): acls: Acls = Acls() acls: Optional[Acls] = Field(serialization_alias='ietf-access-control-list:acls', default=None) IETF_TFS_RULE_TYPE_MAPPING = { "ipv4-acl-type": "ACLRULETYPE_IPV4", "ipv6-acl-type": "ACLRULETYPE_IPV6", 'ipv4-acl-type': 'ACLRULETYPE_IPV4', 'ipv6-acl-type': 'ACLRULETYPE_IPV6', } IETF_TFS_FORWARDING_ACTION_MAPPING = { "accept": "ACLFORWARDINGACTION_ACCEPT", "drop" : "ACLFORWARDINGACTION_DROP", 'accept': 'ACLFORWARDINGACTION_ACCEPT', 'drop': 'ACLFORWARDINGACTION_DROP', } TFS_IETF_RULE_TYPE_MAPPING = { "ACLRULETYPE_IPV4": "ipv4-acl-type", "ACLRULETYPE_IPV6": "ipv6-acl-type", 'ACLRULETYPE_IPV4': 'ipv4-acl-type', 'ACLRULETYPE_IPV6': 'ipv6-acl-type', } TFS_IETF_FORWARDING_ACTION_MAPPING = { "ACLFORWARDINGACTION_ACCEPT": "accept", "ACLFORWARDINGACTION_DROP" : "drop", 'ACLFORWARDINGACTION_ACCEPT': 'accept', 'ACLFORWARDINGACTION_DROP': 'drop', } def config_rule_from_ietf_acl( device_name: str, endpoint_name: str, acl_set_data: Dict ) -> ConfigRule: Loading Loading @@ -139,7 +161,7 @@ def config_rule_from_ietf_acl( acl_entry = AclEntry() acl_entry.sequence_id = sequence_id + 1 #acl_entry.description = ... acl_entry.description = ace_name if 'ipv4' in ace_matches: ipv4_data = ace_matches['ipv4'] Loading Loading @@ -203,55 +225,49 @@ def config_rule_from_ietf_acl( return acl_config_rule def ietf_acl_from_config_rule_resource_value(config_rule_rv: Dict) -> Dict: rule_set = config_rule_rv['rule_set'] acl_entry = rule_set['entries'][0] match_ = acl_entry['match'] ace = [] for acl_entry in rule_set['entries']: match_ = acl_entry['match'] ipv4 = Ipv4( dscp=match_["dscp"], source_ipv4_network=match_["src_address"], destination_ipv4_network=match_["dst_address"] dscp=match_['dscp'], source_ipv4_network=match_['src_address'], destination_ipv4_network=match_['dst_address'], ) tcp = None if match_['tcp_flags']: tcp = Tcp( flags=match_["tcp_flags"], source_port=Port(port=match_["src_port"]), destination_port=Port(port=match_["dst_port"]) flags=match_['tcp_flags'], source_port=Port(port=match_['src_port']), destination_port=Port(port=match_['dst_port']), ) matches = Matches(ipvr=ipv4, tcp=tcp) aces = Aces(ace=[ matches = Matches(ipv4=ipv4, tcp=tcp) ace.append( Ace( name=acl_entry['description'], matches=matches, actions=Action( forwarding=TFS_IETF_FORWARDING_ACTION_MAPPING[acl_entry["action"]["forward_action"]] ) ) ]) acl = Acl( name=rule_set["name"], type=TFS_IETF_RULE_TYPE_MAPPING[rule_set["type"]], aces=aces ) acl_sets = AclSets( acl_sets=AclSet( acl_set=[ Name(name=rule_set["name"]) forwarding=TFS_IETF_FORWARDING_ACTION_MAPPING[ acl_entry['action']['forward_action'] ] ), ) ) ingress = Ingress(ingress=acl_sets) interfaces = Interfaces(interface=[ aces = Aces(ace=ace) acl = Acl(name=rule_set['name'], type=TFS_IETF_RULE_TYPE_MAPPING[rule_set['type']], aces=aces) acl_sets = AclSets(acl_sets=AclSet(acl_set=[Name(name=rule_set['name'])])) interfaces = Interfaces( interface=[ Interface( interface_id=config_rule_rv["interface"], ingress=ingress ) ]) acls = Acls( acl=[acl], attachment_points=AttachmentPoints( attachment_points=interfaces interface_id=config_rule_rv['endpoint_id']['endpoint_uuid']['uuid'], ingress=acl_sets, ) ] ) acls = Acls(acl=[acl], attachment_points=interfaces) ietf_acl = IETF_ACL(acls=acls) return ietf_acl.model_dump(by_alias=True) return ietf_acl.model_dump(by_alias=True, exclude_none=True) Loading
src/nbi/service/ietf_acl/Acl.py +2 −2 Original line number Diff line number Diff line Loading @@ -64,8 +64,8 @@ class Acl(Resource): _config_rule.CopyFrom(config_rule) _config_rule.action = ConfigActionEnum.CONFIGACTION_DELETE delete_config_rules.append(_config_rule) if len(delete_config_rules) == 0: break else: raise NotFound('Acl({:s}) not found in Device({:s})'.format(str(acl_name), str(device_uuid))) device_client = DeviceClient() Loading
src/nbi/service/ietf_acl/ietf_acl_parser.py +105 −89 Original line number Diff line number Diff line Loading @@ -13,104 +13,126 @@ # limitations under the License. from enum import Enum from typing import List, Dict from typing import List, Dict, Optional from pydantic import BaseModel, Field from werkzeug.exceptions import NotImplemented from common.proto.acl_pb2 import AclForwardActionEnum, AclRuleTypeEnum, AclEntry from common.proto.context_pb2 import ConfigActionEnum, ConfigRule class AclDirectionEnum(Enum): INGRESS = 'ingress' EGRESS = 'egress' class Ipv4(BaseModel): dscp: int = 0 source_ipv4_network: str = Field(serialization_alias="source-ipv4-network", default="") destination_ipv4_network: str = Field(serialization_alias="destination-ipv4-network", default="") source_ipv4_network: str = Field(serialization_alias='source-ipv4-network', default='') destination_ipv4_network: str = Field( serialization_alias='destination-ipv4-network', default='' ) class Port(BaseModel): port: int = 0 operator: str = "eq" operator: str = 'eq' class Tcp(BaseModel): flags: str = "" source_port: Port = Field(serialization_alias="source-port", default_factory=lambda: Port()) destination_port: Port = Field(serialization_alias="destination-port", default_factory=lambda: Port()) flags: str = '' source_port: Port = Field(serialization_alias='source-port', default_factory=lambda: Port()) destination_port: Port = Field( serialization_alias='destination-port', default_factory=lambda: Port() ) class Matches(BaseModel): ipv4: Ipv4 = Ipv4() tcp: Tcp = Tcp() tcp: Optional[Tcp] = None class Action(BaseModel): forwarding: str = "" forwarding: str = '' class Ace(BaseModel): name: str = "custom_rule" name: str = '' matches: Matches = Matches() actions: Action = Action() class Aces(BaseModel): ace: List[Ace] = [Ace()] class Acl(BaseModel): name: str = "" type: str = "" name: str = '' type: str = '' aces: Aces = Aces() class Name(BaseModel): name: str = "" name: str = '' class AclSet(BaseModel): acl_set: List[Name] = Field(serialization_alias="acl-set", default=[Name()]) acl_set: List[Name] = Field(serialization_alias='acl-set', default=[Name()]) class AclSets(BaseModel): acl_sets: AclSet = Field(serialization_alias="acl-sets", default=AclSet()) acl_sets: AclSet = Field(serialization_alias='acl-sets', default=AclSet()) class Ingress(BaseModel): ingress: AclSets = AclSets() class Egress(BaseModel): egress: AclSets = AclSets() class Interface(BaseModel): interface_id: str = Field(serialization_alias="interface-id", default="") ingress : Ingress = Ingress() egress : Egress = Egress() interface_id: str = Field(serialization_alias='interface-id', default='') ingress: Optional[AclSets] = None egress: Optional[AclSets] = None class Interfaces(BaseModel): interface: List[Interface] = [Interface()] class AttachmentPoints(BaseModel): attachment_points: Interfaces = Field(serialization_alias="attachment-points", default=Interfaces()) class Acls(BaseModel): acl: List[Acl] = [Acl()] attachment_points: AttachmentPoints = Field(serialization_alias="attachment-points", default=AttachmentPoints()) attachment_points: Optional[Interfaces] = Field( serialization_alias='attachment-points', default=None ) class IETF_ACL(BaseModel): acls: Acls = Acls() acls: Optional[Acls] = Field(serialization_alias='ietf-access-control-list:acls', default=None) IETF_TFS_RULE_TYPE_MAPPING = { "ipv4-acl-type": "ACLRULETYPE_IPV4", "ipv6-acl-type": "ACLRULETYPE_IPV6", 'ipv4-acl-type': 'ACLRULETYPE_IPV4', 'ipv6-acl-type': 'ACLRULETYPE_IPV6', } IETF_TFS_FORWARDING_ACTION_MAPPING = { "accept": "ACLFORWARDINGACTION_ACCEPT", "drop" : "ACLFORWARDINGACTION_DROP", 'accept': 'ACLFORWARDINGACTION_ACCEPT', 'drop': 'ACLFORWARDINGACTION_DROP', } TFS_IETF_RULE_TYPE_MAPPING = { "ACLRULETYPE_IPV4": "ipv4-acl-type", "ACLRULETYPE_IPV6": "ipv6-acl-type", 'ACLRULETYPE_IPV4': 'ipv4-acl-type', 'ACLRULETYPE_IPV6': 'ipv6-acl-type', } TFS_IETF_FORWARDING_ACTION_MAPPING = { "ACLFORWARDINGACTION_ACCEPT": "accept", "ACLFORWARDINGACTION_DROP" : "drop", 'ACLFORWARDINGACTION_ACCEPT': 'accept', 'ACLFORWARDINGACTION_DROP': 'drop', } def config_rule_from_ietf_acl( device_name: str, endpoint_name: str, acl_set_data: Dict ) -> ConfigRule: Loading Loading @@ -139,7 +161,7 @@ def config_rule_from_ietf_acl( acl_entry = AclEntry() acl_entry.sequence_id = sequence_id + 1 #acl_entry.description = ... acl_entry.description = ace_name if 'ipv4' in ace_matches: ipv4_data = ace_matches['ipv4'] Loading Loading @@ -203,55 +225,49 @@ def config_rule_from_ietf_acl( return acl_config_rule def ietf_acl_from_config_rule_resource_value(config_rule_rv: Dict) -> Dict: rule_set = config_rule_rv['rule_set'] acl_entry = rule_set['entries'][0] match_ = acl_entry['match'] ace = [] for acl_entry in rule_set['entries']: match_ = acl_entry['match'] ipv4 = Ipv4( dscp=match_["dscp"], source_ipv4_network=match_["src_address"], destination_ipv4_network=match_["dst_address"] dscp=match_['dscp'], source_ipv4_network=match_['src_address'], destination_ipv4_network=match_['dst_address'], ) tcp = None if match_['tcp_flags']: tcp = Tcp( flags=match_["tcp_flags"], source_port=Port(port=match_["src_port"]), destination_port=Port(port=match_["dst_port"]) flags=match_['tcp_flags'], source_port=Port(port=match_['src_port']), destination_port=Port(port=match_['dst_port']), ) matches = Matches(ipvr=ipv4, tcp=tcp) aces = Aces(ace=[ matches = Matches(ipv4=ipv4, tcp=tcp) ace.append( Ace( name=acl_entry['description'], matches=matches, actions=Action( forwarding=TFS_IETF_FORWARDING_ACTION_MAPPING[acl_entry["action"]["forward_action"]] ) ) ]) acl = Acl( name=rule_set["name"], type=TFS_IETF_RULE_TYPE_MAPPING[rule_set["type"]], aces=aces ) acl_sets = AclSets( acl_sets=AclSet( acl_set=[ Name(name=rule_set["name"]) forwarding=TFS_IETF_FORWARDING_ACTION_MAPPING[ acl_entry['action']['forward_action'] ] ), ) ) ingress = Ingress(ingress=acl_sets) interfaces = Interfaces(interface=[ aces = Aces(ace=ace) acl = Acl(name=rule_set['name'], type=TFS_IETF_RULE_TYPE_MAPPING[rule_set['type']], aces=aces) acl_sets = AclSets(acl_sets=AclSet(acl_set=[Name(name=rule_set['name'])])) interfaces = Interfaces( interface=[ Interface( interface_id=config_rule_rv["interface"], ingress=ingress ) ]) acls = Acls( acl=[acl], attachment_points=AttachmentPoints( attachment_points=interfaces interface_id=config_rule_rv['endpoint_id']['endpoint_uuid']['uuid'], ingress=acl_sets, ) ] ) acls = Acls(acl=[acl], attachment_points=interfaces) ietf_acl = IETF_ACL(acls=acls) return ietf_acl.model_dump(by_alias=True) return ietf_acl.model_dump(by_alias=True, exclude_none=True)