Skip to content
Snippets Groups Projects
Commit 6c9acafc authored by Ville Hallivuori's avatar Ville Hallivuori
Browse files

Pull request #3: vhallivu/teraflow cm support

Merge in XRCA/teraflow from vhallivu/update_cm_authentication to xr_development

Squashed commit of the following:

commit 6a7069ded9b899afe4c9a218f1062da5c69782e4
Author: Ville Hallivuori <VHallivuori@infinera.com>
Date:   Fri Aug 12 07:51:32 2022 +0300

    review + lint fixes

commit f02d6f4ce42f909067a2570daff136e1ed0c7bbd
Author: Ville Hallivuori <VHallivuori@infinera.com>
Date:   Thu Aug 11 14:39:12 2022 +0300

    XR connection delete support

commit 81c3aae5ba105f6650f5392f438200c4b505eea1
Author: Ville Hallivuori <VHallivuori@infinera.com>
Date:   Thu Aug 11 12:39:29 2022 +0300

    Create services to CM

commit 56b3fb3acfc3281b3b3136269163697ca679913c
Author: Ville Hallivuori <VHallivuori@infinera.com>
Date:   Thu Aug 11 11:17:28 2022 +0300

    Improved connection management code, code refactoring

commit 46ef072679ffd37c22c4bf959bb8ba009cf4a082
Author: Ville Hallivuori <VHallivuori@infinera.com>
Date:   Wed Aug 10 17:30:27 2022 +0300

    More support for connection management

commit 49a2762abd7c1c8adafd666624b31976281d3860
Author: Ville Hallivuori <VHallivuori@infinera.com>
Date:   Wed Aug 10 15:52:17 2022 +0300

    Refactored CM connection, now using CM if database

commit 1b404f2c10befeecb48a7a2278f5e9ec60c25d9a
Author: Ville Hallivuori <VHallivuori@infinera.com>
Date:   Wed Aug 10 14:14:03 2022 +0300

    Start of refactoring of CM connectivity

commit 8ec8251bafbba559a4881e50be0aa068340fe536
Author: Ville Hallivuori <VHallivuori@infinera.com>
Date:   Tue Aug 9 16:37:49 2022 +0300

    Improved CM authentication
