Newer
Older
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from typing import Dict, Optional
from dataclasses import dataclass
from .tf_service import TFService
from .utils import make_selector, set_optional_parameter
class InconsistentVlanConfiguration(Exception):
pass
@dataclass
class CEndpoint:
module: str
port: str
# Emulated/translated VLAN. May also be a letter
# Only present on TF side, never on gets from CM.
# VLAN is never transmitted to wire on endpoint, it is purely an internal construct
# However VLAN is part of the whole connection
vlan: str
capacity: int
href: Optional[str]
def ifname(self) -> str:
if self.vlan is None:
return self.module + "|" + self.port
else:
return self.module + "|" + self.port + "." + self.vlan
def portname(self) -> str:
return self.module + "|" + self.port
def __str__(self):
return f"({self.ifname()}, {self.capacity})"
def create_config(self) -> Dict[str, any]:
cfg = {
# VLAN is intentionally ignored here (None argument below)
"selector": make_selector(self.module, self.port, None)
}
if self.capacity > 0:
cfg["capacity"] = self.capacity
return cfg
class ConnectionDeserializationError(Exception):
pass
class Connection:
def __init__(self, from_json: Optional[Dict[str, any]] = None, from_tf_service: Optional[TFService] = None):
def get_endpoint_mod_aid(endpoint: Dict[str, any]) -> Optional[str]:
try:
return (endpoint["state"]["moduleIf"]["moduleName"], endpoint["state"]["moduleIf"]["clientIfAid"])
except KeyError:
return None
def get_endpoint_capacity(endpoint: Dict[str, any]) -> int:
try:
return int(endpoint["state"]["capacity"])
except KeyError:
return 0
if from_json:
try:
config = from_json["config"]
state = from_json["state"]
self.name = state["name"] if "name" in state else None #Name is optional
self.serviceMode = state["serviceMode"]
Ville Hallivuori
committed
# Implicit transport capacity is a string, where value "none" has special meaning.
# So "none" is correct value, not "None" for missing attribute
self.implicitTransportCapacity = config["implicitTransportCapacity"] if "implicitTransportCapacity" in config else "none"
self.mc = config["mc"] if "mc" in config else None
self.vlan_filter = state["outerVID"] if "outerVID" in state else None
self.href = from_json["href"]
self.endpoints = []
for ep in from_json["endpoints"]:
ep_mod_aip = get_endpoint_mod_aid(ep)
if ep_mod_aip:
self.endpoints.append(CEndpoint(*ep_mod_aip, None, get_endpoint_capacity(ep), ep["href"]))
self.cm_data = from_json
except KeyError as e:
raise ConnectionDeserializationError(f"Missing mandatory key {str(e)}") from e
elif from_tf_service:
self.href = None
self.name = from_tf_service.name()
self.endpoints = [CEndpoint(mod, port, vlan, from_tf_service.capacity, None) for mod,port,vlan in from_tf_service.get_endpoints_mod_aid_vlan()]
# Service mode guessing has to be AFTER endpoint assigment.
# The heuristic used is perfectly valid in context of TF where we always encode
# VLANs to interface names. Correspondingly cm-cli user has to know
# to use VLANs on low level test APIs when using VTI mode.
self.serviceMode = self.__guess_service_mode_from_emulated_enpoints()
Ville Hallivuori
committed
if self.serviceMode == "XR-L1":
self.vlan_filter = None
self.mc = None
Ville Hallivuori
committed
self.implicitTransportCapacity ="portMode"
else:
self.vlan_filter = str(self.__guess_vlan_id()) + " " # Needs to be in string format, can contain ranges, regexp is buggy, trailin space is needed for single VLAN
self.mc = "matchOuterVID"
Ville Hallivuori
committed
# String "none" has a special meaning for implicitTransportCapacity
self.implicitTransportCapacity ="none"
self.cm_data = None
else:
# May support other initializations in future
raise ConnectionDeserializationError("JSON dict missing")
def __str__(self):
name = self.name if self.name else "<NO NAME>"
endpoints = ", ".join((str(ep) for ep in self.endpoints))
return f"name: {name}, id: {self.href}, service-mode: {self.serviceMode}, end-points: [{endpoints}]"
def __guess_service_mode_from_emulated_enpoints(self):
for ep in self.endpoints:
if ep.vlan is not None:
Ville Hallivuori
committed
return "XR-VTI-P2P"
return "XR-L1"
def __guess_vlan_id(self) -> int:
vlans = []
for ep in self.endpoints:
if ep.vlan is not None and ep.vlan.isnumeric():
vlans.append(int(ep.vlan))
if not vlans:
raise InconsistentVlanConfiguration("VLAN ID is not encoded in TF interface names for VTI mode service")
else:
for vlan in vlans:
if vlan != vlans[0]:
raise InconsistentVlanConfiguration(f"VLAN configuration must match at both ends of the connection, {vlans[0]} != {vlan}")
return vlans[0]
def create_config(self) -> Dict[str, any]:
cfg = {}
set_optional_parameter(cfg, "name", self.name)
cfg["serviceMode"] = self.serviceMode
Ville Hallivuori
committed
cfg["implicitTransportCapacity"] = self.implicitTransportCapacity
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
if self.endpoints:
cfg["endpoints"] = [ep.create_config() for ep in self.endpoints]
set_optional_parameter(cfg, "outerVID", self.vlan_filter)
set_optional_parameter(cfg, "mc", self.mc)
#print(cfg)
return cfg
def get_port_map(self) -> Dict[str, CEndpoint]:
return {ep.portname(): ep for ep in self.endpoints }
# Type hint has to be string, because future annotations (enclosing class)
# is not yet widely available
def get_endpoint_updates(self, old: Optional['Connection']): # -> Tuple[List[str], List[Dict[str, any], List[Tuple[str, Dict[str, any]]]]]:
new_ports = self.get_port_map()
if old is None:
return ([], [new_ep.create_config() for new_ep in new_ports.values()], [])
# Can only compute difference against get from CM, as hrefs are needed
assert old.cm_data is not None
old_ports = old.get_port_map()
deletes = []
creates = []
updates = []
for port, old_ep in old_ports.items():
if port not in new_ports:
assert old_ep.href is not None
deletes.append(old_ep.href)
for port, new_ep in new_ports.items():
if port not in old_ports:
creates.append(new_ep.create_config())
elif old_ports[port].capacity != new_ep.capacity:
updates.append((old_ports[port].href, {"capacity": new_ep.capacity}))
return deletes, creates, updates