-
Shayan Hajipour authored
Bug fix: temporary fix for the MEC PoC demo. The config rules and the OpenConfig driver for L3VPN should change based on the Telefonica recipe.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ComposeConfigRules.py 8.96 KiB
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools, json, logging, re
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import ConfigRule
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.ConfigRule import json_config_rule_set
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Resource key of the custom config rule that holds a service's settings.
SETTINGS_RULE_NAME = '/settings'

# Resource-key patterns for settings scoped to a device, to a device endpoint,
# and to a VLAN on a device endpoint. The capture groups are, in order, the
# bracketed device, endpoint and vlan tokens (as far as each pattern has them).
DEVICE_SETTINGS = re.compile(r'\/device\[([^\]]+)\]\/settings')
ENDPOINT_SETTINGS = re.compile(r'\/device\[([^\]]+)\]\/endpoint\[([^\]]+)\]\/settings')
RE_ENDPOINT_VLAN_SETTINGS = re.compile(
    r'\/device\[([^\]]+)\]\/endpoint\[([^\]]+)\]\/vlan\[([^\]]+)\]\/settings'
)

# Per-technology settings fields, each with the value to use when the main
# service's '/settings' rule does not define that field.
L2NM_SETTINGS_FIELD_DEFAULTS = dict(
    encapsulation_type='dot1q',
    vlan_id=100,
    mtu=1450,
)

L3NM_SETTINGS_FIELD_DEFAULTS = dict(
    encapsulation_type='dot1q',
    vlan_id=100,
    mtu=1450,
)

TAPI_SETTINGS_FIELD_DEFAULTS = dict(
    capacity_value=50.0,
    capacity_unit='GHz',
    layer_proto_name='PHOTONIC_MEDIA',
    layer_proto_qual='tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC',
    direction='UNIDIRECTIONAL',
)
def find_custom_config_rule(config_rules : List, resource_name : str) -> Optional[Dict]:
    """Return the JSON-decoded value of the custom config rule keyed by `resource_name`.

    Only rules whose populated `config_rule` oneof is `custom` are inspected.
    Every matching rule is decoded in iteration order and the last one wins.

    Args:
        config_rules: iterable of config rules exposing `WhichOneof` and `custom`.
        resource_name: resource key to look for.

    Returns:
        The decoded `resource_value` of the last matching rule, or `None` when
        no rule matches.
    """
    found : Optional[Dict] = None
    for rule in config_rules:
        is_custom = rule.WhichOneof('config_rule') == 'custom'
        # `custom` is only read once the rule is known to be a custom rule.
        if is_custom and rule.custom.resource_key == resource_name:
            found = json.loads(rule.custom.resource_value)
    return found
def compose_config_rules(
    main_service_config_rules : List, subservice_config_rules : List, field_defaults : Dict
) -> None:
    """Append a '/settings' config rule for a sub-service, derived from the main service.

    The main service's settings rule (resource key `SETTINGS_RULE_NAME`) is looked up;
    when it is absent nothing is appended. Otherwise, for every field listed in
    `field_defaults`, the main service's value is taken, or the default when the
    main service does not define it. Fields not listed in `field_defaults` are dropped.

    Args:
        main_service_config_rules: config rules of the main service (read only).
        subservice_config_rules: list that receives the composed rule (mutated in place).
        field_defaults: mapping of field name to the default value for that field.
    """
    main_settings = find_custom_config_rule(main_service_config_rules, SETTINGS_RULE_NAME)
    if main_settings is None:
        return

    subservice_settings = {
        name: main_settings.get(name, fallback)
        for name, fallback in field_defaults.items()
    }

    settings_rule = ConfigRule(**json_config_rule_set('/settings', subservice_settings))
    subservice_config_rules.append(settings_rule)
def compose_l2nm_config_rules(main_service_config_rules : List, subservice_config_rules : List) -> None:
    """Delegate to `compose_config_rules` with `L2NM_SETTINGS_FIELD_DEFAULTS` as field defaults."""
    compose_config_rules(
        main_service_config_rules,
        subservice_config_rules,
        field_defaults=L2NM_SETTINGS_FIELD_DEFAULTS,
    )
