#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring import logging import json import time from typing import Tuple, Optional #Any, Iterator, List, , Union import requests import urllib3 import re LOGGER = logging.getLogger(__name__) class InvalidIfnameError(Exception): def __init__(self, ifname): # Call the base class constructor with the parameters it needs super().__init__(f"Invalid interface name {ifname}, expecting format \"MODULENAME|PORTNAME\"") class ConnectionDeserializationError(Exception): def __init__(self, msg): # Call the base class constructor with the parameters it needs super().__init__(msg) def ifname_to_module_and_aid(ifname: str) -> Tuple[str, str]: a = ifname.split("|") if len(a) != 2: raise InvalidIfnameError(ifname) return (a[0], a[1]) class Connection: def __init__(self, from_json=None): def get_endpoint_ifname(endpoint): try: return endpoint["state"]["moduleIf"]["moduleName"] + "|" + endpoint["state"]["moduleIf"]["clientIfAid"] except KeyError: return None if from_json: try: state = from_json["state"] self.name = state["name"] if "name" in state else None #Name is optional self.serviceMode = state["serviceMode"] self.href = from_json["href"] self.endpoints = [] for ep in from_json["endpoints"]: ifname = get_endpoint_ifname(ep) if ifname: self.endpoints.append(ifname) except KeyError as e: raise ConnectionDeserializationError(f"Missing mandatory key, f{str(e)}") else: # May support other initializations in future raise ConnectionDeserializationError("JSON dict missing") def __str__(self): name = self.name if self.name else "" endpoints = ", ".join(self.endpoints) return f"name: {name}, id: {self.href}, service-mode: {self.serviceMode}, end-points: [{endpoints}]" class ExpiringValue: def __init__(self, value, expiry): self.__value = value self.__expiry = expiry self.__created = time.monotonic() def get_value(self): return self.__value def is_valid_for(self, duration): if self.__created + self.__expiry >= time.monotonic()+duration: return True else: return False class CmConnection: def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True) -> None: self.__tls_verify = tls_verify if not tls_verify: urllib3.disable_warnings() self.__timeout = timeout self.__username = username self.__password = password self.__cm_root = 'https://' + address + ':' + str(port) self.__access_token = None def __post_w_headers(self, path, data, headers, data_as_json=True): url = self.__cm_root + path try: if data_as_json: response = requests.post(url, headers=headers, json=data, timeout=self.__timeout, verify=self.__tls_verify) else: response = requests.post(url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify) LOGGER.info(f"POST: {url} ==> {response.status_code}") resp = json.loads(response.text) return (response.status_code, resp) except requests.exceptions.Timeout: LOGGER.info(f"POST: {url} ==> timeout") return None except json.JSONDecodeError as json_err: LOGGER.info(f"POST: {url} ==> response json decode error: {str(json_err)}") return None except Exception as e: # pylint: disable=broad-except es=str(e) LOGGER.info(f"POST: {url} ==> unexpected exception: {es}") return None def __post(self, path, data, data_as_json=True): return self.__post_w_headers(path, data, self.__http_headers(), data_as_json=data_as_json) def __put(self, path, data, data_as_json=True): url = self.__cm_root + path headers = self.__http_headers() try: if data_as_json: response = requests.put(url, headers=headers, json=data, timeout=self.__timeout, verify=self.__tls_verify) else: response = requests.put(url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify) LOGGER.info(f"PUT: {url} ==> {response.status_code}") if response.content == b'null': return (response.status_code, None) resp = json.loads(response.text) return (response.status_code, resp) except requests.exceptions.Timeout: LOGGER.info(f"PUT: {url} ==> timeout") return None except json.JSONDecodeError as json_err: LOGGER.info(f"PUT: {url} ==> response json decode error: {str(json_err)}") return None except Exception as e: # pylint: disable=broad-except es=str(e) LOGGER.info(f"PUT: {url} ==> unexpected exception: {es}") return None def __delete(self, path, data=None): url = self.__cm_root + path headers = self.__http_headers() try: response = requests.delete(url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify) LOGGER.info(f"DELETE: {url} ==> {response.status_code}") if response.content == b'null': return (response.status_code, None) resp = json.loads(response.text) return (response.status_code, resp) except requests.exceptions.Timeout: LOGGER.info(f"DELETE: {url} ==> timeout") return None except json.JSONDecodeError as json_err: LOGGER.info(f"DELETE: {url} ==> response json decode error: {str(json_err)}") return None except Exception as e: # pylint: disable=broad-except es=str(e) LOGGER.info(f"DELETE: {url} ==> unexpected exception: {es}") return None def __http_headers(self): self.__ensure_valid_access_token() if self.__access_token: return {'Authorization': 'Bearer '+ self.__access_token.get_value()} else: return {} def __get_json(self, path, params=None): url = self.__cm_root + path try: response = requests.get(url,headers=self.__http_headers(), timeout=self.__timeout,verify=self.__tls_verify, params=params) LOGGER.info(f"GET: {url} {params=} ==> {response.status_code}") resp = json.loads(response.text) return (response.status_code, resp) except requests.exceptions.Timeout: LOGGER.info(f"GET: {url} {params=} ==> timeout") return None except json.JSONDecodeError as json_err: LOGGER.info(f"GET: {url} {params=} ==> response json decode error: {str(json_err)}") return None except Exception as e: # pylint: disable=broad-except es=str(e) LOGGER.info(f"GET: {url} {params=} ==> unexpected exception: {es}") return None def __acquire_access_token(self): path = '/realms/xr-cm/protocol/openid-connect/token' req = { "username": self.__username, "password": self.__password, "grant_type": "password", "client_secret": "xr-web-client", "client_id": "xr-web-client" } (status_code, response) = self.__post_w_headers(path, req, None, data_as_json=False) if 200 != status_code or 'access_token' not in response: LOGGER.error(f"Authentication failure, status code {status_code}, data {response}") return False access_token = response['access_token'] expires = int(response["expires_in"]) if "expires_in" in response else 0 LOGGER.info(f"Obtained access token {access_token}, expires in {expires}") self.__access_token = ExpiringValue(access_token, expires) return True def __ensure_valid_access_token(self): if not self.__access_token or not self.__access_token.is_valid_for(60): self.__acquire_access_token() def Connect(self) -> bool: return self.__acquire_access_token() @staticmethod def get_constellation_module_ifnames(module): ifnames = [] try: module_state = module["state"] module_name = module_state["module"]["moduleName"] if "endpoints" in module_state: for endpoint in module_state["endpoints"]: try: ifname = endpoint["moduleIf"]["clientIfAid"] ifnames.append(f"{module_name}|{ifname}") except KeyError: pass except KeyError: pass return ifnames @staticmethod def get_constellation_ifnames(constellation): ifnames = [] if "hubModule" in constellation: hub = constellation["hubModule"] ifnames.extend(CmConnection.get_constellation_module_ifnames(hub)) if "leafModules" in constellation: for leaf in constellation["leafModules"]: ifnames.extend(CmConnection.get_constellation_module_ifnames(leaf)) return ifnames @staticmethod def get_ifnames_per_constellation(constellation): ifnames = [] try: ports = CmConnection.get_constellation_ifnames(constellation) constellation_id = constellation["id"] for port in ports: ifnames.append(port) except KeyError: return None return (constellation_id, ifnames) def list_constellations(self): status_code, constellations = self.__get_json("/api/v1/ns/xr-networks?content=expanded") if not constellations or status_code != 200: return [] return [CmConnection.get_ifnames_per_constellation(c) for c in constellations] def get_constellation_by_hub_name(self, hub_module_name: str): qparams = [ ('content', 'expanded'), ('q', '{"hubModule.state.module.moduleName": "' + hub_module_name + '"}') ] status_code, constellations = self.__get_json("/api/v1/ns/xr-networks?content=expanded", params=qparams) if not constellations or status_code != 200 or len(constellations) != 1: return None return CmConnection.get_ifnames_per_constellation(constellations[0]) @staticmethod def create_connection_config(uid: str, serviceMode: Optional[str], mod1: Optional[str], aid1: Optional[str], mod2: Optional[str], aid2: Optional[str]) -> Connection: name = f"TF:{uid}" def create_endpoint(mod, aid): ep = { "selector": { "ifSelectorByModuleName": { "moduleName": mod, "moduleClientIfAid": aid, } } } return ep connection = { "name" : name} if serviceMode: connection["serviceMode"] = serviceMode endpoints = [] if mod1: endpoints.append(create_endpoint(mod1, aid1)) if mod2: endpoints.append(create_endpoint(mod2, aid2)) if len(endpoints) > 0: connection["endpoints"] = endpoints return connection # All arguments are mandatory def create_connection(self, uid, mod1, aid1, mod2, aid2) -> Optional[str]: # Create wants a list, so wrap connection to list connection = [CmConnection.create_connection_config(uid, "portMode", mod1, aid1, mod2, aid2)] resp = self.__post("/api/v1/ncs/network-connections", connection) if resp and resp[0] == 202 and len(resp[1]) == 1 and "href" in resp[1][0]: created_resource = resp[1][0]["href"] LOGGER.info(f"Created connection {created_resource} {uid=}, {mod1=}, {aid1=}, {mod2=}, {aid2=}") # FIXME: remove LOGGER.info(self.__get_json(f"/api/v1/ncs{created_resource}?content=expanded")) return created_resource else: return None # Modules and aids are optional. Uid is Teraflow UID, and is stored in mae field def modify_connection(self, href: str, uid: str, service_mode: Optional[str], mod1: Optional[str]=None, aid1: Optional[str]=None, mod2: Optional[str]=None, aid2: Optional[str]=None) -> Optional[str]: connection = CmConnection.create_connection_config(uid, service_mode, mod1, aid1, mod2, aid2) resp = self.__put(f"/api/v1/ncs{href}", connection) # Returns empty body if resp and resp[0] == 202: LOGGER.info(f"Updated connection {href=}, {uid=}, {service_mode=}, {mod1=}, {aid1=}, {mod2=}, {aid2=}") # Return href used for update to be consisten with create return href else: return None def delete_connection(self, href: str) -> bool: resp = self.__delete(f"/api/v1/ncs{href}") print(resp) # Returns empty body if resp and resp[0] == 202: LOGGER.info(f"Deleted connection {href=}") return True else: return False def create_connection_ifnames(self, uid: str, ifname1: str, ifname2: str): module1, aid1 = ifname_to_module_and_aid(ifname1) module2, aid2 = ifname_to_module_and_aid(ifname2) return self.create_connection(uid, module1, aid1, module2, aid2) def modify_connection_ifnames(self, href: str, uid: str, ifname1: Optional[str], ifname2: Optional[str], service_mode: Optional[str] =None): # Only uid and href are mandatory module1, aid1 = ifname_to_module_and_aid(ifname1) if ifname1 else (None, None) module2, aid2 = ifname_to_module_and_aid(ifname2) if ifname2 else (None, None) return self.modify_connection(href, uid, service_mode, module1, aid1, module2, aid2) # Always does the correct thing, that is update if present, otherwise create def create_or_update_connection_ifnames(self, uid: str, ifname1: str, ifname2: str) -> Optional[str]: module1, aid1 = ifname_to_module_and_aid(ifname1) module2, aid2 = ifname_to_module_and_aid(ifname2) name = f"TF:{uid}" existing_connection = self.get_connection_by_name(name) if existing_connection: return self.modify_connection(existing_connection.href, uid, module1, aid1, module2, aid2) else: return self.create_connection(uid, module1, aid1, module2, aid2) def get_connection_by_name(self, connection_name: str) -> Optional[Connection]: qparams = [ ('content', 'expanded'), ('q', '{"state.name": "' + connection_name + '"}') ] r = self.__get_json("/api/v1/ncs/network-connections", params=qparams) if r and r[0] == 200 and len(r[1]) == 1: return Connection(from_json=r[1][0]) else: return None def get_connection_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]: return self.get_connection_by_name(f"TF:{uuid}") def get_connections(self): r = self.__get_json("/api/v1/ncs/network-connections?content=expanded") if r and r[0] == 200: return [Connection(from_json=c) for c in r[1]] else: return [] def service_uuid(self, key: str) -> Optional[str]: service = re.match(r"^/service\[(.+)\]$", key) if service: return service.group(1) else: return None