Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import ConfigRule
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.ConfigRule import json_config_rule_set
DEVICE_SETTINGS = re.compile(r'\/device\[([^\]]+)\]\/settings')
ENDPOINT_SETTINGS = re.compile(r'\/device\[([^\]]+)\]\/endpoint\[([^\]]+)\]\/settings')
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
L2NM_SETTINGS_FIELD_DEFAULTS = {
'encapsulation_type': 'dot1q',
'vlan_id' : 100,
'mtu' : 1450,
}
L3NM_SETTINGS_FIELD_DEFAULTS = {
'encapsulation_type': 'dot1q',
'vlan_id' : 100,
'mtu' : 1450,
}
TAPI_SETTINGS_FIELD_DEFAULTS = {
'capacity_value' : 50.0,
'capacity_unit' : 'GHz',
'layer_proto_name': 'PHOTONIC_MEDIA',
'layer_proto_qual': 'tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC',
'direction' : 'UNIDIRECTIONAL',
}
def find_custom_config_rule(config_rules : List, resource_name : str) -> Optional[Dict]:
resource_value : Optional[Dict] = None
for config_rule in config_rules:
if config_rule.WhichOneof('config_rule') != 'custom': continue
if config_rule.custom.resource_key != resource_name: continue
resource_value = json.loads(config_rule.custom.resource_value)
return resource_value
def compose_config_rules(
main_service_config_rules : List, subservice_config_rules : List, field_defaults : Dict
) -> None:
settings = find_custom_config_rule(main_service_config_rules, SETTINGS_RULE_NAME)
if settings is None: return
json_settings = {}
for field_name,default_value in field_defaults.items():
json_settings[field_name] = settings.get(field_name, default_value)
config_rule = ConfigRule(**json_config_rule_set('/settings', json_settings))
subservice_config_rules.append(config_rule)
def compose_l2nm_config_rules(main_service_config_rules : List, subservice_config_rules : List) -> None:
compose_config_rules(main_service_config_rules, subservice_config_rules, L2NM_SETTINGS_FIELD_DEFAULTS)
def compose_l3nm_config_rules(main_service_config_rules : List, subservice_config_rules : List) -> None:
compose_config_rules(main_service_config_rules, subservice_config_rules, L3NM_SETTINGS_FIELD_DEFAULTS)
def compose_tapi_config_rules(main_service_config_rules : List, subservice_config_rules : List) -> None:
compose_config_rules(main_service_config_rules, subservice_config_rules, TAPI_SETTINGS_FIELD_DEFAULTS)
def compose_device_config_rules(
config_rules : List, subservice_config_rules : List, path_hops : List,
device_name_mapping : Dict[str, str], endpoint_name_mapping : Dict[Tuple[str, str], str]
) -> None:
LOGGER.debug('[compose_device_config_rules] begin')
LOGGER.debug('[compose_device_config_rules] device_name_mapping={:s}'.format(str(device_name_mapping)))
LOGGER.debug('[compose_device_config_rules] endpoint_name_mapping={:s}'.format(str(endpoint_name_mapping)))
endpoints_traversed = set()
for path_hop in path_hops:
device_uuid_or_name = path_hop['device']
devices_traversed.add(device_uuid_or_name)
endpoints_traversed.add((device_uuid_or_name, path_hop['ingress_ep']))
endpoints_traversed.add((device_uuid_or_name, path_hop['egress_ep']))
LOGGER.debug('[compose_device_config_rules] devices_traversed={:s}'.format(str(devices_traversed)))
LOGGER.debug('[compose_device_config_rules] endpoints_traversed={:s}'.format(str(endpoints_traversed)))
LOGGER.debug('[compose_device_config_rules] processing config_rule: {:s}'.format(
grpc_message_to_json_string(config_rule)))
if config_rule.WhichOneof('config_rule') == 'acl':
LOGGER.debug('[compose_device_config_rules] is acl')
endpoint_id = config_rule.acl.endpoint_id
device_uuid_or_name = endpoint_id.device_id.device_uuid.uuid
LOGGER.debug('[compose_device_config_rules] device_uuid_or_name={:s}'.format(str(device_uuid_or_name)))
device_name_or_uuid = device_name_mapping.get(device_uuid_or_name, device_uuid_or_name)
LOGGER.debug('[compose_device_config_rules] device_name_or_uuid={:s}'.format(str(device_name_or_uuid)))
device_keys = {device_uuid_or_name, device_name_or_uuid}
if len(device_keys.intersection(devices_traversed)) == 0: continue
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
LOGGER.debug('[compose_device_config_rules] endpoint_uuid={:s}'.format(str(endpoint_uuid)))
# given endpoint uuids link 'eth-1/0/20.533', remove last part after the '.'
endpoint_uuid_or_name = (endpoint_uuid[::-1].split('.', maxsplit=1)[-1])[::-1]
LOGGER.debug('[compose_device_config_rules] endpoint_uuid_or_name={:s}'.format(str(endpoint_uuid_or_name)))
endpoint_name_or_uuid_1 = endpoint_name_mapping[(device_uuid_or_name, endpoint_uuid_or_name)]
endpoint_name_or_uuid_2 = endpoint_name_mapping[(device_name_or_uuid, endpoint_uuid_or_name)]
endpoint_keys = {endpoint_uuid_or_name, endpoint_name_or_uuid_1, endpoint_name_or_uuid_2}
device_endpoint_keys = set(itertools.product(device_keys, endpoint_keys))
if len(device_endpoint_keys.intersection(endpoints_traversed)) == 0: continue
LOGGER.debug('[compose_device_config_rules] adding acl config rule')
subservice_config_rules.append(config_rule)
elif config_rule.WhichOneof('config_rule') == 'custom':
LOGGER.debug('[compose_device_config_rules] is custom')
match = DEVICE_SETTINGS.match(config_rule.custom.resource_key)
if match is not None:
device_uuid_or_name = match.group(1)
device_name_or_uuid = device_name_mapping[device_uuid_or_name]
device_keys = {device_uuid_or_name, device_name_or_uuid}
if len(device_keys.intersection(devices_traversed)) == 0: continue
subservice_config_rules.append(config_rule)
match = ENDPOINT_SETTINGS.match(config_rule.custom.resource_key)
if match is not None:
device_uuid_or_name = match.group(1)
device_name_or_uuid = device_name_mapping[device_uuid_or_name]
device_keys = {device_uuid_or_name, device_name_or_uuid}
endpoint_uuid_or_name = match.group(2)
endpoint_name_or_uuid_1 = endpoint_name_mapping[(device_uuid_or_name, endpoint_uuid_or_name)]
endpoint_name_or_uuid_2 = endpoint_name_mapping[(device_name_or_uuid, endpoint_uuid_or_name)]
endpoint_keys = {endpoint_uuid_or_name, endpoint_name_or_uuid_1, endpoint_name_or_uuid_2}
device_endpoint_keys = set(itertools.product(device_keys, endpoint_keys))
if len(device_endpoint_keys.intersection(endpoints_traversed)) == 0: continue
subservice_config_rules.append(config_rule)
else:
continue
LOGGER.debug('[compose_device_config_rules] end')