# REST client for the XR "CM" API (constellations, transport capacities, network connections).
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections.abc
import json
import logging
import time
from typing import Any, Dict, List, Optional, Union

import requests
import urllib3

from .connection import Connection
from .transport_capacity import TransportCapacity
from .constellation import Constellation
# https://confluence.infinera.com/display/CR/XR+Network+Service
# https://confluence.infinera.com/pages/viewpage.action?spaceKey=CR&title=XR+Network+Connection+Service#XRNetworkConnectionService-North-boundInterface
# https://bitbucket.infinera.com/projects/XRCM/repos/cm-api/browse/yaml/ncs/v1/ncs.yaml
# Module-level logger, named after this module; used by every class below.
LOGGER = logging.getLogger(__name__)
class ExpiringValue:
    """A value paired with a lifetime, measured on the monotonic clock.

    The lifetime starts counting when the instance is constructed.
    """

    def __init__(self, value, expiry):
        # value:  the payload to hand back from get_value()
        # expiry: lifetime in seconds, counted from construction time
        self.__value = value
        self.__expiry = expiry
        self.__created = time.monotonic()

    def get_value(self):
        """Return the stored value (no validity check is performed)."""
        return self.__value

    def is_valid_for(self, duration):
        """Return True if the value will still be unexpired `duration` seconds from now."""
        expires_at = self.__created + self.__expiry
        needed_until = time.monotonic() + duration
        return expires_at >= needed_until
class UnexpectedEmptyBody(Exception):
    """Raised when an HTTP response has no body although one was required."""
class HttpResult:
    """Outcome of one HTTP request: status code, body text, decoded JSON and any error.

    An instance is created before the request is sent (status_code is None
    until a response has been processed) and is then filled in by
    process_http_response(). The is_valid_* helpers let callers check the
    status code and the shape of the decoded body in one call.
    """

    def __init__(self, method: str, url: str, params: Optional[Dict[str, Any]] = None):
        """Record what is about to be requested.

        Args:
            method: HTTP method name, used for logging and messages only.
            url: Full request URL, used for logging and messages only.
            params: Query parameters of the request, used for logging only.
        """
        self.method = method
        self.url = url
        self.text = None         # raw response body, None if empty/absent
        self.json = None         # decoded JSON body, None if absent or undecodable
        self.status_code = None  # None until a response has been processed
        self.params = params
        self.exception = None    # last error seen while performing/decoding

    def __str__(self):
        status_code = self.status_code if self.status_code is not None else "<not executed>"
        if self.text:
            # Keep log lines bounded: show at most the first 1024 characters.
            if len(self.text) > 1024:
                body_text = self.text[:1024] + "..."
            else:
                body_text = self.text
        else:
            body_text = "NONE"
        return f"{self.method} {self.url} {self.params}, status {status_code}, body {body_text}"

    def process_http_response(self, response: "requests.Response", permit_empty_body: bool = False):
        """Copy status and body out of `response` and try to decode the body as JSON.

        A body that is empty or the literal ``null`` counts as "no body".
        A JSON decode failure is not raised; it is stored in `self.exception`.

        Args:
            response: The response object returned by the HTTP library.
            permit_empty_body: When False, a missing body raises.

        Raises:
            UnexpectedEmptyBody: if there is no body and `permit_empty_body` is False.
        """
        LOGGER.info(f"process_http_response(): {self.method}: {self.url} qparams={self.params} ==> {response.status_code}") # FIXME: params
        self.status_code = response.status_code
        if response.content != b'null' and response.text:
            self.text = response.text
            try:
                self.json = json.loads(response.text)
            except json.JSONDecodeError as json_err:
                LOGGER.info(f"{self.method}: {self.url} ==> response json decode error: {str(json_err)}")
                self.exception = json_err
        elif not permit_empty_body:
            raise UnexpectedEmptyBody(f"No body in HTTP response for {self.method} {self.url} (status code {response.status_code})")

    def __bool__(self):
        # Error codes start at 400, codes below it are successes
        return (
            self.status_code is not None
            and self.text is not None
            and self.status_code < 400
            and self.exception is None
        )

    def is_valid_with_status_ignore_body(self, expected_status_code: int) -> bool:
        """Return True if the status code matches and no error was recorded; the body is not examined."""
        return (
            self.status_code is not None
            and self.status_code == expected_status_code
            and self.exception is None
        )

    def is_valid_json_with_status(self, expected_status_code: int) -> bool:
        """Return True for a successful result with the given status and a decoded JSON body."""
        return bool(self) and self.status_code == expected_status_code and self.json is not None

    def is_valid_json_list_with_status(self, expected_status_code: int, min_entries=-1, max_entries=-1) -> bool:
        """Return True if the body is a JSON list whose length is within the given bounds.

        A negative `min_entries` / `max_entries` disables that bound.
        """
        if not self.is_valid_json_with_status(expected_status_code):
            return False
        # str is a Sequence too, but a top-level JSON string is not a list.
        if isinstance(self.json, str) or not isinstance(self.json, collections.abc.Sequence):
            return False
        if min_entries >= 0 and len(self.json) < min_entries:
            return False
        if max_entries >= 0 and len(self.json) > max_entries:
            return False
        return True

    def is_valid_json_obj_with_status(self, expected_status_code: int) -> bool:
        """Return True if the body is a JSON object (mapping) and the status matches."""
        if not self.is_valid_json_with_status(expected_status_code):
            return False
        return isinstance(self.json, collections.abc.Mapping)
