Newer
Older
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from typing import Dict, Optional
from dataclasses import dataclass
from .tf_service import TFService
from .utils import make_selector, set_optional_parameter
class InconsistentVlanConfiguration(Exception):
pass
@dataclass
class CEndpoint:
module: str
port: str
# Emulated/translated VLAN. May also be a letter
# Only present on TF side, never on gets from CM.
# VLAN is never transmitted to wire on endpoint, it is purely an internal construct
# However VLAN is part of the whole connection
vlan: str
capacity: int
href: Optional[str]
def ifname(self) -> str:
if self.vlan is None:
return self.module + "|" + self.port
else:
return self.module + "|" + self.port + "." + self.vlan
def portname(self) -> str:
return self.module + "|" + self.port
def __str__(self):
return f"({self.ifname()}, {self.capacity})"
def create_config(self) -> Dict[str, any]:
cfg = {
# VLAN is intentionally ignored here (None argument below)
"selector": make_selector(self.module, self.port, None)
}
if self.capacity > 0:
cfg["capacity"] = self.capacity
return cfg
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@dataclass
class LifeCycleInfo:
# State is None (if not known), or one of the following (in future there might be more, so lets not assuem too much)
# 'pendingConfiguration': This state occurs when one of the network connection modules is pending configuration or pending deletion.
# 'configured': This state occurs when all network connection modules are configured.
# 'configurationFailed': This state may occur when at least a configuration of a module from this network connection failed or timeout.
# 'pendingDeletion': This state may occur when a request to delete this network connection is being processed.
# 'deletionFailed': This state may occur when at least a removal of a module from this network connection failed or timeout.
# 'networkConflict': This state may occur when there is a conflict in a network connection module configuration.
# 'deleted': This state occurs when a network connection is removed.
state: Optional[str]
reason: Optional[str]
def is_terminal_state(self) -> bool:
if self.state is None:
return True
if self.state.endswith('Failed') or self.state.endswith('Conflict'):
return True
if self.state == "configured" or self.state == "deleted":
return True
return False
def __str__(self):
state_str = "unknown" if self.state is None else self.state
if self.reason:
return f"({state_str} (reason: {self.reason})"
return state_str
@staticmethod
def new_from_top_level_json(json_dict: Dict[str, any]) -> LifeCycleInfo:
if "state" not in json_dict:
return LifeCycleInfo(None, None)
state = json_dict["state"]
return LifeCycleInfo(state.get("lifecycleState", None), state.get("lifecycleReason", None))
@staticmethod
def new_unknown() -> LifeCycleInfo:
return LifeCycleInfo(None, "not yet communicated with IPM")
class ConnectionDeserializationError(Exception):
pass
class Connection:
def __init__(self, from_json: Optional[Dict[str, any]] = None, from_tf_service: Optional[TFService] = None):
def get_endpoint_mod_aid(endpoint: Dict[str, any]) -> Optional[str]:
try:
return (endpoint["state"]["moduleIf"]["moduleName"], endpoint["state"]["moduleIf"]["clientIfAid"])
except KeyError:
return None
def get_endpoint_capacity(endpoint: Dict[str, any]) -> int:
try:
return int(endpoint["state"]["capacity"])
except KeyError:
return 0
if from_json:
try:
config = from_json["config"]
state = from_json["state"]
self.name = state["name"] if "name" in state else None #Name is optional
self.serviceMode = state["serviceMode"]
Ville Hallivuori
committed
# Implicit transport capacity is a string, where value "none" has special meaning.
# So "none" is correct value, not "None" for missing attribute
self.implicitTransportCapacity = config["implicitTransportCapacity"] if "implicitTransportCapacity" in config else "none"
self.mc = config["mc"] if "mc" in config else None
self.vlan_filter = state["outerVID"] if "outerVID" in state else None
self.href = from_json["href"]
self.endpoints = []
for ep in from_json["endpoints"]:
ep_mod_aip = get_endpoint_mod_aid(ep)
if ep_mod_aip:
self.endpoints.append(CEndpoint(*ep_mod_aip, None, get_endpoint_capacity(ep), ep["href"]))
self.life_cycle_info = LifeCycleInfo.new_from_top_level_json(from_json)
self.cm_data = from_json
except KeyError as e:
raise ConnectionDeserializationError(f"Missing mandatory key {str(e)}") from e
elif from_tf_service:
self.href = None
self.name = from_tf_service.name()
self.endpoints = [CEndpoint(mod, port, vlan, from_tf_service.capacity, None) for mod,port,vlan in from_tf_service.get_endpoints_mod_aid_vlan()]
# Service mode guessing has to be AFTER endpoint assigment.
# The heuristic used is perfectly valid in context of TF where we always encode
# VLANs to interface names. Correspondingly cm-cli user has to know
# to use VLANs on low level test APIs when using VTI mode.
self.serviceMode = self.__guess_service_mode_from_emulated_enpoints()
Ville Hallivuori
committed
if self.serviceMode == "XR-L1":
self.vlan_filter = None
self.mc = None
Ville Hallivuori
committed
self.implicitTransportCapacity ="portMode"
else:
self.vlan_filter = str(self.__guess_vlan_id()) + " " # Needs to be in string format, can contain ranges, regexp is buggy, trailin space is needed for single VLAN
self.mc = "matchOuterVID"
Ville Hallivuori
committed
# String "none" has a special meaning for implicitTransportCapacity
self.implicitTransportCapacity ="none"
self.life_cycle_info = LifeCycleInfo.new_unknown()
self.cm_data = None
else:
# May support other initializations in future
raise ConnectionDeserializationError("JSON dict missing")
def __str__(self):
name = self.name if self.name else "<NO NAME>"
endpoints = ", ".join((str(ep) for ep in self.endpoints))
return f"name: {name}, id: {self.href}, service-mode: {self.serviceMode}, end-points: [{endpoints}]"
Ville Hallivuori
committed
def is_vti_mode(self) -> bool:
return "XR-VTI-P2P" == self.serviceMode
def __guess_service_mode_from_emulated_enpoints(self):
for ep in self.endpoints:
if ep.vlan is not None:
Ville Hallivuori
committed
return "XR-VTI-P2P"
return "XR-L1"
def __guess_vlan_id(self) -> int:
vlans = []
for ep in self.endpoints:
if ep.vlan is not None and ep.vlan.isnumeric():
vlans.append(int(ep.vlan))
if not vlans:
raise InconsistentVlanConfiguration("VLAN ID is not encoded in TF interface names for VTI mode service")
else:
for vlan in vlans:
if vlan != vlans[0]:
raise InconsistentVlanConfiguration(f"VLAN configuration must match at both ends of the connection, {vlans[0]} != {vlan}")
return vlans[0]
def create_config(self) -> Dict[str, any]:
cfg = {}
set_optional_parameter(cfg, "name", self.name)
cfg["serviceMode"] = self.serviceMode
Ville Hallivuori
committed
cfg["implicitTransportCapacity"] = self.implicitTransportCapacity
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
if self.endpoints:
cfg["endpoints"] = [ep.create_config() for ep in self.endpoints]
set_optional_parameter(cfg, "outerVID", self.vlan_filter)
set_optional_parameter(cfg, "mc", self.mc)
#print(cfg)
return cfg
def get_port_map(self) -> Dict[str, CEndpoint]:
return {ep.portname(): ep for ep in self.endpoints }
# Type hint has to be string, because future annotations (enclosing class)
# is not yet widely available
def get_endpoint_updates(self, old: Optional['Connection']): # -> Tuple[List[str], List[Dict[str, any], List[Tuple[str, Dict[str, any]]]]]:
new_ports = self.get_port_map()
if old is None:
return ([], [new_ep.create_config() for new_ep in new_ports.values()], [])
# Can only compute difference against get from CM, as hrefs are needed
assert old.cm_data is not None
old_ports = old.get_port_map()
deletes = []
creates = []
updates = []
for port, old_ep in old_ports.items():
if port not in new_ports:
assert old_ep.href is not None
deletes.append(old_ep.href)
for port, new_ep in new_ports.items():
if port not in old_ports:
creates.append(new_ep.create_config())
elif old_ports[port].capacity != new_ep.capacity:
updates.append((old_ports[port].href, {"capacity": new_ep.capacity}))
return deletes, creates, updates