def compose_l3nm_config_rules(main_service_config_rules : List, subservice_config_rules : List) -> None:
    """Delegate to `compose_config_rules` with `L3NM_SETTINGS_FIELD_DEFAULTS` as field defaults."""
    compose_config_rules(
        main_service_config_rules,
        subservice_config_rules,
        field_defaults=L3NM_SETTINGS_FIELD_DEFAULTS,
    )
def compose_tapi_config_rules(main_service_config_rules : List, subservice_config_rules : List) -> None:
    """Delegate to `compose_config_rules` with `TAPI_SETTINGS_FIELD_DEFAULTS` as field defaults."""
    compose_config_rules(
        main_service_config_rules,
        subservice_config_rules,
        field_defaults=TAPI_SETTINGS_FIELD_DEFAULTS,
    )
def _device_aliases(device_uuid_or_name : str, device_name_mapping : Dict[str, str]) -> set:
    """Return the set holding a device identifier and its mapped alias (itself when unmapped)."""
    return {device_uuid_or_name, device_name_mapping.get(device_uuid_or_name, device_uuid_or_name)}


def _endpoint_is_traversed(
    device_uuid_or_name : str, endpoint_uuid_or_name : str,
    device_name_mapping : Dict[str, str], endpoint_name_mapping : Dict[Tuple[str, str], str],
    endpoints_traversed : set
) -> bool:
    """Tell whether any (device alias, endpoint alias) pair appears in `endpoints_traversed`.

    Endpoint aliases are the given identifier plus whatever `endpoint_name_mapping` holds
    for it under each device alias; unmapped pairs fall back to the identifier itself.
    """
    device_keys = _device_aliases(device_uuid_or_name, device_name_mapping)
    endpoint_keys = {endpoint_uuid_or_name}
    endpoint_keys.update(
        endpoint_name_mapping.get((device_key, endpoint_uuid_or_name), endpoint_uuid_or_name)
        for device_key in device_keys
    )
    return not endpoints_traversed.isdisjoint(itertools.product(device_keys, endpoint_keys))


def compose_device_config_rules(
    config_rules : List, subservice_config_rules : List, path_hops : List,
    device_name_mapping : Dict[str, str], endpoint_name_mapping : Dict[Tuple[str, str], str]
) -> None:
    """Copy to a sub-service the device/endpoint config rules that concern its path.

    A rule is appended to `subservice_config_rules` when:
      * it is an ACL rule whose device and endpoint are traversed by `path_hops`;
      * it is a custom rule keyed `/device[D]/settings` and device D is traversed;
      * it is a custom rule keyed `/device[D]/endpoint[E]/settings` or
        `/device[D]/endpoint[E]/vlan[V]/settings` and endpoint (D, E) is traversed.
    Any other rule is ignored.

    Devices and endpoints are compared under both the identifier found in the rule and
    the alias given by the name mappings. Identifiers missing from a mapping are compared
    as-is instead of raising `KeyError`, so a rule that references an unknown device or
    endpoint is simply skipped when it is not on the path.

    Args:
        config_rules: config rules to filter (presumably those of the main service;
            verify against caller). VLAN-scoped rules that are selected get their
            `resource_key` rewritten in place, see the note in the body.
        subservice_config_rules: list that receives the selected rules (mutated in place).
        path_hops: sequence of mappings indexed by 'device', 'ingress_ep' and 'egress_ep'.
        device_name_mapping: device identifier -> alias of the same device.
        endpoint_name_mapping: (device identifier, endpoint identifier) -> endpoint alias.
    """
    LOGGER.debug('[compose_device_config_rules] begin')

    LOGGER.debug('[compose_device_config_rules] device_name_mapping=%s', device_name_mapping)
    LOGGER.debug('[compose_device_config_rules] endpoint_name_mapping=%s', endpoint_name_mapping)

    devices_traversed = set()
    endpoints_traversed = set()
    for path_hop in path_hops:
        device_uuid_or_name = path_hop['device']
        devices_traversed.add(device_uuid_or_name)
        endpoints_traversed.add((device_uuid_or_name, path_hop['ingress_ep']))
        endpoints_traversed.add((device_uuid_or_name, path_hop['egress_ep']))

    LOGGER.debug('[compose_device_config_rules] devices_traversed=%s', devices_traversed)
    LOGGER.debug('[compose_device_config_rules] endpoints_traversed=%s', endpoints_traversed)

    for config_rule in config_rules:
        # Serialising the message is only worth doing when it will actually be logged.
        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug('[compose_device_config_rules] processing config_rule: %s',
                grpc_message_to_json_string(config_rule))

        rule_kind = config_rule.WhichOneof('config_rule')

        if rule_kind == 'acl':
            LOGGER.debug('[compose_device_config_rules] is acl')
            endpoint_id = config_rule.acl.endpoint_id
            device_uuid_or_name = endpoint_id.device_id.device_uuid.uuid
            LOGGER.debug('[compose_device_config_rules] device_uuid_or_name=%s', device_uuid_or_name)
            device_name_or_uuid = device_name_mapping.get(device_uuid_or_name, device_uuid_or_name)
            LOGGER.debug('[compose_device_config_rules] device_name_or_uuid=%s', device_name_or_uuid)
            if devices_traversed.isdisjoint({device_uuid_or_name, device_name_or_uuid}): continue

            endpoint_uuid = endpoint_id.endpoint_uuid.uuid
            LOGGER.debug('[compose_device_config_rules] endpoint_uuid=%s', endpoint_uuid)
            # given endpoint uuids like 'eth-1/0/20.533', remove last part after the '.'
            endpoint_uuid_or_name = endpoint_uuid.rsplit('.', maxsplit=1)[0]
            LOGGER.debug('[compose_device_config_rules] endpoint_uuid_or_name=%s', endpoint_uuid_or_name)
            if not _endpoint_is_traversed(
                device_uuid_or_name, endpoint_uuid_or_name,
                device_name_mapping, endpoint_name_mapping, endpoints_traversed
            ): continue

            LOGGER.debug('[compose_device_config_rules] adding acl config rule')
            subservice_config_rules.append(config_rule)

        elif rule_kind == 'custom':
            LOGGER.debug('[compose_device_config_rules] is custom')
            resource_key = config_rule.custom.resource_key

            # The three patterns are mutually exclusive: right after '/device[...]' they
            # require '/settings', '/endpoint[...]/settings' and '/endpoint[...]/vlan['
            # respectively, so at most one branch below can apply to a given key.
            match = DEVICE_SETTINGS.match(resource_key)
            if match is not None:
                device_keys = _device_aliases(match.group(1), device_name_mapping)
                if devices_traversed.isdisjoint(device_keys): continue
                subservice_config_rules.append(config_rule)
                continue

            match = ENDPOINT_SETTINGS.match(resource_key)
            if match is not None:
                if not _endpoint_is_traversed(
                    match.group(1), match.group(2),
                    device_name_mapping, endpoint_name_mapping, endpoints_traversed
                ): continue
                subservice_config_rules.append(config_rule)
                continue

            match = RE_ENDPOINT_VLAN_SETTINGS.match(resource_key)
            if match is not None:
                if not _endpoint_is_traversed(
                    match.group(1), match.group(2),
                    device_name_mapping, endpoint_name_mapping, endpoints_traversed
                ): continue
                # ! check later: vlan removed from config_rule
                # NOTE(review): this rewrites the rule object taken from `config_rules`,
                # so the caller's rule loses its '/vlan[...]' segment too. Kept as in the
                # original; confirm whether a copy should be appended instead.
                config_rule.custom.resource_key = re.sub(r'\/vlan\[[^\]]+\]', '', resource_key)
                subservice_config_rules.append(config_rule)

    LOGGER.debug('[compose_device_config_rules] end')