class CmConnection:
    """HTTPS client for the CM REST API.

    Every request goes to ``https://<address>:<port>`` and, once a token has
    been obtained, carries an ``Authorization: Bearer`` header. The token is
    fetched with the configured username/password and re-fetched when it is
    missing or about to expire. Request helpers never raise for transport
    errors; failures are recorded on the returned HttpResult.
    """

    def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True) -> None:
        """Store connection settings; no network traffic happens here.

        Args:
            address: Host name or IP address of the CM server.
            port: TCP port of the CM server.
            username: User name used for the password grant.
            password: Password used for the password grant.
            timeout: Timeout passed to every HTTP request.
            tls_verify: Passed as ``verify`` to every HTTP request. When
                False, urllib3 warnings are disabled process-wide.
        """
        self.__tls_verify = tls_verify
        if not tls_verify:
            urllib3.disable_warnings()
        self.__timeout = timeout
        self.__username = username
        self.__password = password
        self.__cm_root = 'https://' + address + ':' + str(port)
        self.__access_token = None

    @staticmethod
    def __q_filter(field: str, value: str) -> str:
        """Build the JSON text for a single-field ``q`` query parameter.

        json.dumps escapes quotes/backslashes in `value`; for plain values the
        result is identical to ``'{"<field>": "<value>"}'``.
        """
        return json.dumps({field: value}, ensure_ascii=False)

    def __perform_request(self, http_result: HttpResult, permit_empty_body: bool, fn, *args, **kwargs):
        """Call `fn(*args, **kwargs)` and record the outcome on `http_result`.

        Any exception (including UnexpectedEmptyBody raised while processing
        the response) is logged and stored in `http_result.exception` instead
        of being propagated.
        """
        try:
            response = fn(*args, **kwargs)
            http_result.process_http_response(response, permit_empty_body)
        except requests.exceptions.Timeout as e:
            LOGGER.info(f"{http_result} ==> timeout")
            http_result.exception = e
        except Exception as e: # pylint: disable=broad-except
            es=str(e)
            LOGGER.info(f"{http_result} ==> unexpected exception: {es}")
            http_result.exception = e
        return http_result

    def __post_w_headers(self, path, data, headers, data_as_json=True) -> HttpResult:
        """POST `data` to `path` with explicit headers; a response body is required."""
        url = self.__cm_root + path
        rv = HttpResult("POST", url)
        # Send either a JSON-encoded body or a raw/form body.
        body_kwargs = {"json": data} if data_as_json else {"data": data}
        self.__perform_request(rv, False, requests.post, url, headers=headers, timeout=self.__timeout, verify=self.__tls_verify, **body_kwargs)
        return rv

    def __post(self, path, data, data_as_json=True) -> HttpResult:
        """POST `data` to `path` using the authenticated headers."""
        return self.__post_w_headers(path, data, self.__http_headers(), data_as_json=data_as_json)

    def __put(self, path: str, data: Union[str,Dict[str, Any]], data_as_json:bool =True, permit_empty_body:bool =True) -> HttpResult:
        """PUT `data` to `path` using the authenticated headers; an empty body is allowed by default."""
        url = self.__cm_root + path
        rv = HttpResult("PUT", url)
        body_kwargs = {"json": data} if data_as_json else {"data": data}
        self.__perform_request(rv, permit_empty_body, requests.put, url, headers=self.__http_headers(), timeout=self.__timeout, verify=self.__tls_verify, **body_kwargs)
        return rv

    def __get(self, path, params: Optional[Dict[str, Any]]=None) -> HttpResult:
        """GET `path` with optional query parameters; a response body is required."""
        url = self.__cm_root + path
        rv = HttpResult("GET", url, params)
        self.__perform_request(rv, False, requests.get, url, headers=self.__http_headers(), timeout=self.__timeout,verify=self.__tls_verify, params=params)
        return rv

    def __delete(self, path, data=None) -> HttpResult:
        """DELETE `path`; an empty response body is allowed."""
        url = self.__cm_root + path
        rv = HttpResult("DELETE", url)
        self.__perform_request(rv, True, requests.delete, url, headers=self.__http_headers(), data=data, timeout=self.__timeout, verify=self.__tls_verify)
        return rv

    def __http_headers(self):
        """Return the Authorization header dict, refreshing the token first if needed.

        Returns an empty dict when no token could be obtained.
        """
        self.__ensure_valid_access_token()
        if self.__access_token:
            return {'Authorization': 'Bearer '+ self.__access_token.get_value()}
        return {}

    def __acquire_access_token(self):
        """Request a new access token with the password grant.

        Returns:
            True if a token was obtained and stored, False otherwise.
        """
        path = '/realms/xr-cm/protocol/openid-connect/token'
        req = {
            "username": self.__username,
            "password": self.__password,
            "grant_type": "password",
            "client_secret": "xr-web-client",
            "client_id": "xr-web-client"
        }
        # No Authorization header here: this is the call that obtains the token.
        resp = self.__post_w_headers(path, req, None, data_as_json=False)
        # Slightly more verbose check/logging of failures for authentication to help
        # diagnose connectivity problems
        if resp.status_code is None:
            LOGGER.error("Failed to contact authentication API endpoint")
            return False
        if not resp.is_valid_json_obj_with_status(200):
            LOGGER.error(f"Authentication failure, status code {resp.status_code}, data {resp.text}")
            return False
        if 'access_token' not in resp.json:
            LOGGER.error(f"Authentication failure: missing access_token in JSON, status code {resp.status_code}, data {resp.text}")
            # Without this return the lookup below would raise KeyError.
            return False
        access_token = resp.json['access_token']
        # A missing "expires_in" is treated as an already-expired token.
        expires = int(resp.json["expires_in"]) if "expires_in" in resp.json else 0
        # NOTE(review): this writes the bearer token to the log in clear text;
        # consider redacting it.
        LOGGER.info(f"Obtained access token {access_token}, expires in {expires}")
        self.__access_token = ExpiringValue(access_token, expires)
        return True

    def __ensure_valid_access_token(self):
        """Fetch a new token if there is none or it expires within 60 seconds."""
        if not self.__access_token or not self.__access_token.is_valid_for(60):
            self.__acquire_access_token()

    def Connect(self) -> bool:
        """Authenticate against the server; return True on success."""
        return self.__acquire_access_token()

    def list_constellations(self) -> List[Constellation]:
        """Return all constellations, or an empty list if the request failed."""
        r = self.__get("/api/v1/ns/xr-networks?content=expanded")
        if not r.is_valid_json_list_with_status(200):
            return []
        return [Constellation(c) for c in r.json]

    def get_constellation_by_hub_name(self, hub_module_name: str) -> Optional[Constellation]:
        """Return the single constellation whose hub module has the given name, else None."""
        qparams = [
            ('content', 'expanded'),
            ('q', self.__q_filter("hubModule.state.module.moduleName", hub_module_name))
        ]
        r = self.__get("/api/v1/ns/xr-networks?content=expanded", params=qparams)
        # Exactly one match is required.
        if not r.is_valid_json_list_with_status(200, 1, 1):
            return None
        return Constellation(r.json[0])

    def get_transport_capacities(self) -> List[TransportCapacity]:
        """Return all transport capacities, or an empty list if the request failed."""
        r= self.__get("/api/v1/ns/transport-capacities?content=expanded")
        if not r.is_valid_json_list_with_status(200):
            return []
        return [TransportCapacity(from_json=t) for t in r.json]

    def get_transport_capacity_by_name(self, tc_name: str) -> Optional[TransportCapacity]:
        """Return the single transport capacity with the given name, else None."""
        qparams = [
            ('content', 'expanded'),
            ('q', self.__q_filter("state.name", tc_name))
        ]
        r = self.__get("/api/v1/ns/transport-capacities?content=expanded", params=qparams)
        # Exactly one match is required.
        if r.is_valid_json_list_with_status(200, 1, 1):
            return TransportCapacity(from_json=r.json[0])
        return None

    def get_transport_capacity_by_teraflow_uuid(self, uuid: str) -> Optional[TransportCapacity]:
        """Look up a transport capacity by the name ``TF:<uuid>``."""
        return self.get_transport_capacity_by_name(f"TF:{uuid}")

    def create_transport_capacity(self, tc: TransportCapacity) -> Optional[str]:
        """Create `tc` on the server.

        On success the returned href is also stored in `tc.href`.

        Returns:
            The href of the created object, or None on failure.
        """
        # Create wants a list, so wrap connection to list
        tc_config = [tc.create_config()]
        resp = self.__post("/api/v1/ns/transport-capacities", tc_config)
        if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
            tc.href = resp.json[0]["href"]
            LOGGER.info(f"Created transport-capcity {tc}")
            #LOGGER.info(self.__get(f"/api/v1/ns/transport-capacities{tc.href}?content=expanded"))
            return tc.href
        return None

    def delete_transport_capacity(self, href: str) -> bool:
        """Delete the transport capacity at `href`; return True if the server answered 202."""
        resp = self.__delete(f"/api/v1/ns/transport-capacities{href}")
        # Returns empty body
        if resp.is_valid_with_status_ignore_body(202):
            LOGGER.info(f"Deleted transport-capacity {href=}")
            return True
        LOGGER.info(f"Deleting transport-capacity {href=} failed, status {resp.status_code}")
        return False

    def create_connection(self, connection: Connection) -> Optional[str]:
        """Create `connection` on the server.

        On success the returned href is also stored in `connection.href`.

        Returns:
            The href of the created connection, or None on failure.
        """
        # Create wants a list, so wrap connection to list
        cfg = [connection.create_config()]
        resp = self.__post("/api/v1/ncs/network-connections", cfg)
        if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
            connection.href = resp.json[0]["href"]
            LOGGER.info(f"Created connection {connection}")
            return connection.href
        LOGGER.error(f"Create failure for connection {connection}, result {resp}")
        return None

    def update_connection(self, href: str, connection: Connection, existing_connection: Optional[Connection]=None) -> Optional[str]:
        """Update the connection at `href` to match `connection`.

        Endpoints are reconciled first (delete, update, create) through the
        per-endpoint APIs; failures there are only logged. The connection
        itself is then updated without its endpoints.

        Args:
            href: href of the connection to update.
            connection: Desired state.
            existing_connection: Current state; fetched by href when omitted.

        Returns:
            `href` if the final connection update answered 202, else None.
        """
        cfg = connection.create_config()
        # Endpoint updates
        # Current CM implementation returns 501 (not implemented) for all of these actions
        # CM does not accept endpoint updates properly in same format that is used in initial creation.
        # Instead we work around by using more granular APIs.
        if "endpoints" in cfg:
            del cfg["endpoints"]
        if existing_connection is None:
            existing_connection = self.get_connection_by_href(href)
        ep_deletes, ep_creates, ep_updates = connection.get_endpoint_updates(existing_connection)

        # Perform deletes
        for ep_href in ep_deletes:
            resp = self.__delete(f"/api/v1/ncs{ep_href}")
            if resp.is_valid_with_status_ignore_body(202):
                LOGGER.info(f"update_connection: EP-UPDATE: Deleted connection endpoint {ep_href}")
            else:
                LOGGER.info(f"update_connection: EP-UPDATE: Failed to delete connection endpoint {ep_href}: {resp}")

        # Update capacities for otherwise similar endpoints
        for ep_href, ep_cfg in ep_updates:
            resp = self.__put(f"/api/v1/ncs{ep_href}", ep_cfg)
            if resp.is_valid_with_status_ignore_body(202):
                LOGGER.info(f"update_connection: EP-UPDATE: Updated connection endpoint {ep_href} with {ep_cfg}")
            else:
                LOGGER.info(f"update_connection: EP-UPDATE: Failed to update connection endpoint {ep_href} with {ep_cfg}: {resp}")

        # Perform adds
        resp = self.__post(f"/api/v1/ncs{href}/endpoints", ep_creates)
        if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
            LOGGER.info(f"update_connection: EP-UPDATE: Created connection endpoints {resp.json[0]} with {ep_creates}")
        else:
            # The error body may be a JSON object; indexing it with [0] would raise.
            is_nonempty_seq = isinstance(resp.json, collections.abc.Sequence) and len(resp.json) > 0
            first_entry = resp.json[0] if is_nonempty_seq else None
            LOGGER.info(f"update_connection: EP-UPDATE: Failed to create connection endpoints {first_entry} with {ep_creates}: {resp}")

        # Connection update (excluding endpoints)
        resp = self.__put(f"/api/v1/ncs{href}", cfg)
        if resp.is_valid_with_status_ignore_body(202):
            LOGGER.info(f"update_connection: Updated connection {connection}")
            # Return href used for update to be consistent with create
            return href
        LOGGER.error(f"update_connection: Update failure for connection {connection}, result {resp}")
        return None

    def delete_connection(self, href: str) -> bool:
        """Delete the connection at `href`; return True if the server answered 202."""
        resp = self.__delete(f"/api/v1/ncs{href}")
        if resp.is_valid_with_status_ignore_body(202):
            LOGGER.info(f"Deleted connection {href=}")
            return True
        LOGGER.info(f"Deleting connection {href=} failed, status {resp.status_code}")
        return False

    # Always does the correct thing, that is update if present, otherwise create
    def create_or_update_connection(self, connection: Connection) -> Optional[str]:
        """Update the connection named `connection.name` if it exists, otherwise create it."""
        existing_connection = self.get_connection_by_name(connection.name)
        if existing_connection:
            return self.update_connection(existing_connection.href, connection, existing_connection)
        return self.create_connection(connection)

    def get_connection_by_name(self, connection_name: str) -> Optional[Connection]:
        """Return the single connection with the given name, else None."""
        qparams = [
            ('content', 'expanded'),
            ('q', self.__q_filter("state.name", connection_name))
        ]
        r = self.__get("/api/v1/ncs/network-connections", params=qparams)
        # Exactly one match is required.
        if r.is_valid_json_list_with_status(200, 1, 1):
            return Connection(from_json=r.json[0])
        return None

    def get_connection_by_href(self, href: str) -> Optional[Connection]:
        """Return the connection at `href`, or None if the request failed."""
        qparams = [
            ('content', 'expanded'),
        ]
        r = self.__get(f"/api/v1/ncs{href}", params=qparams)
        if r.is_valid_json_obj_with_status(200):
            return Connection(from_json=r.json)
        return None

    def get_connection_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]:
        """Look up a connection by the name ``TF:<uuid>``."""
        return self.get_connection_by_name(f"TF:{uuid}")

    def get_connections(self):
        """Return all connections, or an empty list if the request failed."""
        r = self.__get("/api/v1/ncs/network-connections?content=expanded")
        if r.is_valid_json_list_with_status(200):
            return [Connection(from_json=c) for c in r.json]
        return []