# CmConnection.py
#pylint: disable=invalid-name, missing-function-docstring, line-too-long, logging-fstring-interpolation, missing-class-docstring, missing-module-docstring
import logging
import json
import time
from typing import Tuple, Optional #Any, Iterator, List, , Union
import requests
import urllib3
import re

LOGGER = logging.getLogger(__name__)

class InvalidIfnameError(Exception):
    """Raised for an interface name that is not of the form ``MODULENAME|PORTNAME``."""

    def __init__(self, ifname):
        # Build the full message first, then hand it to Exception so str(exc) shows it.
        message = f"Invalid interface name {ifname}, expecting format \"MODULENAME|PORTNAME\""
        Exception.__init__(self, message)

class ConnectionDeserializationError(Exception):
    """Raised when a connection cannot be built from the JSON it was given."""

    def __init__(self, msg):
        # The message is passed through unchanged to the base exception.
        Exception.__init__(self, msg)

def ifname_to_module_and_aid(ifname: str) -> Tuple[str, str]:
    """Split an interface name of the form ``MODULENAME|PORTNAME`` into its two parts.

    Args:
        ifname: interface name containing exactly one ``|`` separator.

    Returns:
        ``(module_name, port_name)``; either part may be an empty string.

    Raises:
        InvalidIfnameError: if ``ifname`` does not contain exactly one ``|``.
    """
    # Exactly one separator is the same condition as "split yields two parts".
    if ifname.count("|") != 1:
        raise InvalidIfnameError(ifname)
    module_name, _, port_name = ifname.partition("|")
    return (module_name, port_name)

class Connection:
    """One network connection, built from a decoded JSON document.

    Attributes set by the constructor:
        name: value of ``state.name``, or None when absent (the name is optional).
        serviceMode: value of ``state.serviceMode``.
        href: value of the top-level ``href`` key.
        endpoints: list of ``"MODULENAME|CLIENTIFAID"`` strings, one for each
            endpoint that carries complete ``state.moduleIf`` information.
    """

    @staticmethod
    def _endpoint_ifname(endpoint):
        """Return ``"moduleName|clientIfAid"`` for an endpoint, or None if a key is missing."""
        try:
            module_if = endpoint["state"]["moduleIf"]
            return module_if["moduleName"] + "|" + module_if["clientIfAid"]
        except KeyError:
            return None

    def __init__(self, from_json=None):
        """Populate the connection from ``from_json``.

        Args:
            from_json: decoded JSON dict with ``state``, ``href`` and ``endpoints`` keys.

        Raises:
            ConnectionDeserializationError: if ``from_json`` is missing or empty, or
                if one of the mandatory keys (``state``, ``state.serviceMode``,
                ``href``, ``endpoints``) is absent.
        """
        if not from_json:
            # May support other initializations in future
            raise ConnectionDeserializationError("JSON dict missing")

        try:
            state = from_json["state"]
            self.name = state["name"] if "name" in state else None  # Name is optional
            self.serviceMode = state["serviceMode"]
            self.href = from_json["href"]

            # Endpoints without usable moduleIf data are skipped, not treated as errors.
            self.endpoints = []
            for ep in from_json["endpoints"]:
                ifname = Connection._endpoint_ifname(ep)
                if ifname:
                    self.endpoints.append(ifname)
        except KeyError as e:
            # Chain the KeyError so the traceback still shows where the key was missing.
            raise ConnectionDeserializationError(f"Missing mandatory key, {e}") from e

    def __str__(self):
        """Return a one-line, human readable summary of the connection."""
        name = self.name if self.name else "<NO NAME>"
        endpoints = ", ".join(self.endpoints)
        return f"name: {name}, id: {self.href}, service-mode: {self.serviceMode}, end-points: [{endpoints}]"

