Commits (2)
...@@ -46,6 +46,7 @@ message AclMatch { ...@@ -46,6 +46,7 @@ message AclMatch {
uint32 dst_port = 6; uint32 dst_port = 6;
uint32 start_mpls_label = 7; uint32 start_mpls_label = 7;
uint32 end_mpls_label = 8; uint32 end_mpls_label = 8;
string flags = 9;
} }
message AclAction { message AclAction {
......
...@@ -498,6 +498,7 @@ message ConfigRule_Custom { ...@@ -498,6 +498,7 @@ message ConfigRule_Custom {
message ConfigRule_ACL { message ConfigRule_ACL {
EndPointId endpoint_id = 1; EndPointId endpoint_id = 1;
acl.AclRuleSet rule_set = 2; acl.AclRuleSet rule_set = 2;
string interface = 3;
} }
message ConfigRule { message ConfigRule {
......
...@@ -27,6 +27,9 @@ from .NetworkInstances import parse as parse_network_instances ...@@ -27,6 +27,9 @@ from .NetworkInstances import parse as parse_network_instances
from .RoutingPolicy import parse as parse_routing_policy from .RoutingPolicy import parse as parse_routing_policy
from .Acl import parse as parse_acl from .Acl import parse as parse_acl
from .Inventory import parse as parse_inventory from .Inventory import parse as parse_inventory
from .acl.acl_adapter import acl_cr_to_dict
from .acl.acl_adapter_ipinfusion_proprietary import acl_cr_to_dict_ipinfusion_proprietary
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
ALL_RESOURCE_KEYS = [ ALL_RESOURCE_KEYS = [
...@@ -113,16 +116,26 @@ def compose_config( # template generation ...@@ -113,16 +116,26 @@ def compose_config( # template generation
elif (message_renderer == "jinja"): elif (message_renderer == "jinja"):
templates =[] templates =[]
template_name = '{:s}/edit_config.xml'.format(RE_REMOVE_FILTERS.sub('', resource_key))
templates.append(JINJA_ENV.get_template(template_name))
if "acl_ruleset" in resource_key: # MANAGING ACLs if "acl_ruleset" in resource_key: # MANAGING ACLs
templates =[] if True: #vendor == 'ipinfusion': #! ipinfusion proprietary netconf receipe is used temporarily
templates.append(JINJA_ENV.get_template('acl/acl-set/acl-entry/edit_config.xml')) acl_entry_path = 'acl/acl-set/acl-entry/edit_config_ipinfusion_proprietary.xml'
templates.append(JINJA_ENV.get_template('acl/interfaces/ingress/edit_config.xml')) acl_ingress_path = 'acl/interfaces/ingress/edit_config_ipinfusion_proprietary.xml'
data : Dict[str, Any] = json.loads(resource_value) data : Dict[str, Any] = acl_cr_to_dict_ipinfusion_proprietary(resource_value, delete=delete)
else:
acl_entry_path = 'acl/acl-set/acl-entry/edit_config.xml'
acl_ingress_path = 'acl/interfaces/ingress/edit_config.xml'
data : Dict[str, Any] = acl_cr_to_dict(resource_value, delete=delete)
if delete: # unpair acl and interface before removing acl
templates.append(JINJA_ENV.get_template(acl_ingress_path))
templates.append(JINJA_ENV.get_template(acl_entry_path))
else:
templates.append(JINJA_ENV.get_template(acl_entry_path))
templates.append(JINJA_ENV.get_template(acl_ingress_path))
else:
template_name = '{:s}/edit_config.xml'.format(RE_REMOVE_FILTERS.sub('', resource_key))
templates.append(JINJA_ENV.get_template(template_name))
data : Dict[str, Any] = json.loads(resource_value)
operation = 'delete' if delete else 'merge' operation = 'delete' if delete else 'merge'
return [ return [
'<config>{:s}</config>'.format( '<config>{:s}</config>'.format(
template.render(**data, operation=operation, vendor=vendor).strip()) template.render(**data, operation=operation, vendor=vendor).strip())
......
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
\ No newline at end of file
<acl xmlns="http://www.ipinfusion.com/yang/ocnos/ipi-acl">
<acl-sets>
<acl-set {% if operation == 'delete' %}operation="delete"{% endif %}>
<name>{{name}}</name>
{% if type is defined %}<type>{{type}}</type>{% endif %}
<config>
<name>{{name}}</name>
{% if type is defined %}<type>{{type}}</type>{% endif %}
</config>
{% if operation != 'delete' %}
<acl-entries>
<acl-entry>
<sequence-id>{{sequence_id}}</sequence-id>
<config>
<sequence-id>{{sequence_id}}</sequence-id>
</config>
<ipv4>
<config>
<source-address>{{source_address}}</source-address>
<destination-address>{{destination_address}}</destination-address>
<dscp>{{dscp}}</dscp>
<protocol-tcp />
<tcp-source-port>{{source_port}}</tcp-source-port>
<tcp-destination-port>{{destination_port}}</tcp-destination-port>
<tcp-flags>{{tcp_flags}}</tcp-flags>
<forwarding-action>{{forwarding_action}}</forwarding-action>
</config>
</ipv4>
</acl-entry>
</acl-entries>
{% endif %}
</acl-set>
</acl-sets>
</acl>
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, TypedDict
from ..ACL.ACL_multivendor import RULE_TYPE_MAPPING, FORWARDING_ACTION_MAPPING, LOG_ACTION_MAPPING
class ACLRequestData(TypedDict):
name: str # acl-set name
type: str # acl-set type
sequence_id: int # acl-entry sequence-id
source_address: str
destination_address: str
forwarding_action: str
id: str # interface id
interface: str
subinterface: int
set_name_ingress: str # ingress-acl-set name
type_ingress: str # ingress-acl-set type
all: bool
dscp: int
protocol: int
tcp_flags: str
source_port: int
destination_port: int
def acl_cr_to_dict(acl_cr_dict: Dict, subinterface:int = 0) -> Dict:
rule_set = acl_cr_dict['rule_set']
rule_set_entry = rule_set['entries'][0]
rule_set_entry_match = rule_set_entry['match']
rule_set_entry_action = rule_set_entry['action']
name: str = rule_set['name']
type: str = RULE_TYPE_MAPPING[rule_set["type"]]
sequence_id = rule_set_entry['sequence_id']
source_address = rule_set_entry_match['src_address']
destination_address = rule_set_entry_match['dst_address']
forwarding_action: str = FORWARDING_ACTION_MAPPING[rule_set_entry_action['forward_action']]
interface_id = acl_cr_dict['interface']
interface = interface_id
set_name_ingress = name
type_ingress = type
return ACLRequestData(
name=name,
type=type,
sequence_id=sequence_id,
source_address=source_address,
destination_address=destination_address,
forwarding_action=forwarding_action,
id=interface_id,
interface=interface,
# subinterface=subinterface,
set_name_ingress=set_name_ingress,
type_ingress=type_ingress,
all=True,
dscp=18,
protocol=6,
tcp_flags='TCP_SYN',
source_port=22,
destination_port=80
)
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, TypedDict
RULE_TYPE_MAPPING = {
'ACLRULETYPE_IPV4' : 'ip',
}
FORWARDING_ACTION_MAPPING = {
'ACLFORWARDINGACTION_DROP' : 'deny',
'ACLFORWARDINGACTION_ACCEPT' : 'permit',
}
class ACLRequestData(TypedDict):
name: str # acl-set name
type: str # acl-set type
sequence_id: int # acl-entry sequence-id
source_address: str
destination_address: str
forwarding_action: str
interface: str
dscp: int
tcp_flags: str
source_port: int
destination_port: int
def acl_cr_to_dict_ipinfusion_proprietary(acl_cr_dict: Dict, delete: bool = False) -> Dict:
rule_set = acl_cr_dict['rule_set']
name: str = rule_set['name']
type: str = RULE_TYPE_MAPPING[rule_set["type"]]
interface = acl_cr_dict['interface'][5:] # remove preceding `PORT-` characters
if delete:
return ACLRequestData(name=name, type=type, interface=interface)
rule_set_entry = rule_set['entries'][0]
rule_set_entry_match = rule_set_entry['match']
rule_set_entry_action = rule_set_entry['action']
return ACLRequestData(
name=name,
type=type,
sequence_id=rule_set_entry['sequence_id'],
source_address=rule_set_entry_match['src_address'],
destination_address=rule_set_entry_match['dst_address'],
forwarding_action=FORWARDING_ACTION_MAPPING[rule_set_entry_action['forward_action']],
interface=interface,
dscp=rule_set_entry_match["dscp"],
tcp_flags=rule_set_entry_match["flags"],
source_port=rule_set_entry_match['src_port'],
destination_port=rule_set_entry_match['dst_port']
)
\ No newline at end of file
<acl xmlns="http://www.ipinfusion.com/yang/ocnos/ipi-acl">
<interfaces>
<interface>
<name>{{interface}}</name>
<config>
<name>{{interface}}</name>
</config>
<ingress-acl-sets>
<ingress-acl-set {% if operation == "delete" %}operation="delete"{% endif %}>
{% if type is defined %}<acl-type>{{type}}</acl-type>{% endif %}
<access-groups>
<access-group>
<acl-name>{{name}}</acl-name>
<config>
<acl-name>{{name}}</acl-name>
</config>
</access-group>
</access-groups>
<config>
{% if type is defined %}<acl-type>{{type}}</acl-type>{% endif %}
</config>
</ingress-acl-set>
</ingress-acl-sets>
</interface>
</interfaces>
</acl>
\ No newline at end of file
...@@ -24,3 +24,4 @@ pyang==2.6.0 ...@@ -24,3 +24,4 @@ pyang==2.6.0
git+https://github.com/robshakir/pyangbind.git git+https://github.com/robshakir/pyangbind.git
requests==2.27.1 requests==2.27.1
werkzeug==2.3.7 werkzeug==2.3.7
pydantic==2.6.3
...@@ -26,6 +26,7 @@ from .rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn ...@@ -26,6 +26,7 @@ from .rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn
from .rest_server.nbi_plugins.ietf_l3vpn import register_ietf_l3vpn from .rest_server.nbi_plugins.ietf_l3vpn import register_ietf_l3vpn
from .rest_server.nbi_plugins.ietf_network import register_ietf_network from .rest_server.nbi_plugins.ietf_network import register_ietf_network
from .rest_server.nbi_plugins.ietf_network_slice import register_ietf_nss from .rest_server.nbi_plugins.ietf_network_slice import register_ietf_nss
from .rest_server.nbi_plugins.ietf_acl import register_ietf_acl
terminate = threading.Event() terminate = threading.Event()
LOGGER = None LOGGER = None
...@@ -68,6 +69,7 @@ def main(): ...@@ -68,6 +69,7 @@ def main():
register_ietf_l3vpn(rest_server) # Registering L3VPN entrypoint register_ietf_l3vpn(rest_server) # Registering L3VPN entrypoint
register_ietf_network(rest_server) register_ietf_network(rest_server)
register_ietf_nss(rest_server) # Registering NSS entrypoint register_ietf_nss(rest_server) # Registering NSS entrypoint
register_ietf_acl(rest_server)
rest_server.start() rest_server.start()
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
......
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask_restful import Resource
from nbi.service.rest_server.nbi_plugins.ietf_acl.acl_service import ACL
from nbi.service.rest_server.nbi_plugins.ietf_acl.acl_services import ACLs
from nbi.service.rest_server.RestServer import RestServer
URL_PREFIX = "/restconf/data"
def __add_resource(rest_server: RestServer, resource: Resource, *urls, **kwargs):
urls = [(URL_PREFIX + url) for url in urls]
rest_server.add_resource(resource, *urls, **kwargs)
def register_ietf_acl(rest_server: RestServer):
__add_resource(
rest_server,
ACLs,
"/device=<device_uuid>/ietf-access-control-list:acls",
"/device=<device_uuid>/ietf-access-control-list:acls",
)
__add_resource(
rest_server,
ACL,
"/device=<device_uuid>/ietf-access-control-list:acl=<acl_name>",
"/device=<device_uuid>/ietf-access-control-list:acl=<acl_name>/",
)
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
import json
from flask_restful import Resource
from werkzeug.exceptions import NotFound
from nbi.service.rest_server.nbi_plugins.tools.Authentication import HTTP_AUTH
from common.proto.acl_pb2 import AclRuleTypeEnum
from common.proto.context_pb2 import (
ConfigActionEnum,
ConfigRule,
Device,
DeviceId,
)
from common.tools.object_factory.Device import json_device_id
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from .ietf_acl_parser import ietf_acl_from_config_rule_resource_value
LOGGER = logging.getLogger(__name__)
ACL_CONIG_RULE_KEY = r'\/device\[.+\]\/endpoint\[(.+)\]/acl_ruleset\[{}\]'
class ACL(Resource):
# @HTTP_AUTH.login_required
def get(self, device_uuid: str, acl_name: str):
RE_ACL_CONIG_RULE_KEY = re.compile(ACL_CONIG_RULE_KEY.format(acl_name))
context_client = ContextClient()
device_client = DeviceClient()
_device = context_client.GetDevice(DeviceId(**json_device_id(device_uuid)))
for cr in _device.device_config.config_rules:
if cr.WhichOneof('config_rule') == 'custom':
if ep_uuid_match := RE_ACL_CONIG_RULE_KEY.match(cr.custom.resource_key):
endpoint_uuid = ep_uuid_match.groups(0)[0]
resource_value_dict = json.loads(cr.custom.resource_value)
LOGGER.debug(f'P99: {resource_value_dict}')
return ietf_acl_from_config_rule_resource_value(resource_value_dict)
else:
raise NotFound(f'ACL not found')
# @HTTP_AUTH.login_required
def delete(self, device_uuid: str, acl_name: str):
RE_ACL_CONIG_RULE_KEY = re.compile(ACL_CONIG_RULE_KEY.format(acl_name))
context_client = ContextClient()
device_client = DeviceClient()
_device = context_client.GetDevice(DeviceId(**json_device_id(device_uuid)))
for cr in _device.device_config.config_rules:
if cr.WhichOneof('config_rule') == 'custom':
if ep_uuid_match := RE_ACL_CONIG_RULE_KEY.match(cr.custom.resource_key):
endpoint_uuid = ep_uuid_match.groups(0)[0]
resource_value_dict = json.loads(cr.custom.resource_value)
type_str = resource_value_dict['rule_set']['type']
interface = resource_value_dict['interface']
break
else:
raise NotFound(f'ACL not found')
acl_config_rule = ConfigRule()
acl_config_rule.action = ConfigActionEnum.CONFIGACTION_DELETE
acl_config_rule.acl.rule_set.name = acl_name
acl_config_rule.acl.interface = interface
acl_config_rule.acl.rule_set.type = getattr(AclRuleTypeEnum, type_str)
acl_config_rule.acl.endpoint_id.device_id.device_uuid.uuid = device_uuid
acl_config_rule.acl.endpoint_id.endpoint_uuid.uuid = endpoint_uuid
device = Device()
device.CopyFrom(_device)
del device.device_config.config_rules[:]
device.device_config.config_rules.append(acl_config_rule)
response = device_client.ConfigureDevice(device)
return (response.device_uuid.uuid).strip("\"\n")
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict
from flask import request
from flask_restful import Resource
from werkzeug.exceptions import NotFound
from common.proto.context_pb2 import Device, DeviceId
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Device import json_device_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from nbi.service.rest_server.nbi_plugins.tools.Authentication import HTTP_AUTH
from .ietf_acl_parser import config_rule_from_ietf_acl
LOGGER = logging.getLogger(__name__)
class ACLs(Resource):
# @HTTP_AUTH.login_required
def get(self):
return {}
# @HTTP_AUTH.login_required
def post(self, device_uuid: str):
if not request.is_json:
raise UnsupportedMediaType("JSON pyload is required")
request_data: Dict = request.json
LOGGER.debug("Request: {:s}".format(str(request_data)))
attached_interface = request_data["ietf-access-control-list"]["acls"]['attachment-points']['interface'][0]['interface-id']
context_client = ContextClient()
device_client = DeviceClient()
_device = context_client.GetDevice(DeviceId(**json_device_id(device_uuid)))
for ep in _device.device_endpoints:
if ep.name == attached_interface:
endpoint_uuid = ep.endpoint_id.endpoint_uuid.uuid
break
else:
raise NotFound(f'interface {attached_interface} not found in device {device_uuid}')
acl_config_rule = config_rule_from_ietf_acl(request_data, device_uuid, endpoint_uuid, sequence_id=1, subinterface=0)
LOGGER.info(f"ACL Config Rule: {grpc_message_to_json_string(acl_config_rule)}")
device = Device()
device.CopyFrom(_device)
del device.device_config.config_rules[:]
device.device_config.config_rules.append(acl_config_rule)
response = device_client.ConfigureDevice(device)
return (response.device_uuid.uuid).strip("\"\n")
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import json
import time
BASE_URL = "/restconf/data"
POST_URL = "/device={}/ietf-access-control-list:acls"
DELETE_URL = "/device={}/ietf-access-control-list:acl={}"
class IetfTfsClient:
def __init__(self,
tfs_host: str = "10.1.1.119",
tfs_port: int = 80,
username: str = "admin",
password: str = "admin",
timeout: int = 10,
allow_redirects: bool = True,
) -> None:
self.host = tfs_host
self.port = tfs_port
self.username = username
self.password = password
self.timeout = timeout
self.allow_redirects = allow_redirects
def post(self, device_uuid: str, ietf_acl_data: dict) -> str:
request_url = "http://{:s}:{:d}{:s}{:s}".format(self.host, self.port, BASE_URL, POST_URL.format(device_uuid))
reply = requests.request("post", request_url, timeout=self.timeout, json=ietf_acl_data, allow_redirects=self.allow_redirects)
return reply.text
def get(self, device_uuid: str, acl_name: str) -> str:
request_url = "http://{:s}:{:d}{:s}{:s}".format(self.host, self.port, BASE_URL, DELETE_URL.format(device_uuid, acl_name))
reply = requests.request("get", request_url, timeout=self.timeout, allow_redirects=self.allow_redirects)
return reply.text
def delete(self, device_uuid: str, acl_name: str) -> str:
request_url = "http://{:s}:{:d}{:s}{:s}".format(self.host, self.port, BASE_URL, DELETE_URL.format(device_uuid, acl_name))
reply = requests.request("delete", request_url, timeout=self.timeout, allow_redirects=self.allow_redirects)
return reply.text
if __name__ == "__main__":
csg1_device_uuid = 'b71fd62f-e3d4-5956-93b9-3139094836cf'
acl_name = 'sample-ipv4-acl'
acl_request_path = 'src/nbi/tests/data/ietf_acl.json'
with open(acl_request_path, 'r') as afile:
acl_request_data = json.load(afile)
ietf_tfs_client = IetfTfsClient()
post_response = ietf_tfs_client.post(csg1_device_uuid, acl_request_data)
print(f"post response: {post_response}")
time.sleep(.5)
get_response = ietf_tfs_client.get(csg1_device_uuid, acl_name)
print(f"get response: {get_response}")
time.sleep(.5)
delete_response = ietf_tfs_client.delete(csg1_device_uuid, acl_name)
print(f"delete response: {delete_response}")
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Dict, Optional, TypedDict
from pydantic import BaseModel, Field
from common.proto.acl_pb2 import AclForwardActionEnum, AclRuleTypeEnum, AclEntry
from common.proto.context_pb2 import ConfigActionEnum, ConfigRule
class Ipv4(BaseModel):
dscp: int = 0
source_ipv4_network: str = Field(serialization_alias="source-ipv4-network", default="")
destination_ipv4_network: str = Field(serialization_alias="destination-ipv4-network", default="")
class Port(BaseModel):
port: int = 0
operator: str = "eq"
class Tcp(BaseModel):
flags: str = ""
source_port: Port = Field(serialization_alias="source-port", default_factory=lambda: Port())
destination_port: Port = Field(serialization_alias="destination-port", default_factory=lambda: Port())
class Matches(BaseModel):
ipv4: Ipv4 = Ipv4()
tcp: Tcp = Tcp()
class Action(BaseModel):
forwarding: str = ""
class Ace(BaseModel):
name: str = "custom_rule"
matches: Matches = Matches()
actions: Action = Action()
class Aces(BaseModel):
ace: List[Ace] = [Ace()]
class Acl(BaseModel):
name: str = ""
type: str = ""
aces: Aces = Aces()
class Name(BaseModel):
name: str = ""
class AclSet(BaseModel):
acl_set: List[Name] = Field(serialization_alias="acl-set", default=[Name()])
class AclSets(BaseModel):
acl_sets: AclSet = Field(serialization_alias="acl-sets", default=AclSet())
class Ingress(BaseModel):
ingress: AclSets = AclSets()
class Interface(BaseModel):
interface_id: str = Field(serialization_alias="interface-id", default="")
ingress: Ingress = Ingress()
class Interfaces(BaseModel):
interface: List[Interface] = [Interface()]
class AttachmentPoints(BaseModel):
attachment_points: Interfaces = Field(serialization_alias="attachment-points", default=Interfaces())
class Acls(BaseModel):
acl: List[Acl] = [Acl()]
attachment_points: AttachmentPoints = Field(serialization_alias="attachment-points", default=AttachmentPoints())
class IETF_ACL(BaseModel):
acls: Acls = Acls()
IETF_TFS_RULE_TYPE_MAPPING = {
"ipv4-acl-type": "ACLRULETYPE_IPV4",
"ipv6-acl-type": "ACLRULETYPE_IPV6",
}
IETF_TFS_FORWARDING_ACTION_MAPPING = {
"drop": "ACLFORWARDINGACTION_DROP",
"accept": "ACLFORWARDINGACTION_ACCEPT",
}
TFS_IETF_RULE_TYPE_MAPPING = {
"ACLRULETYPE_IPV4": "ipv4-acl-type",
"ACLRULETYPE_IPV6": "ipv6-acl-type",
}
TFS_IETF_FORWARDING_ACTION_MAPPING = {
"ACLFORWARDINGACTION_DROP": "drop",
"ACLFORWARDINGACTION_ACCEPT": "accept",
}
def config_rule_from_ietf_acl(
request: Dict,
device_uuid: str,
endpoint_uuid: str,
sequence_id: int,
subinterface: int,
) -> ConfigRule:
the_acl = request["ietf-access-control-list"]["acls"]["acl"][0]
acl_ip_data = the_acl["aces"]["ace"][0]["matches"]["ipv4"]
acl_tcp_data = the_acl["aces"]["ace"][0]["matches"]["tcp"]
attachemnt_interface = request["ietf-access-control-list"]["acls"]['attachment-points']['interface'][0]
source_address = acl_ip_data["source-ipv4-network"]
destination_address = acl_ip_data["destination-ipv4-network"]
source_port = acl_tcp_data['source-port']['port']
destination_port = acl_tcp_data['destination-port']['port']
ietf_action = the_acl["aces"]["ace"][0]["actions"]["forwarding"]
interface_id = attachemnt_interface['interface-id']
acl_config_rule = ConfigRule()
acl_config_rule.action = ConfigActionEnum.CONFIGACTION_SET
acl_config_rule.acl.interface = interface_id
acl_endpoint_id = acl_config_rule.acl.endpoint_id
acl_endpoint_id.device_id.device_uuid.uuid = device_uuid
acl_endpoint_id.endpoint_uuid.uuid = endpoint_uuid
acl_rule_set = acl_config_rule.acl.rule_set
acl_rule_set.name = the_acl["name"]
acl_rule_set.type = getattr(AclRuleTypeEnum, IETF_TFS_RULE_TYPE_MAPPING[the_acl['type']])
acl_rule_set.description = (
f'{ietf_action} {the_acl["type"]}: {source_address}:{source_port}->{destination_address}:{destination_port}'
)
acl_entry = AclEntry()
acl_entry.sequence_id = sequence_id
acl_entry.match.src_address = source_address
acl_entry.match.dst_address = destination_address
acl_entry.match.src_port = source_port
acl_entry.match.dst_port = destination_port
acl_entry.match.dscp = acl_ip_data["dscp"]
acl_entry.match.flags = acl_tcp_data["flags"]
acl_entry.action.forward_action = getattr(AclForwardActionEnum, IETF_TFS_FORWARDING_ACTION_MAPPING[ietf_action])
acl_rule_set.entries.append(acl_entry)
return acl_config_rule
def ietf_acl_from_config_rule_resource_value(config_rule_rv: Dict) -> Dict:
rule_set = config_rule_rv['rule_set']
acl_entry = rule_set['entries'][0]
match_ = acl_entry['match']
ipv4 = Ipv4(dscp=match_["dscp"], source_ipv4_network=match_["src_address"], destination_ipv4_network=match_["dst_address"])
tcp = Tcp(flags=match_["flags"], source_port=Port(port=match_["src_port"]), destination_port=Port(port=match_["dst_port"]))
matches = Matches(ipvr=ipv4, tcp=tcp)
aces = Aces(ace=[Ace(matches=matches, actions=Action(forwarding=TFS_IETF_FORWARDING_ACTION_MAPPING[acl_entry["action"]["forward_action"]]))])
acl = Acl(name=rule_set["name"], type=TFS_IETF_RULE_TYPE_MAPPING[rule_set["type"]], aces=aces)
acl_sets = AclSets(acl_sets=AclSet(acl_set=[Name(name=rule_set["name"])]))
ingress = Ingress(ingress=acl_sets)
interfaces = Interfaces(interface=[Interface(interface_id=config_rule_rv["interface"], ingress=ingress)])
acls = Acls(acl=[acl], attachment_points=AttachmentPoints(attachment_points=interfaces))
ietf_acl = IETF_ACL(acls=acls)
return ietf_acl.model_dump(by_alias=True)
\ No newline at end of file
{
"ietf-access-control-list": {
"acls": {
"acl": [
{
"name": "sample-ipv4-acl",
"type": "ipv4-acl-type",
"aces": {
"ace": [
{
"name": "rule1",
"matches": {
"ipv4": {
"dscp": 18,
"source-ipv4-network": "192.168.10.6/24",
"destination-ipv4-network": "192.168.20.6/24"
},
"tcp": {
"flags": "syn",
"source-port": {
"port": 1444,
"operator": "eq"
},
"destination-port": {
"port": 1333,
"operator": "eq"
}
}
},
"actions": {
"forwarding": "drop"
}
}
]
}
}
],
"attachment-points": {
"interface": [
{
"interface-id": "PORT-ce1",
"ingress": {
"acl-sets": {
"acl-set": [
{
"name": "sample-ipv4-acl"
}
]
}
}
}
]
}
}
}
}
\ No newline at end of file