parent ac066a01
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!20XR Driver
...@@ -57,6 +57,15 @@ SOURCE VENV ACTIVATE ON ANY SHELL USED FOR PYTHON RELATED WORK (e.g. pytest). ...@@ -57,6 +57,15 @@ SOURCE VENV ACTIVATE ON ANY SHELL USED FOR PYTHON RELATED WORK (e.g. pytest).
Use apt-get to install any missing tools (e.g. jq is required). Use apt-get to install any missing tools (e.g. jq is required).
For host based Python development (e.g. VS Code) and test script execution, generate protobuf stubs:
```bash
cd proto
./generate_code_python.sh
cd ../src/context
ln -s ../../proto/src/python proto
```
## Building ## Building
Run deploy script to build in docker containers and then instantiate to configured K8s cluster. Deploy script must be sources for this to work! Run deploy script to build in docker containers and then instantiate to configured K8s cluster. Deploy script must be sources for this to work!
...@@ -80,3 +89,19 @@ Good logs to check are: ...@@ -80,3 +89,19 @@ Good logs to check are:
* kubectl logs service/deviceservice --namespace tfs * kubectl logs service/deviceservice --namespace tfs
* kubectl logs service/webuiservice --namespace tfs * kubectl logs service/webuiservice --namespace tfs
## cm-cli
The tool cm-cli in the xr driver directory can be use to connect to CM and test the connectivity. For example:
```bash
./cm-cli.py 172.19.219.44 443 xr-user-1 xr-user-1 --show-constellation-by-hub-name="XR HUB 1"
./cm-cli.py 172.19.219.44 443 xr-user-1 xr-user-1 --list-constellations
./cm-cli.py 172.19.219.44 443 xr-user-1 xr-user-1 --create-connection="FOO;XR HUB 1|XR-T4;XR LEAF 1|XR-T1"
./cm-cli.py 172.19.219.44 443 xr-user-1 xr-user-1 --show-connection-by-name="FooBar123"
./cm-cli.py 172.19.219.44 443 xr-user-1 xr-user-1 --list-connections
# Modify argumens: href;uuid;ifname;ifname or href;uuid
# uuid translates to name TF:uuid
./cm-cli.py 172.19.219.44 443 xr-user-1 xr-user-1 --modify-connection="/network-connections/138f0cc0-3dc6-4195-97c0-2cbed5fd59ba;FooBarAaa"
./cm-cli.py 172.19.219.44 443 xr-user-1 xr-user-1 --delete-connection=/network-connections/138f0cc0-3dc6-4195-97c0-2cbed5fd59ba
```
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
import logging
import json
import time
from typing import Tuple, Optional #Any, Iterator, List, , Union
import requests
import urllib3
import re
LOGGER = logging.getLogger(__name__)
class InvalidIfnameError(Exception):
def __init__(self, ifname):
# Call the base class constructor with the parameters it needs
super().__init__(f"Invalid interface name {ifname}, expecting format \"MODULENAME|PORTNAME\"")
class ConnectionDeserializationError(Exception):
def __init__(self, msg):
# Call the base class constructor with the parameters it needs
super().__init__(msg)
def ifname_to_module_and_aid(ifname: str) -> Tuple[str, str]:
a = ifname.split("|")
if len(a) != 2:
raise InvalidIfnameError(ifname)
return (a[0], a[1])
class Connection:
def __init__(self, from_json=None):
def get_endpoint_ifname(endpoint):
try:
return endpoint["state"]["moduleIf"]["moduleName"] + "|" + endpoint["state"]["moduleIf"]["clientIfAid"]
except KeyError:
return None
if from_json:
try:
state = from_json["state"]
self.name = state["name"] if "name" in state else None #Name is optional
self.serviceMode = state["serviceMode"]
self.href = from_json["href"]
self.endpoints = []
for ep in from_json["endpoints"]:
ifname = get_endpoint_ifname(ep)
if ifname:
self.endpoints.append(ifname)
except KeyError as e:
raise ConnectionDeserializationError(f"Missing mandatory key, f{str(e)}")
else:
# May support other initializations in future
raise ConnectionDeserializationError("JSON dict missing")
def __str__(self):
name = self.name if self.name else "<NO NAME>"
endpoints = ", ".join(self.endpoints)
return f"name: {name}, id: {self.href}, service-mode: {self.serviceMode}, end-points: [{endpoints}]"
class ExpiringValue:
def __init__(self, value, expiry):
self.__value = value
self.__expiry = expiry
self.__created = time.monotonic()
def get_value(self):
return self.__value
def is_valid_for(self, duration):
if self.__created + self.__expiry >= time.monotonic()+duration:
return True
else:
return False
class CmConnection:
def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True) -> None:
self.__tls_verify = tls_verify
if not tls_verify:
urllib3.disable_warnings()
self.__timeout = timeout
self.__username = username
self.__password = password
self.__cm_root = 'https://' + address + ':' + str(port)
self.__access_token = None
def __post_w_headers(self, path, data, headers, data_as_json=True):
url = self.__cm_root + path
try:
if data_as_json:
response = requests.post(url, headers=headers, json=data, timeout=self.__timeout, verify=self.__tls_verify)
else:
response = requests.post(url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify)
LOGGER.info(f"POST: {url} ==> {response.status_code}")
resp = json.loads(response.text)
return (response.status_code, resp)
except requests.exceptions.Timeout:
LOGGER.info(f"POST: {url} ==> timeout")
return None
except json.JSONDecodeError as json_err:
LOGGER.info(f"POST: {url} ==> response json decode error: {str(json_err)}")
return None
except Exception as e: # pylint: disable=broad-except
es=str(e)
LOGGER.info(f"POST: {url} ==> unexpected exception: {es}")
return None
def __post(self, path, data, data_as_json=True):
return self.__post_w_headers(path, data, self.__http_headers(), data_as_json=data_as_json)
def __put(self, path, data, data_as_json=True):
url = self.__cm_root + path
headers = self.__http_headers()
try:
if data_as_json:
response = requests.put(url, headers=headers, json=data, timeout=self.__timeout, verify=self.__tls_verify)
else:
response = requests.put(url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify)
LOGGER.info(f"PUT: {url} ==> {response.status_code}")
if response.content == b'null':
return (response.status_code, None)
resp = json.loads(response.text)
return (response.status_code, resp)
except requests.exceptions.Timeout:
LOGGER.info(f"PUT: {url} ==> timeout")
return None
except json.JSONDecodeError as json_err:
LOGGER.info(f"PUT: {url} ==> response json decode error: {str(json_err)}")
return None
except Exception as e: # pylint: disable=broad-except
es=str(e)
LOGGER.info(f"PUT: {url} ==> unexpected exception: {es}")
return None
def __delete(self, path, data=None):
url = self.__cm_root + path
headers = self.__http_headers()
try:
response = requests.delete(url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify)
LOGGER.info(f"DELETE: {url} ==> {response.status_code}")
if response.content == b'null':
return (response.status_code, None)
resp = json.loads(response.text)
return (response.status_code, resp)
except requests.exceptions.Timeout:
LOGGER.info(f"DELETE: {url} ==> timeout")
return None
except json.JSONDecodeError as json_err:
LOGGER.info(f"DELETE: {url} ==> response json decode error: {str(json_err)}")
return None
except Exception as e: # pylint: disable=broad-except
es=str(e)
LOGGER.info(f"DELETE: {url} ==> unexpected exception: {es}")
return None
def __http_headers(self):
self.__ensure_valid_access_token()
if self.__access_token:
return {'Authorization': 'Bearer '+ self.__access_token.get_value()}
else:
return {}
def __get_json(self, path, params=None):
url = self.__cm_root + path
try:
response = requests.get(url,headers=self.__http_headers(), timeout=self.__timeout,verify=self.__tls_verify, params=params)
LOGGER.info(f"GET: {url} {params=} ==> {response.status_code}")
resp = json.loads(response.text)
return (response.status_code, resp)
except requests.exceptions.Timeout:
LOGGER.info(f"GET: {url} {params=} ==> timeout")
return None
except json.JSONDecodeError as json_err:
LOGGER.info(f"GET: {url} {params=} ==> response json decode error: {str(json_err)}")
return None
except Exception as e: # pylint: disable=broad-except
es=str(e)
LOGGER.info(f"GET: {url} {params=} ==> unexpected exception: {es}")
return None
def __acquire_access_token(self):
path = '/realms/xr-cm/protocol/openid-connect/token'
req = {
"username": self.__username,
"password": self.__password,
"grant_type": "password",
"client_secret": "xr-web-client",
"client_id": "xr-web-client"
}
(status_code, response) = self.__post_w_headers(path, req, None, data_as_json=False)
if 200 != status_code or 'access_token' not in response:
LOGGER.error(f"Authentication failure, status code {status_code}, data {response}")
return False
access_token = response['access_token']
expires = int(response["expires_in"]) if "expires_in" in response else 0
LOGGER.info(f"Obtained access token {access_token}, expires in {expires}")
self.__access_token = ExpiringValue(access_token, expires)
return True
def __ensure_valid_access_token(self):
if not self.__access_token or not self.__access_token.is_valid_for(60):
self.__acquire_access_token()
def Connect(self) -> bool:
return self.__acquire_access_token()
@staticmethod
def get_constellation_module_ifnames(module):
ifnames = []
try:
module_state = module["state"]
module_name = module_state["module"]["moduleName"]
if "endpoints" in module_state:
for endpoint in module_state["endpoints"]:
try:
ifname = endpoint["moduleIf"]["clientIfAid"]
ifnames.append(f"{module_name}|{ifname}")
except KeyError:
pass
except KeyError:
pass
return ifnames
@staticmethod
def get_constellation_ifnames(constellation):
ifnames = []
if "hubModule" in constellation:
hub = constellation["hubModule"]
ifnames.extend(CmConnection.get_constellation_module_ifnames(hub))
if "leafModules" in constellation:
for leaf in constellation["leafModules"]:
ifnames.extend(CmConnection.get_constellation_module_ifnames(leaf))
return ifnames
@staticmethod
def get_ifnames_per_constellation(constellation):
ifnames = []
try:
ports = CmConnection.get_constellation_ifnames(constellation)
constellation_id = constellation["id"]
for port in ports:
ifnames.append(port)
except KeyError:
return None
return (constellation_id, ifnames)
def list_constellations(self):
status_code, constellations = self.__get_json("/api/v1/ns/xr-networks?content=expanded")
if not constellations or status_code != 200:
return []
return [CmConnection.get_ifnames_per_constellation(c) for c in constellations]
def get_constellation_by_hub_name(self, hub_module_name: str):
qparams = [
('content', 'expanded'),
('q', '{"hubModule.state.module.moduleName": "' + hub_module_name + '"}')
]
status_code, constellations = self.__get_json("/api/v1/ns/xr-networks?content=expanded", params=qparams)
if not constellations or status_code != 200 or len(constellations) != 1:
return None
return CmConnection.get_ifnames_per_constellation(constellations[0])
@staticmethod
def create_connection_config(uid: str, serviceMode: Optional[str], mod1: Optional[str], aid1: Optional[str], mod2: Optional[str], aid2: Optional[str]) -> Connection:
name = f"TF:{uid}"
def create_endpoint(mod, aid):
ep = {
"selector": {
"ifSelectorByModuleName": {
"moduleName": mod,
"moduleClientIfAid": aid,
}
}
}
return ep
connection = { "name" : name}
if serviceMode:
connection["serviceMode"] = serviceMode
endpoints = []
if mod1:
endpoints.append(create_endpoint(mod1, aid1))
if mod2:
endpoints.append(create_endpoint(mod2, aid2))
if len(endpoints) > 0:
connection["endpoints"] = endpoints
return connection
# All arguments are mandatory
def create_connection(self, uid, mod1, aid1, mod2, aid2) -> Optional[str]:
# Create wants a list, so wrap connection to list
connection = [CmConnection.create_connection_config(uid, "portMode", mod1, aid1, mod2, aid2)]
resp = self.__post("/api/v1/ncs/network-connections", connection)
if resp and resp[0] == 202 and len(resp[1]) == 1 and "href" in resp[1][0]:
created_resource = resp[1][0]["href"]
LOGGER.info(f"Created connection {created_resource} {uid=}, {mod1=}, {aid1=}, {mod2=}, {aid2=}")
# FIXME: remove
LOGGER.info(self.__get_json(f"/api/v1/ncs{created_resource}?content=expanded"))
return created_resource
else:
return None
# Modules and aids are optional. Uid is Teraflow UID, and is stored in mae field
def modify_connection(self, href: str, uid: str, service_mode: Optional[str], mod1: Optional[str]=None, aid1: Optional[str]=None, mod2: Optional[str]=None, aid2: Optional[str]=None) -> Optional[str]:
connection = CmConnection.create_connection_config(uid, service_mode, mod1, aid1, mod2, aid2)
resp = self.__put(f"/api/v1/ncs{href}", connection)
# Returns empty body
if resp and resp[0] == 202:
LOGGER.info(f"Updated connection {href=}, {uid=}, {service_mode=}, {mod1=}, {aid1=}, {mod2=}, {aid2=}")
# Return href used for update to be consisten with create
return href
else:
return None
def delete_connection(self, href: str) -> bool:
resp = self.__delete(f"/api/v1/ncs{href}")
print(resp)
# Returns empty body
if resp and resp[0] == 202:
LOGGER.info(f"Deleted connection {href=}")
return True
else:
return False
def create_connection_ifnames(self, uid: str, ifname1: str, ifname2: str):
module1, aid1 = ifname_to_module_and_aid(ifname1)
module2, aid2 = ifname_to_module_and_aid(ifname2)
return self.create_connection(uid, module1, aid1, module2, aid2)
def modify_connection_ifnames(self, href: str, uid: str, ifname1: Optional[str], ifname2: Optional[str], service_mode: Optional[str] =None):
# Only uid and href are mandatory
module1, aid1 = ifname_to_module_and_aid(ifname1) if ifname1 else (None, None)
module2, aid2 = ifname_to_module_and_aid(ifname2) if ifname2 else (None, None)
return self.modify_connection(href, uid, service_mode, module1, aid1, module2, aid2)
# Always does the correct thing, that is update if present, otherwise create
def create_or_update_connection_ifnames(self, uid: str, ifname1: str, ifname2: str) -> Optional[str]:
module1, aid1 = ifname_to_module_and_aid(ifname1)
module2, aid2 = ifname_to_module_and_aid(ifname2)
name = f"TF:{uid}"
existing_connection = self.get_connection_by_name(name)
if existing_connection:
return self.modify_connection(existing_connection.href, uid, module1, aid1, module2, aid2)
else:
return self.create_connection(uid, module1, aid1, module2, aid2)
def get_connection_by_name(self, connection_name: str) -> Optional[Connection]:
qparams = [
('content', 'expanded'),
('q', '{"state.name": "' + connection_name + '"}')
]
r = self.__get_json("/api/v1/ncs/network-connections", params=qparams)
if r and r[0] == 200 and len(r[1]) == 1:
return Connection(from_json=r[1][0])
else:
return None
def get_connection_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]:
return self.get_connection_by_name(f"TF:{uuid}")
def get_connections(self):
r = self.__get_json("/api/v1/ncs/network-connections?content=expanded")
if r and r[0] == 200:
return [Connection(from_json=c) for c in r[1]]
else:
return []
def service_uuid(self, key: str) -> Optional[str]:
service = re.match(r"^/service\[(.+)\]$", key)
if service:
return service.group(1)
else:
return None
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
import logging, requests, threading import logging, requests, threading
from typing import Any, Iterator, List, Optional, Tuple, Union from typing import Any, Iterator, List, Optional, Tuple, Union
...@@ -19,6 +20,12 @@ from device.service.driver_api._Driver import _Driver ...@@ -19,6 +20,12 @@ from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS from . import ALL_RESOURCE_KEYS
#from .Tools import create_connectivity_service, find_key, config_getter, delete_connectivity_service #from .Tools import create_connectivity_service, find_key, config_getter, delete_connectivity_service
import json import json
from .CmConnection import CmConnection
# Don't complain about non-verified SSL certificate. This driver is demo only
# and CM is not provisioned in demos with a proper certificate.
import urllib3
urllib3.disable_warnings()
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -27,43 +34,22 @@ class XrDriver(_Driver): ...@@ -27,43 +34,22 @@ class XrDriver(_Driver):
self.__lock = threading.Lock() self.__lock = threading.Lock()
self.__started = threading.Event() self.__started = threading.Event()
self.__terminate = threading.Event() self.__terminate = threading.Event()
self.__cm_root = 'https://' + address + ':' + str(port)
self.__timeout = int(settings.get('timeout', 120)) self.__timeout = int(settings.get('timeout', 120))
self.__verify = False; # Currently using self signed certificates # Mandatory key, an exception will get thrown if missing
self.__audience = settings["audience"] if "audience" in settings else "test" self.__hub_module_name = settings["hub_module_name"]
self.__client_id = settings["client_id"] if "client_id" in settings else "test"
tls_verify = False # Currently using self signed certificates
username = settings["username"] if "username" in settings else "xr-user-1"
password = settings["password"] if "password" in settings else "xr-user-1"
self.__services = {} self.__cm_connection = CmConnection(address, int(port), username, password, self.__timeout, tls_verify = tls_verify)
# FIXME: remove LOGGER.info(f"XrDriver instantiated, cm {address}:{port}, {settings=}")
LOGGER.info(f"FIXME!!! XrDriver, cm {address}:{port}, {settings=}");
def Connect(self) -> bool: def Connect(self) -> bool:
url = self.__cm_root + '/oauth/token'
with self.__lock: with self.__lock:
if self.__started.is_set(): return True if self.__started.is_set(): return True
try: if not self.__cm_connection.Connect():
# TODO: could also do get: https://${HOSTNAME}:443/oauth/token?client_id=test&audience=test"
req = {"grant_type":"client_credentials","client_id": self.__client_id, "audience": self.__audience}
response = requests.post(url,data=req,timeout=self.__timeout,verify=self.__verify)
resp = json.loads(response.text)
if 'access_token' in resp:
self.__access_token=resp['access_token']
LOGGER.info(f"FIXME!!! CM connected, {self.__access_token=}") ## TODO: remove
# Use in subsequend requests as named argument headers=self.__cm_http_headers
self.__cm_http_headers = {'Authorization': 'Bearer '+ self.__access_token}
else:
LOGGER.exception('No access token provided by {:s}'.format(str(self.__cm_root)))
return False
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(str(self.__cm_root)))
return False
except json.JSONDecodeError as json_err:
LOGGER.exception(f"Exception parsing JSON access token from {str(self.__cm_root)}, {str(json_err)}")
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception('Exception connecting {:s}'.format(str(self.__cm_root)))
return False return False
else: else:
self.__started.set() self.__started.set()
...@@ -78,62 +64,70 @@ class XrDriver(_Driver): ...@@ -78,62 +64,70 @@ class XrDriver(_Driver):
with self.__lock: with self.__lock:
return [] return []
def fake_interface_names(self) -> List[str]:
interfaces = []
# Using 4 as max leaf and lane to keep prints small during development
for lane in range(0,4):
interfaces.append(f"HUB-LANE-{lane:02}")
for leaf in range(1,5):
for lane in range(0,4):
interfaces.append(f"LEAF-{leaf:02}-LANE-{lane:02}")
return interfaces
def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type('resources', resource_keys, list) chk_type('resources', resource_keys, list)
results = []
# TODO: Completely fake interface information until we get same info from CM
for ifname in self.fake_interface_names():
results.append((f"/endpoints/endpoint[{ifname}]", {'uuid': ifname, 'type': 'optical', 'sample_types': {}}))
return results constellation = self.__cm_connection.get_constellation_by_hub_name(self.__hub_module_name)
if constellation:
_cid, if_list = constellation
return [(f"/endpoints/endpoint[{ifname}]", {'uuid': ifname, 'type': 'optical', 'sample_types': {}}) for ifname in if_list]
else:
return []
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
LOGGER.info(f"FIXME!!! XrDriver, SetConfig {resources=}"); LOGGER.info(f"SetConfig {resources=}");
# Logged config seems like: # Logged config seems like:
#[('/service[44ca3570-4e1a-49b5-8aab-06c92f239fab:optical]', '{"capacity_unit": "GHz", "capacity_value": 1, "direction": "UNIDIRECTIONAL", "input_sip": "HUB-LANE-01", "layer_protocol_name": "PHOTONIC_MEDIA", "layer_protocol_qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC", "output_sip": "LEAF-02-LANE-01", "uuid": "44ca3570-4e1a-49b5-8aab-06c92f239fab:optical"}')] #[('/service[52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical]', '{"capacity_unit": "GHz", "capacity_value": 1, "direction": "UNIDIRECTIONAL", "input_sip": "XR HUB 1|XR-T4", "layer_protocol_name": "PHOTONIC_MEDIA", "layer_protocol_qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC", "output_sip": "XR LEAF 1|XR-T1", "uuid": "52ff5f0f-fda4-40bd-a0b1-066f4ff04079:optical"}')]
results = [] results = []
if len(resources) == 0: if len(resources) == 0:
return results return results
# Temporary dummy version
for key, config in resources: for key, config in resources:
self.__services[key] = config service_uuid = self.__cm_connection.service_uuid(key)
if service_uuid:
# TODO: config to CM config = json.loads(config)
# Ignore "direction=UNIDIRECITONAL", it seems that controller only creates one direction... href = self.__cm_connection.create_or_update_connection_ifnames(service_uuid, config["input_sip"], config["output_sip"])
results.append(True) if href:
LOGGER.info(f"SetConfig: Created service {service_uuid} as {href}")
results.append(True)
else:
LOGGER.error(f"SetConfig: Service creation failure for {service_uuid}")
results.append(False)
else:
results.append(False)
return results return results
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
LOGGER.info(f"FIXME!!! XrDriver, DeleteConfig {resources=}"); LOGGER.info(f"DeleteConfig {resources=}");
# Input looks like:
# resources=[('/service[c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical]', '{"uuid": "c8a35e81-88d8-4468-9afc-a8abd92a64d0:optical"}')]
results = [] results = []
if len(resources) == 0: return results if len(resources) == 0: return results
# Temporary dummy version # Temporary dummy version
for key, config in resources: for key, _config in resources:
if key in self.__services[key]: service_uuid = self.__cm_connection.service_uuid(key)
del self.__services[key] if service_uuid:
# TODO: Delete config from CM connection = self.__cm_connection.get_connection_by_teraflow_uuid(service_uuid)
results.append(True) if connection is None:
LOGGER.info(f"DeleteConfig: Connection {service_uuid} does not exist, delete is no-op")
results.append(True)
else:
was_deleted = self.__cm_connection.delete_connection(connection.href)
if was_deleted:
LOGGER.info(f"DeleteConfig: Connection {service_uuid} deleted (was {str(connection)})")
else:
LOGGER.info(f"DeleteConfig: Connection {service_uuid} delete failure (was {str(connection)})")
results.append(was_deleted)
else: else:
results.append(False) results.append(False)
return results return results
def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
......
#!/usr/bin/env python3
# Test program for CmConnection
import CmConnection
import argparse
import logging
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(description='CM Connectin Test Utility')
parser.add_argument('ip', help='CM IP address or domain name')
parser.add_argument('port', help='CM port', type=int)
parser.add_argument('username', help='Username')
parser.add_argument('password', help='Password')
parser.add_argument('--list-constellations', action='store_true')
parser.add_argument('--show-constellation-by-hub-name', nargs='?', type=str)
parser.add_argument('--create-connection', nargs='?', type=str, help="uuid;ifname;ifname")
parser.add_argument('--modify-connection', nargs='?', type=str, help="href;uuid;ifname;ifname")
parser.add_argument('--show-connection-by-name', nargs='?', type=str)
parser.add_argument('--list-connections', action='store_true')
parser.add_argument('--delete-connection', nargs='?', type=str, help="connection id, e.g. \"/network-connections/4505d5d3-b2f3-40b8-8ec2-4a5b28523c03\"")
args = parser.parse_args()
cm = CmConnection.CmConnection(args.ip, args.port, args.username, args.password, tls_verify=False)
if not cm.Connect():
exit(-1)
if args.list_constellations:
constellations = cm.list_constellations()
for cid, if_list in constellations:
print("Constellation:", cid)
for if_name in if_list:
print(f" {if_name}")
if args.show_constellation_by_hub_name:
constellation = cm.get_constellation_by_hub_name(args.show_constellation_by_hub_name)
if constellation:
(cid, if_list) = constellation
print("Constellation:", cid)
for if_name in if_list:
print(f" {if_name}")
if args.create_connection:
cc_args = args.create_connection.split(";")
if len(cc_args) != 3:
print("Invalid create connection arguments. Expecting \"oid;ifname1;ifname2\", where ifname is form \"MODULE|PORT\"")
exit(-1)
cm.create_connection_ifnames(*cc_args)
if args.modify_connection:
mc_args = args.modify_connection.split(";")
if len(mc_args) == 2:
cm.modify_connection_ifnames(mc_args[0], mc_args[1], None, None)
elif len(mc_args) == 4:
cm.modify_connection_ifnames(*mc_args)
else:
print("Invalid modify connection arguments. Expecting \"href;oid\" or \"href;oid;ifname1;ifname2\", where ifname is form \"MODULE|PORT\"")
exit(-1)
if args.show_connection_by_name:
connection = cm.get_connection_by_name(args.show_connection_by_name)
if connection:
print(str(connection))
if args.list_connections:
connections = cm.get_connections()
for c in connections:
print(str(c))
if args.delete_connection:
was_deleted = cm.delete_connection(args.delete_connection)
if was_deleted:
print(f"Successfully deleted {args.delete_connection}")
else:
print(f"Failed to delete {args.delete_connection}")
...@@ -68,7 +68,7 @@ ...@@ -68,7 +68,7 @@
"device_config": {"config_rules": [ "device_config": {"config_rules": [
{"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "172.19.219.44"}}, {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "172.19.219.44"}},
{"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "443"}}, {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "443"}},
{"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": "{\"endpoints\": [{\"sample_types\": [], \"type\": \"optical\", \"uuid\": \"HUB-LANE-01\"}, {\"sample_types\": [], \"type\": \"optical\", \"uuid\": \"LEAF-01-LANE-01\"}]}"}} {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": "{\"username\": \"xr-user-1\", \"password\": \"xr-user-1\", \"hub_module_name\": \"XR HUB 1\"}"}}
]}, ]},
"device_operational_status": 1, "device_operational_status": 1,
"device_drivers": [6], "device_drivers": [6],
...@@ -77,31 +77,31 @@ ...@@ -77,31 +77,31 @@
], ],
"links": [ "links": [
{ {
"link_id": {"link_uuid": {"uuid": "R1-EMU/13/0/0==XR1-HUB-LANE-01"}}, "link_id": {"link_uuid": {"uuid": "R1-EMU/13/0/0==XR HUB 1|XR-T4"}},
"link_endpoint_ids": [ "link_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "R1-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}}, {"device_id": {"device_uuid": {"uuid": "R1-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}},
{"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "HUB-LANE-01"}} {"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "XR HUB 1|XR-T4"}}
] ]
}, },
{ {
"link_id": {"link_uuid": {"uuid": "R2-EMU/13/0/0==XR1-LEAF-01-LANE-01"}}, "link_id": {"link_uuid": {"uuid": "R2-EMU/13/0/0==XR HUB 1|XR-T3"}},
"link_endpoint_ids": [ "link_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "R2-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}}, {"device_id": {"device_uuid": {"uuid": "R2-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}},
{"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "LEAF-01-LANE-01"}} {"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "XR HUB 1|XR-T3"}}
] ]
}, },
{ {
"link_id": {"link_uuid": {"uuid": "R3-EMU/13/0/0==XR1-LEAF-02-LANE-01"}}, "link_id": {"link_uuid": {"uuid": "R3-EMU/13/0/0==XR1-XR LEAF 1|XR-T1"}},
"link_endpoint_ids": [ "link_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "R3-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}}, {"device_id": {"device_uuid": {"uuid": "R3-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}},
{"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "LEAF-02-LANE-01"}} {"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "XR LEAF 1|XR-T1"}}
] ]
}, },
{ {
"link_id": {"link_uuid": {"uuid": "R4-EMU/13/0/0==XR1-LEAF-03-LANE-01"}}, "link_id": {"link_uuid": {"uuid": "R4-EMU/13/0/0==XR LEAF 2|XR-T1"}},
"link_endpoint_ids": [ "link_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "R4-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}}, {"device_id": {"device_uuid": {"uuid": "R4-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}},
{"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "LEAF-03-LANE-01"}} {"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "XR LEAF 2|XR-T1"}}
] ]
} }
] ]
......
...@@ -143,20 +143,26 @@ DEVICE_R4_CONNECT_RULES = json_device_emulated_connect_rules(DEVICE_R4_ENDPOINT_ ...@@ -143,20 +143,26 @@ DEVICE_R4_CONNECT_RULES = json_device_emulated_connect_rules(DEVICE_R4_ENDPOINT_
DEVICE_X1_UUID = 'X1-XR-CONSTELLATION' DEVICE_X1_UUID = 'X1-XR-CONSTELLATION'
DEVICE_X1_TIMEOUT = 120 DEVICE_X1_TIMEOUT = 120
DEVICE_X1_ENDPOINT_DEFS = [ DEVICE_X1_ENDPOINT_DEFS = [
('HUB-LANE-01', 'optical', []), ('XR HUB 1|XR-T1', 'optical', []),
('LEAF-01-LANE-01', 'optical', []), ('XR HUB 1|XR-T2', 'optical', []),
('LEAF-02-LANE-01', 'optical', []), ('XR HUB 1|XR-T3', 'optical', []),
('LEAF-03-LANE-01', 'optical', []), ('XR HUB 1|XR-T4', 'optical', []),
('XR LEAF 1|XR-T1', 'optical', []),
('XR LEAF 2|XR-T1', 'optical', []),
] ]
DEVICE_X1_ID = json_device_id(DEVICE_X1_UUID) DEVICE_X1_ID = json_device_id(DEVICE_X1_UUID)
DEVICE_X1 = json_device_tapi_disabled(DEVICE_X1_UUID) DEVICE_X1 = json_device_tapi_disabled(DEVICE_X1_UUID)
DEVICE_X1_ENDPOINT_IDS = json_endpoint_ids(DEVICE_X1_ID, DEVICE_X1_ENDPOINT_DEFS) DEVICE_X1_ENDPOINT_IDS = json_endpoint_ids(DEVICE_X1_ID, DEVICE_X1_ENDPOINT_DEFS)
ENDPOINT_ID_X1_EP1 = DEVICE_X1_ENDPOINT_IDS[0] # These match JSON, hence indexes are what theyt are
ENDPOINT_ID_X1_EP2 = DEVICE_X1_ENDPOINT_IDS[1] ENDPOINT_ID_X1_EP1 = DEVICE_X1_ENDPOINT_IDS[3]
ENDPOINT_ID_X1_EP3 = DEVICE_X1_ENDPOINT_IDS[2] ENDPOINT_ID_X1_EP2 = DEVICE_X1_ENDPOINT_IDS[2]
ENDPOINT_ID_X1_EP4 = DEVICE_X1_ENDPOINT_IDS[3] ENDPOINT_ID_X1_EP3 = DEVICE_X1_ENDPOINT_IDS[4]
ENDPOINT_ID_X1_EP4 = DEVICE_X1_ENDPOINT_IDS[5]
DEVICE_X1_CONNECT_RULES = json_device_connect_rules(DEVICE_X1_ADDRESS, DEVICE_X1_PORT, { DEVICE_X1_CONNECT_RULES = json_device_connect_rules(DEVICE_X1_ADDRESS, DEVICE_X1_PORT, {
'timeout' : DEVICE_X1_TIMEOUT, 'timeout' : DEVICE_X1_TIMEOUT,
"username": "xr-user-1",
"password": "xr-user-1",
"hub_module_name": "XR HUB 1"
}) })
# Always using real device (CM, whether CM has emulated backend is another story) # Always using real device (CM, whether CM has emulated backend is another story)
#if USE_REAL_DEVICES else json_device_emulated_connect_rules(DEVICE_X1_ENDPOINT_DEFS) #if USE_REAL_DEVICES else json_device_emulated_connect_rules(DEVICE_X1_ENDPOINT_DEFS)
......
...@@ -32,8 +32,7 @@ LOGGER = logging.getLogger(__name__) ...@@ -32,8 +32,7 @@ LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG) LOGGER.setLevel(logging.DEBUG)
DEVTYPE_EMU_PR = DeviceTypeEnum.EMULATED_PACKET_ROUTER.value DEVTYPE_EMU_PR = DeviceTypeEnum.EMULATED_PACKET_ROUTER.value
#DEVTYPE_EMU_OLS = DeviceTypeEnum.EMULATED_OPTICAL_LINE_SYSTEM.value DEVTYPE_XR_CONSTELLATION = DeviceTypeEnum.XR_CONSTELLATION.value
DEVTYPE_EMU_OLS = DeviceTypeEnum.XR_CONSTELLATION.value
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
...@@ -80,7 +79,7 @@ def test_service_creation(context_client : ContextClient, osm_wim : MockOSM): # ...@@ -80,7 +79,7 @@ def test_service_creation(context_client : ContextClient, osm_wim : MockOSM): #
# ----- Validate collected events ---------------------------------------------------------------------------------- # ----- Validate collected events ----------------------------------------------------------------------------------
packet_connection_uuid = '{:s}:{:s}'.format(service_uuid, DEVTYPE_EMU_PR) packet_connection_uuid = '{:s}:{:s}'.format(service_uuid, DEVTYPE_EMU_PR)
optical_connection_uuid = '{:s}:optical:{:s}'.format(service_uuid, DEVTYPE_EMU_OLS) optical_connection_uuid = '{:s}:optical:{:s}'.format(service_uuid, DEVTYPE_XR_CONSTELLATION)
optical_service_uuid = '{:s}:optical'.format(service_uuid) optical_service_uuid = '{:s}:optical'.format(service_uuid)
expected_events = [ expected_events = [
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, pytest
from common.DeviceTypes import DeviceTypeEnum
from common.Settings import get_setting
from common.tests.EventTools import EVENT_REMOVE, EVENT_UPDATE, check_events
from common.tools.object_factory.Connection import json_connection_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Service import json_service_id
from common.tools.grpc.Tools import grpc_message_to_json_string
from compute.tests.mock_osm.MockOSM import MockOSM
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from context.proto.context_pb2 import ContextId, Empty
from .ObjectsXr import (
CONTEXT_ID, CONTEXTS, DEVICE_X1_UUID, DEVICE_R1_UUID, DEVICE_R3_UUID, DEVICES, LINKS, TOPOLOGIES, WIM_MAPPING,
WIM_PASSWORD, WIM_USERNAME)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
DEVTYPE_EMU_PR = DeviceTypeEnum.EMULATED_PACKET_ROUTER.value
DEVTYPE_XR_CONSTELLATION = DeviceTypeEnum.XR_CONSTELLATION.value
@pytest.fixture(scope='session')
def context_client():
_client = ContextClient(get_setting('CONTEXTSERVICE_SERVICE_HOST'), get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC'))
yield _client
_client.close()
@pytest.fixture(scope='session')
def osm_wim():
wim_url = 'http://{:s}:{:s}'.format(
get_setting('COMPUTESERVICE_SERVICE_HOST'), str(get_setting('COMPUTESERVICE_SERVICE_PORT_HTTP')))
return MockOSM(wim_url, WIM_MAPPING, WIM_USERNAME, WIM_PASSWORD)
def test_scenario_is_correct(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure service is created ------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == len(LINKS)
response = context_client.ListServices(ContextId(**CONTEXT_ID))
LOGGER.info('Services[{:d}] = {:s}'.format(len(response.services), grpc_message_to_json_string(response)))
assert len(response.services) == 2 # L3NM + TAPI
for service in response.services:
service_id = service.service_id
response = context_client.ListConnections(service_id)
LOGGER.info(' ServiceId[{:s}] => Connections[{:d}] = {:s}'.format(
grpc_message_to_json_string(service_id), len(response.connections), grpc_message_to_json_string(response)))
assert len(response.connections) == 1 # one connection per service
def test_service_removal(context_client : ContextClient, osm_wim : MockOSM): # pylint: disable=redefined-outer-name
# ----- Start the EventsCollector ----------------------------------------------------------------------------------
events_collector = EventsCollector(context_client, log_events_received=True)
events_collector.start()
# ----- Delete Service ---------------------------------------------------------------------------------------------
response = context_client.ListServiceIds(ContextId(**CONTEXT_ID))
LOGGER.info('Services[{:d}] = {:s}'.format(len(response.service_ids), grpc_message_to_json_string(response)))
assert len(response.service_ids) == 2 # L3NM + TAPI
service_uuids = set()
for service_id in response.service_ids:
service_uuid = service_id.service_uuid.uuid
if service_uuid.endswith(':optical'): continue
service_uuids.add(service_uuid)
osm_wim.conn_info[service_uuid] = {}
assert len(service_uuids) == 1 # assume a single service has been created
service_uuid = set(service_uuids).pop()
osm_wim.delete_connectivity_service(service_uuid)
# ----- Validate collected events ----------------------------------------------------------------------------------
packet_connection_uuid = '{:s}:{:s}'.format(service_uuid, DEVTYPE_EMU_PR)
optical_connection_uuid = '{:s}:optical:{:s}'.format(service_uuid, DEVTYPE_XR_CONSTELLATION)
optical_service_uuid = '{:s}:optical'.format(service_uuid)
expected_events = [
('ConnectionEvent', EVENT_REMOVE, json_connection_id(packet_connection_uuid)),
('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_R1_UUID)),
('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_R3_UUID)),
('ServiceEvent', EVENT_REMOVE, json_service_id(service_uuid, context_id=CONTEXT_ID)),
('ConnectionEvent', EVENT_REMOVE, json_connection_id(optical_connection_uuid)),
('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_X1_UUID)),
('ServiceEvent', EVENT_REMOVE, json_service_id(optical_service_uuid, context_id=CONTEXT_ID)),
]
check_events(events_collector, expected_events)
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
events_collector.stop()
def test_services_removed(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure service is removed ------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == len(LINKS)
response = context_client.ListServices(ContextId(**CONTEXT_ID))
assert len(response.services) == 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment