# cm_connection.py
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections.abc
import json
import logging
import re
import time
from typing import Any, Dict, List, Optional, Union

import requests
import urllib3

from .connection import Connection
from .constellation import Constellation
from .transport_capacity import TransportCapacity
# https://confluence.infinera.com/display/CR/XR+Network+Service
# https://confluence.infinera.com/pages/viewpage.action?spaceKey=CR&title=XR+Network+Connection+Service#XRNetworkConnectionService-North-boundInterface
# https://bitbucket.infinera.com/projects/XRCM/repos/cm-api/browse/yaml/ncs/v1/ncs.yaml
# Module-level logger, named after this module via __name__.
LOGGER = logging.getLogger(__name__)

class ExpiringValue:
    """A value paired with a lifetime, timed on the monotonic clock.

    The lifetime starts counting when the instance is constructed.
    """

    def __init__(self, value, expiry):
        # Creation time and lifetime are in the units of time.monotonic() (seconds).
        self.__created = time.monotonic()
        self.__expiry = expiry
        self.__value = value

    def get_value(self):
        """Return the stored value, regardless of whether it has expired."""
        return self.__value

    def is_valid_for(self, duration):
        """Return True if the value will still be unexpired ``duration`` from now."""
        expires_at = self.__created + self.__expiry
        needed_until = time.monotonic() + duration
        return expires_at >= needed_until

class UnexpectedEmptyBody(Exception):
    """Raised when an HTTP response has no body although one was required."""

class HttpResult:
    """Outcome of a single HTTP request.

    An instance is created with the request details before the request is
    sent; ``process_http_response()`` then fills in the status code, the raw
    body text and the decoded JSON. ``exception`` holds any error recorded
    while performing the request or decoding the body.

    The instance is truthy only for a successful request that returned a body
    (see ``__bool__``).
    """

    def __init__(self, method: str, url: str, params: Any = None):
        """Record the request details.

        Args:
            method: HTTP method name, used for logging and error messages.
            url: Full request URL.
            params: Query parameters of the request (kept for logging only);
                callers in this module pass either nothing or a list of
                ``(name, value)`` pairs.
        """
        self.method = method
        self.url = url
        self.params = params
        # Response fields; all remain None until a response has been processed.
        self.text = None
        self.json = None
        self.status_code = None
        # Error recorded for this request (e.g. a JSON decode error).
        self.exception = None

    def __str__(self):
        status_code = self.status_code if self.status_code is not None else "<not executed>"
        if not self.text:
            body_text = "NONE"
        elif len(self.text) > 1024:
            # Keep log lines bounded: show only the first 1024 characters.
            body_text = self.text[:1024] + "..."
        else:
            body_text = self.text
        return f"{self.method} {self.url} {self.params},  status {status_code}, body {body_text}"

    def process_http_response(self, response: requests.Response, permit_empty_body: bool = False):
        """Copy status and body from ``response`` into this result.

        A body that is empty or the literal ``null`` counts as "no body". A
        body that is present but not valid JSON is kept in ``text`` and the
        decode error is stored in ``exception``.

        Args:
            response: The response returned by ``requests``.
            permit_empty_body: When False, a missing body is an error.

        Raises:
            UnexpectedEmptyBody: If there is no body and ``permit_empty_body``
                is False.
        """
        LOGGER.info(f"process_http_response(): {self.method}: {self.url} qparams={self.params} ==> {response.status_code}") # FIXME: params
        self.status_code = response.status_code
        if response.content != b'null' and len(response.text):
            self.text = response.text

            try:
                self.json = json.loads(response.text)
            except json.JSONDecodeError as json_err:
                LOGGER.info(f"{self.method}: {self.url}  ==> response json decode error: {str(json_err)}")
                self.exception = json_err
        elif not permit_empty_body:
            raise UnexpectedEmptyBody(f"No body in HTTP response for {self.method} {self.url} (status code {response.status_code})")

    def __bool__(self):
        # Error codes start at 400, codes below it are successes
        return self.status_code is not None and self.text is not None and self.status_code < 400 and self.exception is None

    def is_valid_with_status_ignore_body(self, expected_status_code: int) -> bool:
        """Return True if the status matches and no error was recorded; the body is not inspected."""
        return self.status_code is not None and self.status_code == expected_status_code and self.exception is None

    def is_valid_json_with_status(self, expected_status_code: int) -> bool:
        """Return True for a successful result with the given status and a decoded JSON body."""
        return bool(self) and self.status_code == expected_status_code and self.json is not None

    def is_valid_json_list_with_status(self, expected_status_code: int, min_entries=-1, max_entries=-1) -> bool:
        """Return True if the body is a JSON array with an acceptable number of entries.

        Args:
            expected_status_code: Required HTTP status code.
            min_entries: Minimum number of entries; a negative value disables the check.
            max_entries: Maximum number of entries; a negative value disables the check.
        """
        if not self.is_valid_json_with_status(expected_status_code):
            return False
        # A JSON string is also a Sequence in Python, but it is not a JSON array.
        if isinstance(self.json, (str, bytes)) or not isinstance(self.json, collections.abc.Sequence):
            return False

        if min_entries >= 0 and len(self.json) < min_entries:
            return False

        if max_entries >= 0 and len(self.json) > max_entries:
            return False
        return True

    def is_valid_json_obj_with_status(self, expected_status_code: int) -> bool:
        """Return True if the body is a JSON object and the status matches."""
        if not self.is_valid_json_with_status(expected_status_code):
            return False
        return isinstance(self.json, collections.abc.Mapping)

class CmConnection:
    """Client for the XR Connection Manager (CM) REST API.

    Requests go to ``https://<address>:<port>`` and carry a bearer token that
    is fetched from the ``xr-cm`` realm token endpoint and re-acquired once it
    is within 60 seconds of expiry. HTTP-level failures are logged and
    reported through return values (``None``, ``False`` or an empty list)
    instead of being raised.
    """

    def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True) -> None:
        """Store connection settings; no request is sent here.

        Args:
            address: Host name or IP address of the CM server.
            port: TCP port of the CM server.
            username: User name for the password grant.
            password: Password for the password grant.
            timeout: Timeout passed to every ``requests`` call.
            tls_verify: Passed as ``verify`` to every ``requests`` call; when
                False, urllib3 warnings are disabled as well.
        """
        self.__tls_verify = tls_verify
        if not tls_verify:
            urllib3.disable_warnings()

        self.__timeout = timeout
        self.__username = username
        self.__password = password
        self.__cm_root = 'https://' + address + ':' + str(port)
        self.__access_token = None

    @staticmethod
    def __name_filter(field: str, value: str) -> str:
        """Return the JSON text ``{"<field>": "<value>"}`` used as the ``q`` query parameter.

        Serializing with ``json.dumps`` (instead of concatenating strings)
        keeps the filter valid JSON when ``value`` contains quotes or
        backslashes; ``ensure_ascii=False`` leaves non-ASCII text unescaped.
        """
        return json.dumps({field: value}, ensure_ascii=False)

    def __perform_request(self, http_result: HttpResult, permit_empty_body: bool, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` and record the outcome in ``http_result``.

        Every exception (timeouts included) is logged and stored in
        ``http_result.exception`` rather than propagated.

        Returns:
            The same ``http_result`` instance.
        """
        try:
            response = fn(*args, **kwargs)
            http_result.process_http_response(response, permit_empty_body)
        except requests.exceptions.Timeout as e:
            LOGGER.info(f"{http_result} ==> timeout")
            http_result.exception = e
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.info(f"{http_result} ==> unexpected exception: {e}")
            http_result.exception = e
        return http_result

    def __post_w_headers(self, path, data, headers, data_as_json=True) -> HttpResult:
        """POST ``data`` to ``path`` with explicit headers; a response body is required.

        ``data`` is sent as a JSON document when ``data_as_json`` is True and
        through the ``data=`` argument of ``requests.post`` otherwise.
        """
        url = self.__cm_root + path
        rv = HttpResult("POST", url)
        body_kwarg = {"json": data} if data_as_json else {"data": data}
        self.__perform_request(rv, False, requests.post, url, headers=headers, timeout=self.__timeout, verify=self.__tls_verify, **body_kwarg)
        return rv

    def __post(self, path, data, data_as_json=True) -> HttpResult:
        """POST ``data`` to ``path`` using the authorization headers."""
        return self.__post_w_headers(path, data, self.__http_headers(), data_as_json=data_as_json)

    def __put(self, path: str, data: Union[str, Dict[str, Any]], data_as_json: bool = True, permit_empty_body: bool = True) -> HttpResult:
        """PUT ``data`` to ``path``; an empty response body is accepted by default."""
        url = self.__cm_root + path
        rv = HttpResult("PUT", url)
        body_kwarg = {"json": data} if data_as_json else {"data": data}
        self.__perform_request(rv, permit_empty_body, requests.put, url, headers=self.__http_headers(), timeout=self.__timeout, verify=self.__tls_verify, **body_kwarg)
        return rv

    def __get(self, path, params: Any = None) -> HttpResult:
        """GET ``path`` with optional query parameters; a response body is required."""
        url = self.__cm_root + path
        rv = HttpResult("GET", url, params)
        self.__perform_request(rv, False, requests.get, url, headers=self.__http_headers(), timeout=self.__timeout, verify=self.__tls_verify, params=params)
        return rv

    def __delete(self, path, data=None) -> HttpResult:
        """DELETE ``path``; an empty response body is accepted."""
        url = self.__cm_root + path
        rv = HttpResult("DELETE", url)
        self.__perform_request(rv, True, requests.delete, url, headers=self.__http_headers(), data=data, timeout=self.__timeout, verify=self.__tls_verify)
        return rv

    def __http_headers(self):
        """Return the Authorization header, refreshing the token first if needed.

        Returns an empty dict when no token could be obtained.
        """
        self.__ensure_valid_access_token()
        if self.__access_token:
            return {'Authorization': 'Bearer ' + self.__access_token.get_value()}
        return {}

    def __acquire_access_token(self):
        """Request a new access token with the password grant and cache it.

        Returns:
            True when a token was obtained and stored, False otherwise.
        """
        path = '/realms/xr-cm/protocol/openid-connect/token'
        req = {
            "username": self.__username,
            "password": self.__password,
            "grant_type": "password",
            "client_secret": "xr-web-client",
            "client_id": "xr-web-client"
        }
        # The token request itself carries no Authorization header and is
        # sent through requests' ``data=`` argument, not as a JSON document.
        resp = self.__post_w_headers(path, req, None, data_as_json=False)
        # Slightly more verbose check/logging of failures for authentication to help
        # diagnose connectivity problems
        if resp.status_code is None:
            LOGGER.error("Failed to contact authentication API endpoint")
            return False
        if not resp.is_valid_json_obj_with_status(200):
            LOGGER.error(f"Authentication failure, status code {resp.status_code}, data {resp.text}")
            return False
        if 'access_token' not in resp.json:
            LOGGER.error(f"Authentication failure: missing access_token in JSON, status code {resp.status_code}, data {resp.text}")
            return False
        access_token = resp.json['access_token']
        # Without "expires_in" the token is treated as already expired, so it
        # is re-acquired before the next request.
        expires = int(resp.json["expires_in"]) if "expires_in" in resp.json else 0
        # The token is a credential: log that it was obtained, not its value.
        LOGGER.info(f"Obtained access token, expires in {expires}")
        self.__access_token = ExpiringValue(access_token, expires)
        return True

    def __ensure_valid_access_token(self):
        """Acquire a token unless the cached one stays valid for at least 60 more seconds."""
        if not self.__access_token or not self.__access_token.is_valid_for(60):
            self.__acquire_access_token()

    def Connect(self) -> bool:
        """Authenticate against the CM; return True on success."""
        return self.__acquire_access_token()

    def list_constellations(self) -> List[Constellation]:
        """Return all XR networks as ``Constellation`` objects, or ``[]`` on failure."""
        r = self.__get("/api/v1/ns/xr-networks?content=expanded")
        if not r.is_valid_json_list_with_status(200):
            return []
        return [Constellation(c) for c in r.json]

    def get_constellation_by_hub_name(self, hub_module_name: str) -> Optional[Constellation]:
        """Return the constellation whose hub module has the given name.

        Returns None unless the query yields exactly one entry.
        """
        qparams = [
            ('content', 'expanded'),
            ('q', self.__name_filter("hubModule.state.module.moduleName", hub_module_name))
        ]
        r = self.__get("/api/v1/ns/xr-networks?content=expanded", params=qparams)
        if not r.is_valid_json_list_with_status(200, 1, 1):
            return None
        return Constellation(r.json[0])

    def get_transport_capacities(self) -> List[TransportCapacity]:
        """Return all transport capacities, or ``[]`` on failure."""
        r = self.__get("/api/v1/ns/transport-capacities?content=expanded")
        if not r.is_valid_json_list_with_status(200):
            return []
        return [TransportCapacity(from_json=t) for t in r.json]

    def get_transport_capacity_by_name(self, tc_name: str) -> Optional[TransportCapacity]:
        """Return the transport capacity with the given name.

        Returns None unless the query yields exactly one entry.
        """
        qparams = [
            ('content', 'expanded'),
            ('q', self.__name_filter("state.name", tc_name))
        ]
        r = self.__get("/api/v1/ns/transport-capacities?content=expanded", params=qparams)
        if not r.is_valid_json_list_with_status(200, 1, 1):
            return None
        return TransportCapacity(from_json=r.json[0])

    def get_transport_capacity_by_teraflow_uuid(self, uuid: str) -> Optional[TransportCapacity]:
        """Look up a transport capacity by its ``TF:<uuid>`` name."""
        return self.get_transport_capacity_by_name(f"TF:{uuid}")

    def create_transport_capacity(self, tc: TransportCapacity) -> Optional[str]:
        """Create ``tc`` on the CM.

        On success the returned href is also stored in ``tc.href``.

        Returns:
            The href of the new transport capacity, or None on failure.
        """
        # Create wants a list, so wrap connection to list
        tc_config = [tc.create_config()]
        resp = self.__post("/api/v1/ns/transport-capacities", tc_config)
        if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
            tc.href = resp.json[0]["href"]
            LOGGER.info(f"Created transport-capcity {tc}")
            return tc.href
        return None

    def delete_transport_capacity(self, href: str) -> bool:
        """Delete the transport capacity at ``href``; return True on HTTP 202."""
        resp = self.__delete(f"/api/v1/ns/transport-capacities{href}")

        # Returns empty body
        if resp.is_valid_with_status_ignore_body(202):
            LOGGER.info(f"Deleted transport-capacity {href=}")
            return True
        LOGGER.info(f"Deleting transport-capacity {href=} failed, status {resp.status_code}")
        return False

    def create_connection(self, connection: Connection) -> Optional[str]:
        """Create ``connection`` on the CM.

        On success the returned href is also stored in ``connection.href``.

        Returns:
            The href of the new connection, or None on failure.
        """
        # Create wants a list, so wrap connection to list
        cfg = [connection.create_config()]

        resp = self.__post("/api/v1/ncs/network-connections", cfg)
        if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
            connection.href = resp.json[0]["href"]
            LOGGER.info(f"Created connection {connection}")
            return connection.href
        LOGGER.error(f"Create failure for connection {connection}, result {resp}")
        return None

    def update_connection(self, href: str, connection: Connection, existing_connection: Optional[Connection]=None) -> Optional[str]:
        """Update the connection at ``href`` to match ``connection``.

        Endpoint changes are applied first through the per-endpoint APIs
        (delete, update, create); their failures are only logged. The return
        value reflects the final update of the connection itself.

        Args:
            href: Href of the connection to update.
            connection: Desired connection state.
            existing_connection: Current state on the CM; fetched by ``href``
                when not supplied.

        Returns:
            ``href`` when the connection update succeeded, otherwise None.
        """
        cfg = connection.create_config()

        # Endpoint updates
        # Current CM implementation returns 501 (not implemented) for all of these actions

        # CM does not accept endpoint updates properly in same format that is used in initial creation.
        # Instead we work around by using more granular APIs.
        if "endpoints" in cfg:
            del cfg["endpoints"]
        if existing_connection is None:
            existing_connection = self.get_connection_by_href(href)
        ep_deletes, ep_creates, ep_updates = connection.get_endpoint_updates(existing_connection)

        # Perform deletes
        for ep_href in ep_deletes:
            resp = self.__delete(f"/api/v1/ncs{ep_href}")
            if resp.is_valid_with_status_ignore_body(202):
                LOGGER.info(f"update_connection: EP-UPDATE: Deleted connection endpoint {ep_href}")
            else:
                LOGGER.info(f"update_connection: EP-UPDATE: Failed to delete connection endpoint {ep_href}: {resp}")

        # Update capacities for otherwise similar endpoints
        for ep_href, ep_cfg in ep_updates:
            resp = self.__put(f"/api/v1/ncs{ep_href}", ep_cfg)
            if resp.is_valid_with_status_ignore_body(202):
                LOGGER.info(f"update_connection: EP-UPDATE: Updated connection endpoint {ep_href} with {ep_cfg}")
            else:
                LOGGER.info(f"update_connection: EP-UPDATE: Failed to update connection endpoint {ep_href} with {ep_cfg}: {resp}")

        # Perform adds
        resp = self.__post(f"/api/v1/ncs{href}/endpoints", ep_creates)
        if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
            LOGGER.info(f"update_connection: EP-UPDATE: Created connection endpoints {resp.json[0]} with {ep_creates}")
        else:
            LOGGER.info(f"update_connection: EP-UPDATE: Failed to create connection endpoints {resp.json[0] if resp.json else None} with {ep_creates}: {resp}")

        # Connection update (excluding endpoints)
        resp = self.__put(f"/api/v1/ncs{href}", cfg)
        # Returns empty body
        if resp.is_valid_with_status_ignore_body(202):
            LOGGER.info(f"update_connection: Updated connection {connection}")
            # Return href used for update to be consistent with create
            return href
        LOGGER.error(f"update_connection: Update failure for connection {connection}, result {resp}")
        return None

    def delete_connection(self, href: str) -> bool:
        """Delete the connection at ``href``; return True on HTTP 202."""
        resp = self.__delete(f"/api/v1/ncs{href}")
        # Returns empty body
        if resp.is_valid_with_status_ignore_body(202):
            LOGGER.info(f"Deleted connection {href=}")
            return True
        LOGGER.info(f"Deleting connection {href=} failed, status {resp.status_code}")
        return False

    # Always does the correct thing, that is update if present, otherwise create
    def create_or_update_connection(self, connection: Connection) -> Optional[str]:
        """Update the connection named ``connection.name`` if it exists, otherwise create it.

        Returns:
            The href of the connection, or None on failure.
        """
        existing_connection = self.get_connection_by_name(connection.name)
        if existing_connection:
            return self.update_connection(existing_connection.href, connection, existing_connection)
        return self.create_connection(connection)

    def get_connection_by_name(self, connection_name: str) -> Optional[Connection]:
        """Return the connection with the given name.

        Returns None unless the query yields exactly one entry.
        """
        qparams = [
            ('content', 'expanded'),
            ('q', self.__name_filter("state.name", connection_name))
        ]
        r = self.__get("/api/v1/ncs/network-connections", params=qparams)
        if r.is_valid_json_list_with_status(200, 1, 1):
            return Connection(from_json=r.json[0])
        return None

    def get_connection_by_href(self, href: str) -> Optional[Connection]:
        """Return the connection at ``href``, or None if it cannot be fetched."""
        qparams = [
            ('content', 'expanded'),
        ]
        r = self.__get(f"/api/v1/ncs{href}", params=qparams)
        if r.is_valid_json_obj_with_status(200):
            return Connection(from_json=r.json)
        return None

    def get_connection_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]:
        """Look up a connection by its ``TF:<uuid>`` name."""
        return self.get_connection_by_name(f"TF:{uuid}")

    def get_connections(self) -> List[Connection]:
        """Return all network connections, or ``[]`` on failure."""
        r = self.__get("/api/v1/ncs/network-connections?content=expanded")
        if r.is_valid_json_list_with_status(200):
            return [Connection(from_json=c) for c in r.json]
        return []

    def service_uuid(self, key: str) -> Optional[str]:
        """Extract ``<x>`` from a key of the exact form ``/service[<x>]``.

        Returns None when ``key`` does not have that form.
        """
        service = re.match(r"^/service\[(.+)\]$", key)
        return service.group(1) if service else None