#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring import collections.abc import logging import json import time from typing import Optional, List, Dict, Union import re import requests import urllib3 from .connection import Connection from .transport_capacity import TransportCapacity from .constellation import Constellation # https://confluence.infinera.com/display/CR/XR+Network+Service # https://confluence.infinera.com/pages/viewpage.action?spaceKey=CR&title=XR+Network+Connection+Service#XRNetworkConnectionService-North-boundInterface # https://bitbucket.infinera.com/projects/XRCM/repos/cm-api/browse/yaml/ncs/v1/ncs.yaml LOGGER = logging.getLogger(__name__) class ExpiringValue: def __init__(self, value, expiry): self.__value = value self.__expiry = expiry self.__created = time.monotonic() def get_value(self): return self.__value def is_valid_for(self, duration): if self.__created + self.__expiry >= time.monotonic()+duration: return True else: return False class UnexpectedEmptyBody(Exception): pass class HttpResult: def __init__(self, method: str, url: str, params: Dict[str, any] = None): self.method = method self.url = url self.text = None self.json = None self.status_code = None self.params = params self.exception = None def __str__(self): status_code = self.status_code if self.status_code is not None else "" return f"{self.method} {self.url} {self.params}, status {status_code}" def process_http_response(self, response: requests.Response, permit_empty_body:bool = False): LOGGER.info(f"process_http_response(): {self.method}: {self.url} qparams={self.params} ==> {response.status_code}") # FIXME: params self.status_code = response.status_code if response.content != b'null' and len(response.text): self.text = response.text try: r_json = json.loads(response.text) self.json = r_json except json.JSONDecodeError as json_err: LOGGER.info(f"{self.method}: {self.url} ==> response json decode error: {str(json_err)}") self.exception = json_err elif not permit_empty_body: raise UnexpectedEmptyBody(f"No body in HTTP response for {self.method} {self.url} (status code {response.status_code}") def __bool__(self): # Error codes start at 400, codes below it are successes return self.status_code is not None and self.text is not None and self.status_code < 400 and self.exception is None def is_valid_with_status_ignore_body(self, expected_status_code: int) -> bool: return self.status_code is not None and self.status_code == expected_status_code and self.exception is None def is_valid_json_with_status(self, expected_status_code: int) -> bool: return bool(self) and self.status_code == expected_status_code and self.json is not None def is_valid_json_list_with_status(self, expected_status_code: int, min_entries=-1, max_entries=-1) -> bool: if not self.is_valid_json_with_status(expected_status_code): return False if not isinstance(self.json, collections.abc.Sequence): return False if min_entries >=0 and len(self.json) < min_entries: return False if max_entries >=0 and len(self.json) > max_entries: return False return True def is_valid_json_obj_with_status(self, expected_status_code: int) -> bool: if not self.is_valid_json_with_status(expected_status_code): return False if not isinstance(self.json, collections.abc.Mapping): return False return True class CmConnection: def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True) -> None: self.__tls_verify = tls_verify if not tls_verify: urllib3.disable_warnings() self.__timeout = timeout self.__username = username self.__password = password self.__cm_root = 'https://' + address + ':' + str(port) self.__access_token = None def __perform_request(self, http_result: HttpResult, permit_empty_body: bool, fn, *args, **kwargs): try: response = fn(*args, **kwargs) http_result.process_http_response(response, permit_empty_body) except requests.exceptions.Timeout as e: LOGGER.info(f"{http_result} ==> timeout") http_result.exception = e except Exception as e: # pylint: disable=broad-except es=str(e) LOGGER.info(f"{http_result} ==> unexpected exception: {es}") http_result.exception = e return http_result def __post_w_headers(self, path, data, headers, data_as_json=True) -> HttpResult: url = self.__cm_root + path rv = HttpResult("POST", url) if data_as_json: self.__perform_request(rv, False, requests.post, url, headers=headers, json=data, timeout=self.__timeout, verify=self.__tls_verify) else: self.__perform_request(rv, False, requests.post, url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify) return rv def __post(self, path, data, data_as_json=True) -> HttpResult: return self.__post_w_headers(path, data, self.__http_headers(), data_as_json=data_as_json) def __put(self, path: str, data: Union[str,Dict[str, any]], data_as_json:bool =True, permit_empty_body:bool =True) -> HttpResult: url = self.__cm_root + path rv = HttpResult("PUT", url) if data_as_json: self.__perform_request(rv, permit_empty_body, requests.put, url, headers=self.__http_headers(), json=data, timeout=self.__timeout, verify=self.__tls_verify) else: self.__perform_request(rv, permit_empty_body, requests.put, url, headers=self.__http_headers(), data=data, timeout=self.__timeout, verify=self.__tls_verify) return rv def __get(self, path, params: Dict[str, any]=None) -> HttpResult: url = self.__cm_root + path rv = HttpResult("GET", url, params) self.__perform_request(rv, False, requests.get, url, headers=self.__http_headers(), timeout=self.__timeout,verify=self.__tls_verify, params=params) return rv def __delete(self, path, data=None) -> HttpResult: url = self.__cm_root + path rv = HttpResult("DELETE", url) self.__perform_request(rv, True, requests.delete, url, headers=self.__http_headers(), data=data, timeout=self.__timeout, verify=self.__tls_verify) return rv def __http_headers(self): self.__ensure_valid_access_token() if self.__access_token: return {'Authorization': 'Bearer '+ self.__access_token.get_value()} else: return {} def __acquire_access_token(self): path = '/realms/xr-cm/protocol/openid-connect/token' req = { "username": self.__username, "password": self.__password, "grant_type": "password", "client_secret": "xr-web-client", "client_id": "xr-web-client" } resp = self.__post_w_headers(path, req, None, data_as_json=False) # Slightly more verbose check/logging of failures for authentication to help # diagnose connectivity problems if resp.status_code is None: LOGGER.error("Failed to contact authentication API endpoint") return False if not resp.is_valid_json_obj_with_status(200): LOGGER.error(f"Authentication failure, status code {resp.status_code}, data {resp.text}") return False if 'access_token' not in resp.json: LOGGER.error(f"Authentication failure: missing access_token in JSON, status code {resp.status_code}, data {resp.text}") return False access_token = resp.json['access_token'] expires = int(resp.json["expires_in"]) if "expires_in" in resp.json else 0 LOGGER.info(f"Obtained access token {access_token}, expires in {expires}") self.__access_token = ExpiringValue(access_token, expires) return True def __ensure_valid_access_token(self): if not self.__access_token or not self.__access_token.is_valid_for(60): self.__acquire_access_token() def Connect(self) -> bool: return self.__acquire_access_token() def list_constellations(self) -> List[Constellation]: r = self.__get("/api/v1/ns/xr-networks?content=expanded") if not r.is_valid_json_list_with_status(200): return [] return [Constellation(c) for c in r.json] def get_constellation_by_hub_name(self, hub_module_name: str) -> Optional[Constellation]: qparams = [ ('content', 'expanded'), ('q', '{"hubModule.state.module.moduleName": "' + hub_module_name + '"}') ] r = self.__get("/api/v1/ns/xr-networks?content=expanded", params=qparams) if not r.is_valid_json_list_with_status(200, 1, 1): return None return Constellation(r.json[0]) def get_transport_capacities(self) -> List[TransportCapacity]: r= self.__get("/api/v1/ns/transport-capacities?content=expanded") if not r.is_valid_json_list_with_status(200): return [] return [TransportCapacity(from_json=t) for t in r.json] def get_transport_capacity_by_name(self, tc_name: str) -> Optional[Connection]: qparams = [ ('content', 'expanded'), ('q', '{"state.name": "' + tc_name + '"}') ] r = self.__get("/api/v1/ns/transport-capacities?content=expanded", params=qparams) if not r.is_valid_json_list_with_status(200, 1, 1): return TransportCapacity(from_json=r.json[0]) else: return None def get_transport_capacity_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]: return self.get_transport_capacity_by_name(f"TF:{uuid}") def create_transport_capacity(self, tc: TransportCapacity) -> Optional[str]: # Create wants a list, so wrap connection to list tc_config = [tc.create_config()] resp = self.__post("/api/v1/ns/transport-capacities", tc_config) if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]: tc.href = resp.json[0]["href"] LOGGER.info(f"Created transport-capcity {tc}") #LOGGER.info(self.__get(f"/api/v1/ns/transport-capacities{tc.href}?content=expanded")) return tc.href else: return None def delete_transport_capacity(self, href: str) -> bool: resp = self.__delete(f"/api/v1/ns/transport-capacities{href}") # Returns empty body if resp.is_valid_with_status_ignore_body(202): LOGGER.info(f"Deleted transport-capacity {href=}") return True else: LOGGER.info(f"Deleting transport-capacity {href=} failed, status {resp.status_code}") return False def create_connection(self, connection: Connection) -> Optional[str]: # Create wants a list, so wrap connection to list cfg = [connection.create_config()] resp = self.__post("/api/v1/ncs/network-connections", cfg) if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]: connection.href = resp.json[0]["href"] LOGGER.info(f"Created connection {connection}") return connection.href else: LOGGER.error(f"Create failure for connection {connection}, result {resp}") return None def update_connection(self, href: str, connection: Connection, existing_connection: Optional[Connection]=None) -> Optional[str]: cfg = connection.create_config() # Endpoint updates # Current CM implementation returns 501 (not implemented) for all of these actions # CM does not accept endpoint updates properly in same format that is used in initial creation. # Instead we work around by using more granular APIs. if "endpoints" in cfg: del cfg["endpoints"] if existing_connection is None: existing_connection = self.get_connection_by_href(href) ep_deletes, ep_creates, ep_updates = connection.get_endpoint_updates(existing_connection) #print(ep_deletes) #print(ep_creates) #print(ep_updates) # Perform deletes for ep_href in ep_deletes: resp = self.__delete(f"/api/v1/ncs{ep_href}") if resp.is_valid_with_status_ignore_body(202): LOGGER.info(f"update_connection: EP-UPDATE: Deleted connection endpoint {ep_href}") else: LOGGER.info(f"update_connection: EP-UPDATE: Failed to delete connection endpoint {ep_href}: {resp}") # Update capacities for otherwise similar endpoints for ep_href, ep_cfg in ep_updates: resp = self.__put(f"/api/v1/ncs{ep_href}", ep_cfg) if resp.is_valid_with_status_ignore_body(202): LOGGER.info(f"update_connection: EP-UPDATE: Updated connection endpoint {ep_href} with {ep_cfg}") else: LOGGER.info(f"update_connection: EP-UPDATE: Failed to update connection endpoint {ep_href} with {ep_cfg}: {resp}") # Perform adds resp = self.__post(f"/api/v1/ncs{href}/endpoints", ep_creates) if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]: LOGGER.info(f"update_connection: EP-UPDATE: Created connection endpoints {resp.json[0]} with {ep_creates}") else: LOGGER.info(f"update_connection: EP-UPDATE: Failed to create connection endpoints {resp.json[0] if resp.json else None} with {ep_creates}: {resp}") # Connection update (excluding endpoints) resp = self.__put(f"/api/v1/ncs{href}", cfg) # Returns empty body if resp.is_valid_with_status_ignore_body(202): LOGGER.info(f"update_connection: Updated connection {connection}") # Return href used for update to be consisten with create return href else: LOGGER.error(f"update_connection: Update failure for connection {connection}, result {resp}") return None def delete_connection(self, href: str) -> bool: resp = self.__delete(f"/api/v1/ncs{href}") #print(resp) # Returns empty body if resp.is_valid_with_status_ignore_body(202): LOGGER.info(f"Deleted connection {href=}") return True else: return False # Always does the correct thing, that is update if present, otherwise create def create_or_update_connection(self, connection: Connection) -> Optional[str]: existing_connection = self.get_connection_by_name(connection.name) if existing_connection: return self.update_connection(existing_connection.href, connection, existing_connection) else: return self.create_connection(connection) def get_connection_by_name(self, connection_name: str) -> Optional[Connection]: qparams = [ ('content', 'expanded'), ('q', '{"state.name": "' + connection_name + '"}') ] r = self.__get("/api/v1/ncs/network-connections", params=qparams) if r.is_valid_json_list_with_status(200, 1, 1): return Connection(from_json=r.json[0]) else: return None def get_connection_by_href(self, href: str) -> Optional[Connection]: qparams = [ ('content', 'expanded'), ] r = self.__get(f"/api/v1/ncs{href}", params=qparams) if r.is_valid_json_obj_with_status(200): return Connection(from_json=r.json) else: return None def get_connection_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]: return self.get_connection_by_name(f"TF:{uuid}") def get_connections(self): r = self.__get("/api/v1/ncs/network-connections?content=expanded") if r.is_valid_json_list_with_status(200): return [Connection(from_json=c) for c in r.json] else: return [] def service_uuid(self, key: str) -> Optional[str]: service = re.match(r"^/service\[(.+)\]$", key) if service: return service.group(1) else: return None