class ExpiringValue:
    """A value paired with a lifetime measured on the monotonic clock.

    The creation time is taken from ``time.monotonic()`` when the object is built;
    ``expiry`` is added to it to obtain the moment the value stops being valid.
    """

    def __init__(self, value, expiry):
        self.__value = value
        self.__expiry = expiry
        self.__created = time.monotonic()

    def get_value(self):
        """Return the stored value; validity is not checked here."""
        return self.__value

    def is_valid_for(self, duration):
        """Return True if the value is still valid ``duration`` from now."""
        expires_at = self.__created + self.__expiry
        needed_until = time.monotonic() + duration
        return expires_at >= needed_until

class CmConnection:
    def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True) -> None:
        self.__tls_verify = tls_verify
        if not tls_verify:
            urllib3.disable_warnings()

        self.__timeout = timeout
        self.__username = username
        self.__password = password
        self.__cm_root = 'https://' + address + ':' + str(port)
        self.__access_token = None

    def __post_w_headers(self, path, data, headers, data_as_json=True):
        """POST ``data`` to ``path`` using exactly the headers given.

        Args:
            path: path appended to the server root URL.
            data: request body.
            headers: headers passed to the request unchanged (may be None).
            data_as_json: when true the body is sent through requests' ``json=``
                argument, otherwise through ``data=``.

        Returns:
            ``(status_code, decoded_json_body)``, or None on timeout, on a body that
            is not valid JSON, or on any other exception.
        """
        url = self.__cm_root + path
        # Pick which requests keyword carries the body.
        body_kwarg = "json" if data_as_json else "data"
        try:
            response = requests.post(url, headers=headers, timeout=self.__timeout, verify=self.__tls_verify, **{body_kwarg: data})
            LOGGER.info(f"POST: {url} ==> {response.status_code}")
            decoded = json.loads(response.text)
            return (response.status_code, decoded)
        except requests.exceptions.Timeout:
            LOGGER.info(f"POST: {url} ==> timeout")
        except json.JSONDecodeError as json_err:
            LOGGER.info(f"POST: {url} ==> response json decode error: {str(json_err)}")
        except Exception as e:  # pylint: disable=broad-except
            es = str(e)
            LOGGER.info(f"POST: {url} ==> unexpected exception: {es}")
        # Every failure path above ends here.
        return None

    def __post(self, path, data, data_as_json=True):
        """POST ``data`` to ``path`` with the headers produced by ``__http_headers``.

        Returns whatever ``__post_w_headers`` returns: ``(status_code, body)`` or None.
        """
        auth_headers = self.__http_headers()
        return self.__post_w_headers(path, data, auth_headers, data_as_json=data_as_json)

    def __put(self, path, data, data_as_json=True):
        """PUT ``data`` to ``path`` with the headers produced by ``__http_headers``.

        Args:
            path: path appended to the server root URL.
            data: request body.
            data_as_json: when true the body is sent through requests' ``json=``
                argument, otherwise through ``data=``.

        Returns:
            ``(status_code, decoded_body)``; ``decoded_body`` is None when the
            response body is empty or is the JSON literal ``null``. Returns None on
            timeout, on a body that is not valid JSON, or on any other exception.
        """
        url = self.__cm_root + path
        headers = self.__http_headers()
        body_kwarg = "json" if data_as_json else "data"
        try:
            response = requests.put(url, headers=headers, timeout=self.__timeout, verify=self.__tls_verify, **{body_kwarg: data})

            LOGGER.info(f"PUT: {url} ==> {response.status_code}")

            # An empty body is not valid JSON; treat it like "null" so the caller
            # still receives the status code instead of a decode failure.
            if response.content.strip() in (b'', b'null'):
                return (response.status_code, None)
            resp = json.loads(response.text)
            return (response.status_code, resp)
        except requests.exceptions.Timeout:
            LOGGER.info(f"PUT: {url} ==> timeout")
            return None
        except json.JSONDecodeError as json_err:
            LOGGER.info(f"PUT: {url} ==> response json decode error: {str(json_err)}")
            return None
        except Exception as e:  # pylint: disable=broad-except
            es = str(e)
            LOGGER.info(f"PUT: {url} ==> unexpected exception: {es}")
            return None

    def __delete(self, path, data=None):
        """Send DELETE to ``path`` with the headers produced by ``__http_headers``.

        Args:
            path: path appended to the server root URL.
            data: optional request body, passed through requests' ``data=`` argument.

        Returns:
            ``(status_code, decoded_body)``; ``decoded_body`` is None when the
            response body is empty or is the JSON literal ``null``. Returns None on
            timeout, on a body that is not valid JSON, or on any other exception.
        """
        url = self.__cm_root + path
        headers = self.__http_headers()
        try:
            response = requests.delete(url, headers=headers, data=data, timeout=self.__timeout, verify=self.__tls_verify)
            LOGGER.info(f"DELETE: {url} ==> {response.status_code}")

            # An empty body is not valid JSON; treat it like "null" so the caller
            # still receives the status code instead of a decode failure.
            if response.content.strip() in (b'', b'null'):
                return (response.status_code, None)
            resp = json.loads(response.text)
            return (response.status_code, resp)
        except requests.exceptions.Timeout:
            LOGGER.info(f"DELETE: {url} ==> timeout")
            return None
        except json.JSONDecodeError as json_err:
            LOGGER.info(f"DELETE: {url} ==> response json decode error: {str(json_err)}")
            return None
        except Exception as e:  # pylint: disable=broad-except
            es = str(e)
            LOGGER.info(f"DELETE: {url} ==> unexpected exception: {es}")
            return None

    def __http_headers(self):
        self.__ensure_valid_access_token()
        if self.__access_token:
            return {'Authorization': 'Bearer '+ self.__access_token.get_value()}
        else:
            return {}

    def __get_json(self, path, params=None):
        """GET ``path`` and decode the response body as JSON.

        Args:
            path: path appended to the server root URL.
            params: optional query parameters handed to requests' ``params=``.

        Returns:
            ``(status_code, decoded_json_body)``, or None on timeout, on a body that
            is not valid JSON, or on any other exception.
        """
        url = self.__cm_root + path
        try:
            # Header construction stays inside the try, so its failures are handled below too.
            auth_headers = self.__http_headers()
            response = requests.get(url, headers=auth_headers, timeout=self.__timeout, verify=self.__tls_verify, params=params)
            LOGGER.info(f"GET: {url} {params=} ==> {response.status_code}")
            decoded = json.loads(response.text)
            return (response.status_code, decoded)
        except requests.exceptions.Timeout:
            LOGGER.info(f"GET: {url} {params=} ==> timeout")
        except json.JSONDecodeError as json_err:
            LOGGER.info(f"GET: {url} {params=} ==> response json decode error: {str(json_err)}")
        except Exception as e:  # pylint: disable=broad-except
            es = str(e)
            LOGGER.info(f"GET: {url} {params=} ==> unexpected exception: {es}")
        # Every failure path above ends here.
        return None

    def __acquire_access_token(self):
        """Request a new access token using the stored user name and password.

        On success the token is stored wrapped in an ``ExpiringValue`` whose
        lifetime is the response's ``expires_in`` value (0 when that key is absent).

        Returns:
            True when a token was obtained and stored, False otherwise.
        """
        path = '/realms/xr-cm/protocol/openid-connect/token'
        req = {
            "username": self.__username,
            "password": self.__password,
            "grant_type": "password",
            "client_secret": "xr-web-client",
            "client_id": "xr-web-client"
        }
        # Headers are None on purpose: __http_headers would call back into this method.
        result = self.__post_w_headers(path, req, None, data_as_json=False)
        if result is None:
            # __post_w_headers returns None on timeout, bad JSON or any other exception;
            # unpacking that directly would raise TypeError.
            LOGGER.error("Authentication failure, no usable response received")
            return False

        status_code, response = result
        if status_code != 200 or not isinstance(response, dict) or 'access_token' not in response:
            LOGGER.error(f"Authentication failure, status code {status_code}, data {response}")
            return False

        access_token = response['access_token']
        expires = int(response["expires_in"]) if "expires_in" in response else 0
        # The token value is a credential, so it is deliberately kept out of the log.
        LOGGER.info(f"Obtained access token, expires in {expires}")
        self.__access_token = ExpiringValue(access_token, expires)
        return True

    def __ensure_valid_access_token(self):
        if not self.__access_token or not self.__access_token.is_valid_for(60):
            self.__acquire_access_token()

    def Connect(self) -> bool:
        """Acquire an access token; return True on success, False otherwise."""
        authenticated = self.__acquire_access_token()
        return authenticated

    @staticmethod
    def get_constellation_module_ifnames(module):
        ifnames = []
        try:
            module_state = module["state"]
            module_name = module_state["module"]["moduleName"]
            if "endpoints" in module_state:
                for endpoint in module_state["endpoints"]:
                    try:
                        ifname = endpoint["moduleIf"]["clientIfAid"]
                        ifnames.append(f"{module_name}|{ifname}")
                    except KeyError:
                        pass
        except KeyError:
            pass
        return ifnames

    @staticmethod
    def get_constellation_ifnames(constellation):
        ifnames = []
        if "hubModule" in constellation:
            hub = constellation["hubModule"]
            ifnames.extend(CmConnection.get_constellation_module_ifnames(hub))

        if "leafModules" in constellation:
            for leaf in constellation["leafModules"]:
                ifnames.extend(CmConnection.get_constellation_module_ifnames(leaf))
        return ifnames

    @staticmethod
    def get_ifnames_per_constellation(constellation):
        ifnames = []
        try:
            ports = CmConnection.get_constellation_ifnames(constellation)
            constellation_id = constellation["id"]
            for port in ports:
                ifnames.append(port)
        except KeyError:
            return None

        return (constellation_id, ifnames)

    def list_constellations(self):
        """Fetch all constellations and summarize each with its interface names.

        Returns:
            A list with one ``get_ifnames_per_constellation`` result per
            constellation (an entry is None when that constellation has no ``id``).
            Empty when the request fails, the status is not 200, or nothing is
            returned.
        """
        result = self.__get_json("/api/v1/ns/xr-networks?content=expanded")
        if not result:
            # __get_json returns None on timeout, bad JSON or any other exception;
            # unpacking that directly would raise TypeError.
            return []
        status_code, constellations = result
        if status_code != 200 or not constellations:
            return []
        return [CmConnection.get_ifnames_per_constellation(c) for c in constellations]

    def get_constellation_by_hub_name(self, hub_module_name: str):
        """Look up the single constellation whose hub module has the given name.

        Args:
            hub_module_name: value matched against
                ``hubModule.state.module.moduleName`` in the query filter.

        Returns:
            The ``get_ifnames_per_constellation`` result for the match, or None
            when the request fails, the status is not 200, or the response is not
            a list with exactly one constellation.
        """
        qparams = [
            ('content', 'expanded'),
            # json.dumps escapes quotes and backslashes in the name, so the filter stays valid JSON.
            ('q', json.dumps({"hubModule.state.module.moduleName": hub_module_name}, ensure_ascii=False)),
        ]
        # 'content' is already supplied through qparams, so it is not repeated in the path.
        result = self.__get_json("/api/v1/ns/xr-networks", params=qparams)
        if not result:
            # __get_json returns None on timeout, bad JSON or any other exception.
            return None
        status_code, constellations = result
        if status_code != 200 or not isinstance(constellations, list) or len(constellations) != 1:
            return None
        return CmConnection.get_ifnames_per_constellation(constellations[0])

    @staticmethod
    def create_connection_config(uid: str, serviceMode: Optional[str], mod1: Optional[str], aid1: Optional[str], mod2: Optional[str], aid2: Optional[str]) -> Connection:
        name = f"TF:{uid}"
        def create_endpoint(mod, aid):
            ep = {
                    "selector": {
                        "ifSelectorByModuleName": {
                            "moduleName": mod,
                            "moduleClientIfAid": aid,
                        }
                    }
            }
            return ep

        connection = { "name" : name}
        if serviceMode:
            connection["serviceMode"] = serviceMode
        endpoints = []
        if mod1:
            endpoints.append(create_endpoint(mod1, aid1))
        if mod2:
            endpoints.append(create_endpoint(mod2, aid2))
        if len(endpoints) > 0:
            connection["endpoints"] = endpoints
        return connection

    # All arguments are mandatory
    def create_connection(self, uid, mod1, aid1, mod2, aid2) -> Optional[str]:
        """Create a ``portMode`` connection between two module client interfaces.

        Args:
            uid: identifier; the connection is named ``TF:<uid>``.
            mod1, aid1: module name and client interface AID of the first endpoint.
            mod2, aid2: module name and client interface AID of the second endpoint.

        Returns:
            The ``href`` of the created resource, or None when the request fails,
            the status is not 202, or the response is not a one-element list whose
            item carries an ``href``.
        """
        # Create wants a list, so wrap connection to list
        connection = [CmConnection.create_connection_config(uid, "portMode", mod1, aid1, mod2, aid2)]
        resp = self.__post("/api/v1/ncs/network-connections", connection)
        if not resp:
            return None

        status_code, body = resp
        # Check the shape explicitly so an unexpected body (e.g. null or an error
        # object) yields None instead of a TypeError.
        created_ok = (
            status_code == 202
            and isinstance(body, list)
            and len(body) == 1
            and isinstance(body[0], dict)
            and "href" in body[0]
        )
        if not created_ok:
            return None

        created_resource = body[0]["href"]
        LOGGER.info(f"Created connection {created_resource} {uid=}, {mod1=}, {aid1=}, {mod2=}, {aid2=}")
        return created_resource

    # Modules and aids are optional. Uid is the Teraflow UID, and is stored in the name field
    def modify_connection(self, href: str, uid: str, service_mode: Optional[str], mod1: Optional[str]=None, aid1: Optional[str]=None, mod2: Optional[str]=None, aid2: Optional[str]=None) -> Optional[str]:
        """Update the connection at ``href`` with a newly built configuration.

        Args:
            href: resource reference of the connection, appended to ``/api/v1/ncs``.
            uid: identifier; the connection name is set to ``TF:<uid>``.
            service_mode: optional service mode to set.
            mod1, aid1, mod2, aid2: optional endpoint module names and AIDs.

        Returns:
            ``href`` when the server answers 202, otherwise None.
        """
        payload = CmConnection.create_connection_config(uid, service_mode, mod1, aid1, mod2, aid2)
        resp = self.__put(f"/api/v1/ncs{href}", payload)
        # Only the status code is inspected; the response body is not used.
        if not resp or resp[0] != 202:
            return None
        LOGGER.info(f"Updated connection {href=}, {uid=}, {service_mode=}, {mod1=}, {aid1=}, {mod2=}, {aid2=}")
        # Return the href used for the update, to be consistent with create.
        return href

    def delete_connection(self, href: str) -> bool:
        """Delete the connection at ``href``.

        Args:
            href: resource reference of the connection, appended to ``/api/v1/ncs``.

        Returns:
            True when the server answers 202, otherwise False.
        """
        resp = self.__delete(f"/api/v1/ncs{href}")
        # Only the status code is inspected; __delete already logs it, so nothing
        # is printed to stdout here.
        if resp and resp[0] == 202:
            LOGGER.info(f"Deleted connection {href=}")
            return True
        return False

    def create_connection_ifnames(self, uid: str, ifname1: str, ifname2: str):
        """Create a connection between two interfaces named ``MODULENAME|PORTNAME``.

        Raises:
            InvalidIfnameError: if either interface name is malformed.

        Returns:
            Whatever ``create_connection`` returns.
        """
        # Flatten both names into (module1, aid1, module2, aid2), first name first.
        endpoint_args = []
        for ifname in (ifname1, ifname2):
            endpoint_args.extend(ifname_to_module_and_aid(ifname))
        return self.create_connection(uid, *endpoint_args)

    def modify_connection_ifnames(self, href: str, uid: str, ifname1: Optional[str], ifname2: Optional[str], service_mode: Optional[str] =None):
        """Update a connection, giving endpoints as ``MODULENAME|PORTNAME`` names.

        Only ``href`` and ``uid`` are mandatory; a falsy interface name leaves the
        corresponding module and AID as None.

        Raises:
            InvalidIfnameError: if a supplied interface name is malformed.

        Returns:
            Whatever ``modify_connection`` returns.
        """
        if ifname1:
            module1, aid1 = ifname_to_module_and_aid(ifname1)
        else:
            module1 = aid1 = None

        if ifname2:
            module2, aid2 = ifname_to_module_and_aid(ifname2)
        else:
            module2 = aid2 = None

        return self.modify_connection(href, uid, service_mode, module1, aid1, module2, aid2)

    # Always does the correct thing, that is update if present, otherwise create
    def create_or_update_connection_ifnames(self, uid: str, ifname1: str, ifname2: str) -> Optional[str]:
        """Update the connection named ``TF:<uid>`` if it exists, otherwise create it.

        Args:
            uid: identifier used to derive the connection name.
            ifname1, ifname2: endpoint interface names of the form ``MODULENAME|PORTNAME``.

        Raises:
            InvalidIfnameError: if either interface name is malformed.

        Returns:
            The connection ``href`` on success, or None on failure.
        """
        module1, aid1 = ifname_to_module_and_aid(ifname1)
        module2, aid2 = ifname_to_module_and_aid(ifname2)

        name = f"TF:{uid}"
        existing_connection = self.get_connection_by_name(name)
        if existing_connection:
            # modify_connection takes service_mode as its third argument. Pass None
            # explicitly (no service mode is sent) so the endpoint arguments land in
            # the right parameters.
            return self.modify_connection(existing_connection.href, uid, None, module1, aid1, module2, aid2)
        return self.create_connection(uid, module1, aid1, module2, aid2)

    def get_connection_by_name(self, connection_name: str) -> Optional[Connection]:
        """Fetch the single connection whose ``state.name`` equals ``connection_name``.

        Returns:
            A ``Connection`` built from the match, or None when the request fails,
            the status is not 200, or the response is not a list with exactly one
            entry.

        Raises:
            ConnectionDeserializationError: if the matched entry lacks mandatory keys.
        """
        qparams = [
            ('content', 'expanded'),
            # json.dumps escapes quotes and backslashes in the name, so the filter stays valid JSON.
            ('q', json.dumps({"state.name": connection_name}, ensure_ascii=False)),
        ]
        r = self.__get_json("/api/v1/ncs/network-connections", params=qparams)
        if not r:
            return None
        status_code, matches = r
        # Guard the shape so a non-list body yields None instead of a TypeError/KeyError.
        if status_code != 200 or not isinstance(matches, list) or len(matches) != 1:
            return None
        return Connection(from_json=matches[0])

    def get_connection_by_teraflow_uuid(self, uuid: str) -> Optional[Connection]:
        """Fetch the connection named ``TF:<uuid>``; see ``get_connection_by_name``."""
        connection_name = f"TF:{uuid}"
        return self.get_connection_by_name(connection_name)

    def get_connections(self):
        """Fetch all connections.

        Returns:
            A list of ``Connection`` objects, or an empty list when the request
            fails or the status is not 200.
        """
        r = self.__get_json("/api/v1/ncs/network-connections?content=expanded")
        if not r or r[0] != 200:
            return []

        connections = []
        for connection_json in r[1]:
            connections.append(Connection(from_json=connection_json))
        return connections

    def service_uuid(self, key: str) -> Optional[str]:
        service = re.match(r"^/service\[(.+)\]$", key)
        if service:
            return service.group(1)
        else:
            return None