Skip to content
Snippets Groups Projects
Commit 6509e65a authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/282-cttc-acl-support-in-gnmi-openconfig-sbi-is-required' into 'develop'

Resolve "(CTTC) ACL support in gNMI/OpenConfig SBI is required"

See merge request !342
parents fdc6ac60 41ba7fc4
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!342Resolve "(CTTC) ACL support in gNMI/OpenConfig SBI is required"
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from typing import Any, Dict, List, Tuple
import libyang
from ._Handler import _Handler
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
# ────────────────────────── enum translations ──────────────────────────
_TFS_OC_RULE_TYPE = {
'ACLRULETYPE_IPV4': 'ACL_IPV4',
'ACLRULETYPE_IPV6': 'ACL_IPV6',
}
_TFS_OC_FWD_ACTION = {
'ACLFORWARDINGACTION_DROP': 'DROP',
'ACLFORWARDINGACTION_ACCEPT': 'ACCEPT',
'ACLFORWARDINGACTION_REJECT': 'REJECT',
}
_OC_TFS_RULE_TYPE = {v: k for k, v in _TFS_OC_RULE_TYPE.items()}
_OC_TFS_FWD_ACTION = {v: k for k, v in _TFS_OC_FWD_ACTION.items()}
# ─────────────────────────────────────────────────────────────────────────
class AclHandler(_Handler):
def get_resource_key(self) -> str:
return '/device/endpoint/acl_ruleset'
def get_path(self) -> str:
return '/openconfig-acl:acl'
def compose( # pylint: disable=too-many-locals
self,
resource_key: str,
resource_value: Dict[str, Any],
yang: YangHandler,
delete: bool = False,
) -> Tuple[str, str]:
rs = resource_value['rule_set']
rs_name = rs['name']
oc_type = _TFS_OC_RULE_TYPE[rs['type']]
device = resource_value['endpoint_id']['device_id']['device_uuid']['uuid']
iface = resource_value['endpoint_id']['endpoint_uuid']['uuid']
if delete:
path = f'/acl/acl-sets/acl-set[name={rs_name}][type={oc_type}]'
return path, ''
yang_acl: libyang.DContainer = yang.get_data_path('/openconfig-acl:acl')
y_sets = yang_acl.create_path('acl-sets')
y_set = y_sets.create_path(f'acl-set[name="{rs_name}"][type="{oc_type}"]')
y_set.create_path('config/name', rs_name)
y_set.create_path('config/type', oc_type)
# Entries (ACEs)
y_entries = y_set.create_path('acl-entries')
for entry in rs.get('entries', []):
seq = int(entry['sequence_id'])
m_ = entry["match"]
src_address = m_.get('src_address', '0.0.0.0/0')
dst_address = m_.get('dst_address', '0.0.0.0/0')
src_port = m_.get("src_port")
dst_port = m_.get("dst_port")
act = _TFS_OC_FWD_ACTION[entry['action']['forward_action']]
y_e = y_entries.create_path(f'acl-entry[sequence-id="{seq}"]')
y_e.create_path('config/sequence-id', seq)
y_ipv4 = y_e.create_path('ipv4')
y_ipv4.create_path('config/source-address', src_address)
y_ipv4.create_path('config/destination-address', dst_address)
if src_port or dst_port:
proto = m_.get("protocol")
y_trans = y_e.create_path("transport")
if src_port:
y_trans.create_path("config/source-port", int(src_port))
if dst_port:
y_trans.create_path("config/destination-port", int(dst_port))
y_ipv4.create_path('config/protocol', int(proto))
y_act = y_e.create_path('actions')
y_act.create_path('config/forwarding-action', act)
# Interface binding
y_intfs = yang_acl.create_path('interfaces')
y_intf = y_intfs.create_path(f'interface[id="{iface}"]')
y_ing = y_intf.create_path('ingress-acl-sets')
y_ing_set = y_ing.create_path(f'ingress-acl-set[set-name="{rs_name}"][type="{oc_type}"]')
y_ing_set.create_path('config/set-name', rs_name)
y_ing_set.create_path('config/type', oc_type)
json_data = yang_acl.print_mem('json')
LOGGER.debug('JSON data: %s', json_data)
json_obj = json.loads(json_data)['openconfig-acl:acl']
return '/acl', json.dumps(json_obj)
def parse( # pylint: disable=too-many-locals
self,
json_data: Dict[str, Any],
yang: YangHandler,
) -> List[Tuple[str, Dict[str, Any]]]:
acl_tree = json_data.get('openconfig-acl:acl') or json_data
results: List[Tuple[str, Dict[str, Any]]] = []
for acl_set in acl_tree.get('acl-sets', {}).get('acl-set', []):
rs_name = acl_set['name']
oc_type = acl_set['config']['type']
rs_type = _OC_TFS_RULE_TYPE[oc_type]
rule_set: Dict[str, Any] = {
'name': rs_name,
'type': rs_type,
'description': acl_set.get('config', {}).get('description', ''),
'entries': [],
}
for ace in acl_set.get('acl-entries', {}).get('acl-entry', []):
seq = ace['sequence-id']
act = ace.get('actions', {}).get('config', {}).get('forwarding-action', 'DROP')
fwd_tfs = _OC_TFS_FWD_ACTION[act]
ipv4_cfg = ace.get('ipv4', {}).get('config', {})
rule_set['entries'].append(
{
'sequence_id': seq,
'match': {
'src_address': ipv4_cfg.get('source-address', ''),
'dst_address': ipv4_cfg.get('destination-address', ''),
},
'action': {'forward_action': fwd_tfs},
}
)
# find where that ACL is bound (first matching interface)
iface = ''
for intf in acl_tree.get('interfaces', {}).get('interface', []):
for ing in intf.get('ingress-acl-sets', {}).get('ingress-acl-set', []):
if ing['set-name'] == rs_name:
iface = intf['id']
break
results.append(('/acl', {'interface': iface, 'rule_set': rule_set}))
return results
...@@ -44,6 +44,7 @@ YANG_MODULES = [ ...@@ -44,6 +44,7 @@ YANG_MODULES = [
'openconfig-mpls-types', 'openconfig-mpls-types',
'openconfig-network-instance-types', 'openconfig-network-instance-types',
'openconfig-network-instance', 'openconfig-network-instance',
'openconfig-acl',
'openconfig-platform', 'openconfig-platform',
'openconfig-platform-controller-card', 'openconfig-platform-controller-card',
...@@ -59,6 +60,7 @@ YANG_MODULES = [ ...@@ -59,6 +60,7 @@ YANG_MODULES = [
'openconfig-platform-software', 'openconfig-platform-software',
'openconfig-platform-transceiver', 'openconfig-platform-transceiver',
'openconfig-platform-types', 'openconfig-platform-types',
'openconfig-platform-healthz',
] ]
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
import logging import logging
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ACL
from ._Handler import _Handler from ._Handler import _Handler
from .Component import ComponentHandler from .Component import ComponentHandler
from .Interface import InterfaceHandler from .Interface import InterfaceHandler
...@@ -23,6 +23,7 @@ from .NetworkInstance import NetworkInstanceHandler ...@@ -23,6 +23,7 @@ from .NetworkInstance import NetworkInstanceHandler
from .NetworkInstanceInterface import NetworkInstanceInterfaceHandler from .NetworkInstanceInterface import NetworkInstanceInterfaceHandler
from .NetworkInstanceProtocol import NetworkInstanceProtocolHandler from .NetworkInstanceProtocol import NetworkInstanceProtocolHandler
from .NetworkInstanceStaticRoute import NetworkInstanceStaticRouteHandler from .NetworkInstanceStaticRoute import NetworkInstanceStaticRouteHandler
from .Acl import AclHandler
from .Tools import get_schema from .Tools import get_schema
from .YangHandler import YangHandler from .YangHandler import YangHandler
...@@ -35,17 +36,20 @@ nih = NetworkInstanceHandler() ...@@ -35,17 +36,20 @@ nih = NetworkInstanceHandler()
niifh = NetworkInstanceInterfaceHandler() niifh = NetworkInstanceInterfaceHandler()
niph = NetworkInstanceProtocolHandler() niph = NetworkInstanceProtocolHandler()
nisrh = NetworkInstanceStaticRouteHandler() nisrh = NetworkInstanceStaticRouteHandler()
aclh = AclHandler()
ALL_RESOURCE_KEYS = [ ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS, RESOURCE_ENDPOINTS,
RESOURCE_INTERFACES, RESOURCE_INTERFACES,
RESOURCE_NETWORK_INSTANCES, RESOURCE_NETWORK_INSTANCES,
RESOURCE_ACL,
] ]
RESOURCE_KEY_MAPPER = { RESOURCE_KEY_MAPPER = {
RESOURCE_ENDPOINTS : comph.get_resource_key(), RESOURCE_ENDPOINTS : comph.get_resource_key(),
RESOURCE_INTERFACES : ifaceh.get_resource_key(), RESOURCE_INTERFACES : ifaceh.get_resource_key(),
RESOURCE_NETWORK_INSTANCES : nih.get_resource_key(), RESOURCE_NETWORK_INSTANCES : nih.get_resource_key(),
RESOURCE_ACL : aclh.get_resource_key(),
} }
PATH_MAPPER = { PATH_MAPPER = {
...@@ -53,6 +57,7 @@ PATH_MAPPER = { ...@@ -53,6 +57,7 @@ PATH_MAPPER = {
'/components/component' : comph.get_path(), '/components/component' : comph.get_path(),
'/interfaces' : ifaceh.get_path(), '/interfaces' : ifaceh.get_path(),
'/network-instances' : nih.get_path(), '/network-instances' : nih.get_path(),
'/acl' : aclh.get_path(),
} }
RESOURCE_KEY_TO_HANDLER = { RESOURCE_KEY_TO_HANDLER = {
...@@ -63,6 +68,7 @@ RESOURCE_KEY_TO_HANDLER = { ...@@ -63,6 +68,7 @@ RESOURCE_KEY_TO_HANDLER = {
niifh.get_resource_key() : niifh, niifh.get_resource_key() : niifh,
niph.get_resource_key() : niph, niph.get_resource_key() : niph,
nisrh.get_resource_key() : nisrh, nisrh.get_resource_key() : nisrh,
aclh.get_resource_key() : aclh,
} }
PATH_TO_HANDLER = { PATH_TO_HANDLER = {
...@@ -73,6 +79,7 @@ PATH_TO_HANDLER = { ...@@ -73,6 +79,7 @@ PATH_TO_HANDLER = {
niifh.get_path() : niifh, niifh.get_path() : niifh,
niph.get_path() : niph, niph.get_path() : niph,
nisrh.get_path() : nisrh, nisrh.get_path() : nisrh,
aclh.get_path() : aclh,
} }
def get_handler( def get_handler(
......
...@@ -64,8 +64,8 @@ class Acl(Resource): ...@@ -64,8 +64,8 @@ class Acl(Resource):
_config_rule.CopyFrom(config_rule) _config_rule.CopyFrom(config_rule)
_config_rule.action = ConfigActionEnum.CONFIGACTION_DELETE _config_rule.action = ConfigActionEnum.CONFIGACTION_DELETE
delete_config_rules.append(_config_rule) delete_config_rules.append(_config_rule)
break
if len(delete_config_rules) == 0: else:
raise NotFound('Acl({:s}) not found in Device({:s})'.format(str(acl_name), str(device_uuid))) raise NotFound('Acl({:s}) not found in Device({:s})'.format(str(acl_name), str(device_uuid)))
device_client = DeviceClient() device_client = DeviceClient()
......
...@@ -13,106 +13,126 @@ ...@@ -13,106 +13,126 @@
# limitations under the License. # limitations under the License.
from enum import Enum from enum import Enum
from typing import List, Dict from typing import List, Dict, Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from werkzeug.exceptions import NotImplemented from werkzeug.exceptions import NotImplemented
from common.proto.acl_pb2 import AclForwardActionEnum, AclRuleTypeEnum, AclEntry from common.proto.acl_pb2 import AclForwardActionEnum, AclRuleTypeEnum, AclEntry
from common.proto.context_pb2 import ConfigActionEnum, ConfigRule from common.proto.context_pb2 import ConfigActionEnum, ConfigRule
class AclDirectionEnum(Enum): class AclDirectionEnum(Enum):
INGRESS = 'ingress' INGRESS = 'ingress'
EGRESS = 'egress' EGRESS = 'egress'
class Ipv4(BaseModel): class Ipv4(BaseModel):
dscp: int = 0 dscp: int = 0
source_ipv4_network: str = Field(serialization_alias="source-ipv4-network", default="") source_ipv4_network: str = Field(serialization_alias='source-ipv4-network', default='')
destination_ipv4_network: str = Field(serialization_alias="destination-ipv4-network", default="") destination_ipv4_network: str = Field(
serialization_alias='destination-ipv4-network', default=''
)
class Port(BaseModel): class Port(BaseModel):
port: int = 0 port: int = 0
operator: str = "eq" operator: str = 'eq'
class Tcp(BaseModel): class Tcp(BaseModel):
flags: str = "" flags: Optional[str] = None
source_port: Port = Field(serialization_alias="source-port", default_factory=lambda: Port()) source_port: Optional[Port] = Field(serialization_alias='source-port', default=None)
destination_port: Port = Field(serialization_alias="destination-port", default_factory=lambda: Port()) destination_port: Optional[Port] = Field(serialization_alias='destination-port', default=None)
class Matches(BaseModel): class Matches(BaseModel):
ipv4: Ipv4 = Ipv4() ipv4: Ipv4 = Ipv4()
tcp: Tcp = Tcp() tcp: Optional[Tcp] = None
class Action(BaseModel): class Action(BaseModel):
forwarding: str = "" forwarding: str = ''
class Ace(BaseModel): class Ace(BaseModel):
name: str = "custom_rule" name: str = ''
matches: Matches = Matches() matches: Matches = Matches()
actions: Action = Action() actions: Action = Action()
class Aces(BaseModel): class Aces(BaseModel):
ace: List[Ace] = [Ace()] ace: List[Ace] = [Ace()]
class Acl(BaseModel): class Acl(BaseModel):
name: str = "" name: str = ''
type: str = "" type: str = ''
aces: Aces = Aces() aces: Aces = Aces()
class Name(BaseModel): class Name(BaseModel):
name: str = "" name: str = ''
class AclSet(BaseModel): class AclSet(BaseModel):
acl_set: List[Name] = Field(serialization_alias="acl-set", default=[Name()]) acl_set: List[Name] = Field(serialization_alias='acl-set', default=[Name()])
class AclSets(BaseModel): class AclSets(BaseModel):
acl_sets: AclSet = Field(serialization_alias="acl-sets", default=AclSet()) acl_sets: AclSet = Field(serialization_alias='acl-sets', default=AclSet())
class Ingress(BaseModel): class Ingress(BaseModel):
ingress : AclSets = AclSets() ingress: AclSets = AclSets()
class Egress(BaseModel): class Egress(BaseModel):
egress : AclSets = AclSets() egress: AclSets = AclSets()
class Interface(BaseModel): class Interface(BaseModel):
interface_id: str = Field(serialization_alias="interface-id", default="") interface_id: str = Field(serialization_alias='interface-id', default='')
ingress : Ingress = Ingress() ingress: Optional[AclSets] = None
egress : Egress = Egress() egress: Optional[AclSets] = None
class Interfaces(BaseModel): class Interfaces(BaseModel):
interface: List[Interface] = [Interface()] interface: List[Interface] = [Interface()]
class AttachmentPoints(BaseModel):
attachment_points: Interfaces = Field(serialization_alias="attachment-points", default=Interfaces())
class Acls(BaseModel): class Acls(BaseModel):
acl: List[Acl] = [Acl()] acl: List[Acl] = [Acl()]
attachment_points: AttachmentPoints = Field(serialization_alias="attachment-points", default=AttachmentPoints()) attachment_points: Optional[Interfaces] = Field(
serialization_alias='attachment-points', default=None
)
class IETF_ACL(BaseModel): class IETF_ACL(BaseModel):
acls: Acls = Acls() acls: Optional[Acls] = Field(serialization_alias='ietf-access-control-list:acls', default=None)
IETF_TFS_RULE_TYPE_MAPPING = { IETF_TFS_RULE_TYPE_MAPPING = {
"ipv4-acl-type": "ACLRULETYPE_IPV4", 'ipv4-acl-type': 'ACLRULETYPE_IPV4',
"ipv6-acl-type": "ACLRULETYPE_IPV6", 'ipv6-acl-type': 'ACLRULETYPE_IPV6',
} }
IETF_TFS_FORWARDING_ACTION_MAPPING = { IETF_TFS_FORWARDING_ACTION_MAPPING = {
"accept": "ACLFORWARDINGACTION_ACCEPT", 'accept': 'ACLFORWARDINGACTION_ACCEPT',
"drop" : "ACLFORWARDINGACTION_DROP", 'drop': 'ACLFORWARDINGACTION_DROP',
} }
TFS_IETF_RULE_TYPE_MAPPING = { TFS_IETF_RULE_TYPE_MAPPING = {
"ACLRULETYPE_IPV4": "ipv4-acl-type", 'ACLRULETYPE_IPV4': 'ipv4-acl-type',
"ACLRULETYPE_IPV6": "ipv6-acl-type", 'ACLRULETYPE_IPV6': 'ipv6-acl-type',
} }
TFS_IETF_FORWARDING_ACTION_MAPPING = { TFS_IETF_FORWARDING_ACTION_MAPPING = {
"ACLFORWARDINGACTION_ACCEPT": "accept", 'ACLFORWARDINGACTION_ACCEPT': 'accept',
"ACLFORWARDINGACTION_DROP" : "drop", 'ACLFORWARDINGACTION_DROP': 'drop',
} }
def config_rule_from_ietf_acl( def config_rule_from_ietf_acl(
device_name : str, endpoint_name : str, acl_set_data : Dict device_name: str, endpoint_name: str, acl_set_data: Dict
) -> ConfigRule: ) -> ConfigRule:
acl_config_rule = ConfigRule() acl_config_rule = ConfigRule()
acl_config_rule.action = ConfigActionEnum.CONFIGACTION_SET acl_config_rule.action = ConfigActionEnum.CONFIGACTION_SET
...@@ -129,18 +149,18 @@ def config_rule_from_ietf_acl( ...@@ -129,18 +149,18 @@ def config_rule_from_ietf_acl(
acl_rule_set = acl_config_rule.acl.rule_set acl_rule_set = acl_config_rule.acl.rule_set
acl_rule_set.name = acl_name acl_rule_set.name = acl_name
acl_rule_set.type = acl_type acl_rule_set.type = acl_type
#acl_rule_set.description = ... # acl_rule_set.description = ...
access_control_entry_list = acl_set_data.get('aces', {}).get('ace', []) access_control_entry_list = acl_set_data.get('aces', {}).get('ace', [])
for sequence_id,ace in enumerate(access_control_entry_list): for sequence_id, ace in enumerate(access_control_entry_list):
ace_name = ace['name'] ace_name = ace['name']
ace_matches = ace.get('matches', {}) ace_matches = ace.get('matches', {})
ace_actions = ace.get('actions', {}) ace_actions = ace.get('actions', {})
acl_entry = AclEntry() acl_entry = AclEntry()
acl_entry.sequence_id = sequence_id + 1 acl_entry.sequence_id = sequence_id + 1
#acl_entry.description = ... acl_entry.description = ace_name
if 'ipv4' in ace_matches: if 'ipv4' in ace_matches:
ipv4_data = ace_matches['ipv4'] ipv4_data = ace_matches['ipv4']
if 'source-ipv4-network' in ipv4_data: if 'source-ipv4-network' in ipv4_data:
...@@ -157,14 +177,14 @@ def config_rule_from_ietf_acl( ...@@ -157,14 +177,14 @@ def config_rule_from_ietf_acl(
acl_entry.match.protocol = 6 acl_entry.match.protocol = 6
tcp_data = ace_matches['tcp'] tcp_data = ace_matches['tcp']
if 'source-port' in tcp_data: if 'source-port' in tcp_data:
tcp_src_port : Dict = tcp_data['source-port'] tcp_src_port: Dict = tcp_data['source-port']
tcp_src_port_op = tcp_src_port.get('operator', 'eq') tcp_src_port_op = tcp_src_port.get('operator', 'eq')
if tcp_src_port_op != 'eq': if tcp_src_port_op != 'eq':
MSG = 'Acl({:s})/Ace({:s})/Match/Tcp({:s}) operator not supported' MSG = 'Acl({:s})/Ace({:s})/Match/Tcp({:s}) operator not supported'
raise NotImplemented(MSG.format(acl_name, ace_name, str(tcp_data))) raise NotImplemented(MSG.format(acl_name, ace_name, str(tcp_data)))
acl_entry.match.src_port = tcp_src_port['port'] acl_entry.match.src_port = tcp_src_port['port']
if 'destination-port' in tcp_data: if 'destination-port' in tcp_data:
tcp_dst_port : Dict = tcp_data['destination-port'] tcp_dst_port: Dict = tcp_data['destination-port']
tcp_dst_port_op = tcp_dst_port.get('operator', 'eq') tcp_dst_port_op = tcp_dst_port.get('operator', 'eq')
if tcp_dst_port_op != 'eq': if tcp_dst_port_op != 'eq':
MSG = 'Acl({:s})/Ace({:s})/Match/Tcp({:s}) operator not supported' MSG = 'Acl({:s})/Ace({:s})/Match/Tcp({:s}) operator not supported'
...@@ -178,14 +198,14 @@ def config_rule_from_ietf_acl( ...@@ -178,14 +198,14 @@ def config_rule_from_ietf_acl(
acl_entry.match.protocol = 17 acl_entry.match.protocol = 17
udp_data = ace_matches['udp'] udp_data = ace_matches['udp']
if 'source-port' in udp_data: if 'source-port' in udp_data:
udp_src_port : Dict = udp_data['source-port'] udp_src_port: Dict = udp_data['source-port']
udp_src_port_op = udp_src_port.get('operator', 'eq') udp_src_port_op = udp_src_port.get('operator', 'eq')
if udp_src_port_op != 'eq': if udp_src_port_op != 'eq':
MSG = 'Acl({:s})/Ace({:s})/Match/Udp({:s}) operator not supported' MSG = 'Acl({:s})/Ace({:s})/Match/Udp({:s}) operator not supported'
raise NotImplemented(MSG.format(acl_name, ace_name, str(udp_data))) raise NotImplemented(MSG.format(acl_name, ace_name, str(udp_data)))
acl_entry.match.src_port = udp_src_port['port'] acl_entry.match.src_port = udp_src_port['port']
if 'destination-port' in udp_data: if 'destination-port' in udp_data:
udp_dst_port : Dict = udp_data['destination-port'] udp_dst_port: Dict = udp_data['destination-port']
udp_dst_port_op = udp_dst_port.get('operator', 'eq') udp_dst_port_op = udp_dst_port.get('operator', 'eq')
if udp_dst_port_op != 'eq': if udp_dst_port_op != 'eq':
MSG = 'Acl({:s})/Ace({:s})/Match/Udp({:s}) operator not supported' MSG = 'Acl({:s})/Ace({:s})/Match/Udp({:s}) operator not supported'
...@@ -203,55 +223,49 @@ def config_rule_from_ietf_acl( ...@@ -203,55 +223,49 @@ def config_rule_from_ietf_acl(
return acl_config_rule return acl_config_rule
def ietf_acl_from_config_rule_resource_value(config_rule_rv: Dict) -> Dict: def ietf_acl_from_config_rule_resource_value(config_rule_rv: Dict) -> Dict:
rule_set = config_rule_rv['rule_set'] rule_set = config_rule_rv['rule_set']
acl_entry = rule_set['entries'][0] ace = []
match_ = acl_entry['match']
for acl_entry in rule_set['entries']:
ipv4 = Ipv4( match_ = acl_entry['match']
dscp=match_["dscp"], ipv4 = Ipv4(
source_ipv4_network=match_["src_address"], dscp=match_['dscp'],
destination_ipv4_network=match_["dst_address"] source_ipv4_network=match_['src_address'],
) destination_ipv4_network=match_['dst_address'],
tcp = Tcp(
flags=match_["tcp_flags"],
source_port=Port(port=match_["src_port"]),
destination_port=Port(port=match_["dst_port"])
)
matches = Matches(ipvr=ipv4, tcp=tcp)
aces = Aces(ace=[
Ace(
matches=matches,
actions=Action(
forwarding=TFS_IETF_FORWARDING_ACTION_MAPPING[acl_entry["action"]["forward_action"]]
)
)
])
acl = Acl(
name=rule_set["name"],
type=TFS_IETF_RULE_TYPE_MAPPING[rule_set["type"]],
aces=aces
)
acl_sets = AclSets(
acl_sets=AclSet(
acl_set=[
Name(name=rule_set["name"])
]
)
)
ingress = Ingress(ingress=acl_sets)
interfaces = Interfaces(interface=[
Interface(
interface_id=config_rule_rv["interface"],
ingress=ingress
) )
]) tcp = None
acls = Acls( if match_['tcp_flags']:
acl=[acl], tcp = Tcp(
attachment_points=AttachmentPoints( flags=match_['tcp_flags'],
attachment_points=interfaces source_port=Port(port=match_['src_port']),
destination_port=Port(port=match_['dst_port']),
)
matches = Matches(ipv4=ipv4, tcp=tcp)
ace.append(
Ace(
name=acl_entry['description'],
matches=matches,
actions=Action(
forwarding=TFS_IETF_FORWARDING_ACTION_MAPPING[
acl_entry['action']['forward_action']
]
),
)
) )
aces = Aces(ace=ace)
acl = Acl(name=rule_set['name'], type=TFS_IETF_RULE_TYPE_MAPPING[rule_set['type']], aces=aces)
acl_sets = AclSets(acl_sets=AclSet(acl_set=[Name(name=rule_set['name'])]))
interfaces = Interfaces(
interface=[
Interface(
interface_id=config_rule_rv['endpoint_id']['endpoint_uuid']['uuid'],
ingress=acl_sets,
)
]
) )
acls = Acls(acl=[acl], attachment_points=interfaces)
ietf_acl = IETF_ACL(acls=acls) ietf_acl = IETF_ACL(acls=acls)
return ietf_acl.model_dump(by_alias=True) return ietf_acl.model_dump(by_alias=True, exclude_none=True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment