Newer
Older
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
import collections.abc
import logging
import json
import time
from typing import Optional, List, Dict, Union
import requests
import urllib3
from .connection import Connection
from .transport_capacity import TransportCapacity
from .constellation import Constellation
# https://confluence.infinera.com/display/CR/XR+Network+Service
# https://confluence.infinera.com/pages/viewpage.action?spaceKey=CR&title=XR+Network+Connection+Service#XRNetworkConnectionService-North-boundInterface
# https://bitbucket.infinera.com/projects/XRCM/repos/cm-api/browse/yaml/ncs/v1/ncs.yaml
LOGGER = logging.getLogger(__name__)
class ExpiringValue:
def __init__(self, value, expiry):
self.__value = value
self.__expiry = expiry
self.__created = time.monotonic()
def get_value(self):
return self.__value
def is_valid_for(self, duration):
if self.__created + self.__expiry >= time.monotonic()+duration:
return True
else:
return False
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class UnexpectedEmptyBody(Exception):
pass
class HttpResult:
def __init__(self, method: str, url: str, params: Dict[str, any] = None):
self.method = method
self.url = url
self.text = None
self.json = None
self.status_code = None
self.params = params
self.exception = None
def __str__(self):
status_code = self.status_code if self.status_code is not None else "<not executed>"
return f"{self.method} {self.url} {self.params}, status {status_code}"
def process_http_response(self, response: requests.Response, permit_empty_body:bool = False):
LOGGER.info(f"process_http_response(): {self.method}: {self.url} qparams={self.params} ==> {response.status_code}") # FIXME: params
self.status_code = response.status_code
if response.content != b'null' and len(response.text):
self.text = response.text
try:
r_json = json.loads(response.text)
self.json = r_json
except json.JSONDecodeError as json_err:
LOGGER.info(f"{self.method}: {self.url} ==> response json decode error: {str(json_err)}")
self.exception = json_err
elif not permit_empty_body:
raise UnexpectedEmptyBody(f"No body in HTTP response for {self.method} {self.url} (status code {response.status_code}")
def __bool__(self):
# Error codes start at 400, codes below it are successes
return self.status_code is not None and self.text is not None and self.status_code < 400 and self.exception is None
def is_valid_with_status_ignore_body(self, expected_status_code: int) -> bool:
return self.status_code is not None and self.status_code == expected_status_code and self.exception is None
def is_valid_json_with_status(self, expected_status_code: int) -> bool:
return bool(self) and self.status_code == expected_status_code and self.json is not None
def is_valid_json_list_with_status(self, expected_status_code: int, min_entries=-1, max_entries=-1) -> bool:
if not self.is_valid_json_with_status(expected_status_code):
return False
if not isinstance(self.json, collections.abc.Sequence):
return False
if min_entries >=0 and len(self.json) < min_entries:
return False
if max_entries >=0 and len(self.json) > max_entries:
return False
return True
def is_valid_json_obj_with_status(self, expected_status_code: int) -> bool:
if not self.is_valid_json_with_status(expected_status_code):
return False
if not isinstance(self.json, collections.abc.Mapping):
return False
return True
class CmConnection:
def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True) -> None:
self.__tls_verify = tls_verify
if not tls_verify:
urllib3.disable_warnings()
self.__timeout = timeout
self.__username = username
self.__password = password
self.__cm_root = 'https://' + address + ':' + str(port)
self.__access_token = None
def __perform_request(self, http_result: HttpResult, permit_empty_body: bool, fn, *args, **kwargs):
response = fn(*args, **kwargs)
http_result.process_http_response(response, permit_empty_body)
except requests.exceptions.Timeout as e:
LOGGER.info(f"{http_result} ==> timeout")
http_result.exception = e
except Exception as e: # pylint: disable=broad-except
es=str(e)
LOGGER.info(f"{http_result} ==> unexpected exception: {es}")
http_result.exception = e
return http_result
def __post_w_headers(self, path, data, headers, data_as_json=True) -> HttpResult:
url = self.__cm_root + path
rv = HttpResult("POST", url)
if data_as_json:
self.__perform_request(rv, False, requests.post, url, headers=headers, json=data, timeout=self.__timeout, verify=self.__tls_verify)
else:
self.__perform_request(rv, False, requests.post, url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify)
return rv
def __post(self, path, data, data_as_json=True) -> HttpResult:
return self.__post_w_headers(path, data, self.__http_headers(), data_as_json=data_as_json)
def __put(self, path: str, data: Union[str,Dict[str, any]], data_as_json:bool =True, permit_empty_body:bool =True) -> HttpResult:
url = self.__cm_root + path
rv = HttpResult("PUT", url)
if data_as_json:
self.__perform_request(rv, permit_empty_body, requests.put, url, headers=self.__http_headers(), json=data, timeout=self.__timeout, verify=self.__tls_verify)
else:
self.__perform_request(rv, permit_empty_body, requests.put, url, headers=self.__http_headers(), data=data, timeout=self.__timeout, verify=self.__tls_verify)
return rv
def __get(self, path, params: Dict[str, any]=None) -> HttpResult:
url = self.__cm_root + path
rv = HttpResult("GET", url, params)
self.__perform_request(rv, False, requests.get, url, headers=self.__http_headers(), timeout=self.__timeout,verify=self.__tls_verify, params=params)
return rv
def __delete(self, path, data=None) -> HttpResult:
url = self.__cm_root + path
rv = HttpResult("DELETE", url)
self.__perform_request(rv, True, requests.delete, url, headers=self.__http_headers(), data=data, timeout=self.__timeout, verify=self.__tls_verify)
return rv
def __http_headers(self):
self.__ensure_valid_access_token()
if self.__access_token:
return {'Authorization': 'Bearer '+ self.__access_token.get_value()}
else:
return {}
def __acquire_access_token(self):
path = '/realms/xr-cm/protocol/openid-connect/token'
req = {
"username": self.__username,
"password": self.__password,
"grant_type": "password",
"client_secret": "xr-web-client",
"client_id": "xr-web-client"
}
resp = self.__post_w_headers(path, req, None, data_as_json=False)
# Slightly more verbose check/logging of failures for authentication to help
# diagnose connectivity problems
if resp.status_code is None:
LOGGER.error("Failed to contact authentication API endpoint")
return False
if not resp.is_valid_json_obj_with_status(200):
LOGGER.error(f"Authentication failure, status code {resp.status_code}, data {resp.text}")
return False
if 'access_token' not in resp.json:
LOGGER.error(f"Authentication failure: missing access_token in JSON, status code {resp.status_code}, data {resp.text}")
access_token = resp.json['access_token']
expires = int(resp.json["expires_in"]) if "expires_in" in resp.json else 0
LOGGER.info(f"Obtained access token {access_token}, expires in {expires}")
self.__access_token = ExpiringValue(access_token, expires)
return True
def __ensure_valid_access_token(self):
if not self.__access_token or not self.__access_token.is_valid_for(60):
self.__acquire_access_token()
def Connect(self) -> bool:
return self.__acquire_access_token()
def list_constellations(self) -> List[Constellation]:
r = self.__get("/api/v1/ns/xr-networks?content=expanded")
if not r.is_valid_json_list_with_status(200):
return [Constellation(c) for c in r.json]
def get_constellation_by_hub_name(self, hub_module_name: str) -> Optional[Constellation]:
qparams = [
('content', 'expanded'),
('q', '{"hubModule.state.module.moduleName": "' + hub_module_name + '"}')
]
r = self.__get("/api/v1/ns/xr-networks?content=expanded", params=qparams)
if not r.is_valid_json_list_with_status(200, 1, 1):
return Constellation(r.json[0])
def get_transport_capacities(self) -> List[TransportCapacity]:
r= self.__get("/api/v1/ns/transport-capacities?content=expanded")
if not r.is_valid_json_list_with_status(200):
return [TransportCapacity(from_json=t) for t in r.json]
def get_transport_capacity_by_name(self, tc_name: str) -> Optional[Connection]:
qparams = [
('content', 'expanded'),
('q', '{"state.name": "' + tc_name + '"}')
]
r = self.__get("/api/v1/ns/transport-capacities?content=expanded", params=qparams)
if not r.is_valid_json_list_with_status(200, 1, 1):
return TransportCapacity(from_json=r.json[0])
else:
return None
def get_transport_capacity_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]:
return self.get_transport_capacity_by_name(f"TF:{uuid}")
def create_transport_capacity(self, tc: TransportCapacity) -> Optional[str]:
# Create wants a list, so wrap connection to list
tc_config = [tc.create_config()]
resp = self.__post("/api/v1/ns/transport-capacities", tc_config)
if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
tc.href = resp.json[0]["href"]
LOGGER.info(f"Created transport-capcity {tc}")
#LOGGER.info(self.__get(f"/api/v1/ns/transport-capacities{tc.href}?content=expanded"))
return tc.href
else:
return None
def delete_transport_capacity(self, href: str) -> bool:
resp = self.__delete(f"/api/v1/ns/transport-capacities{href}")
# Returns empty body
if resp.is_valid_with_status_ignore_body(202):
LOGGER.info(f"Deleted transport-capacity {href=}")
return True
else:
LOGGER.info(f"Deleting transport-capacity {href=} failed, status {resp.status_code}")
return False
def create_connection(self, connection: Connection) -> Optional[str]:
# Create wants a list, so wrap connection to list
cfg = [connection.create_config()]
resp = self.__post("/api/v1/ncs/network-connections", cfg)
if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
connection.href = resp.json[0]["href"]
LOGGER.info(f"Created connection {connection}")
return connection.href
LOGGER.error(f"Create failure for connection {connection}, result {resp}")
def update_connection(self, href: str, connection: Connection, existing_connection: Optional[Connection]=None) -> Optional[str]:
cfg = connection.create_config()
# Endpoint updates
# Current CM implementation returns 501 (not implemented) for all of these actions
# CM does not accept endpoint updates properly in same format that is used in initial creation.
# Instead we work around by using more granular APIs.
if "endpoints" in cfg:
del cfg["endpoints"]
if existing_connection is None:
existing_connection = self.get_connection_by_href(href)
ep_deletes, ep_creates, ep_updates = connection.get_endpoint_updates(existing_connection)
#print(ep_deletes)
#print(ep_creates)
#print(ep_updates)
# Perform deletes
for ep_href in ep_deletes:
resp = self.__delete(f"/api/v1/ncs{ep_href}")
if resp.is_valid_with_status_ignore_body(202):
LOGGER.info(f"update_connection: EP-UPDATE: Deleted connection endpoint {ep_href}")
else:
LOGGER.info(f"update_connection: EP-UPDATE: Failed to delete connection endpoint {ep_href}: {resp}")
# Update capacities for otherwise similar endpoints
for ep_href, ep_cfg in ep_updates:
resp = self.__put(f"/api/v1/ncs{ep_href}", ep_cfg)
if resp.is_valid_with_status_ignore_body(202):
LOGGER.info(f"update_connection: EP-UPDATE: Updated connection endpoint {ep_href} with {ep_cfg}")
else:
LOGGER.info(f"update_connection: EP-UPDATE: Failed to update connection endpoint {ep_href} with {ep_cfg}: {resp}")
# Perform adds
resp = self.__post(f"/api/v1/ncs{href}/endpoints", ep_creates)
if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
LOGGER.info(f"update_connection: EP-UPDATE: Created connection endpoints {resp.json[0]} with {ep_creates}")
LOGGER.info(f"update_connection: EP-UPDATE: Failed to create connection endpoints {resp.json[0] if resp.json else None} with {ep_creates}: {resp}")
# Connection update (excluding endpoints)
resp = self.__put(f"/api/v1/ncs{href}", cfg)
if resp.is_valid_with_status_ignore_body(202):
LOGGER.info(f"update_connection: Updated connection {connection}")
# Return href used for update to be consisten with create
return href
else:
LOGGER.error(f"update_connection: Update failure for connection {connection}, result {resp}")
return None
def delete_connection(self, href: str) -> bool:
resp = self.__delete(f"/api/v1/ncs{href}")
if resp.is_valid_with_status_ignore_body(202):
LOGGER.info(f"Deleted connection {href=}")
return True
else:
return False
# Always does the correct thing, that is update if present, otherwise create
def create_or_update_connection(self, connection: Connection) -> Optional[str]:
existing_connection = self.get_connection_by_name(connection.name)
if existing_connection:
return self.update_connection(existing_connection.href, connection, existing_connection)
return self.create_connection(connection)
def get_connection_by_name(self, connection_name: str) -> Optional[Connection]:
qparams = [
('content', 'expanded'),
('q', '{"state.name": "' + connection_name + '"}')
]
r = self.__get("/api/v1/ncs/network-connections", params=qparams)
if r.is_valid_json_list_with_status(200, 1, 1):
return Connection(from_json=r.json[0])
else:
return None
def get_connection_by_href(self, href: str) -> Optional[Connection]:
qparams = [
('content', 'expanded'),
]
r = self.__get(f"/api/v1/ncs{href}", params=qparams)
if r.is_valid_json_obj_with_status(200):
return Connection(from_json=r.json)
def get_connection_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]:
return self.get_connection_by_name(f"TF:{uuid}")
def get_connections(self):
r = self.__get("/api/v1/ncs/network-connections?content=expanded")
if r.is_valid_json_list_with_status(200):
return [Connection(from_json=c) for c in r.json]