Commit 693b55bc authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/xr_device_driver_consistency_enforcement' into 'develop'

XR Device Driver consistency/life cycle management

See merge request !39
parents f6d36933 30ac1136
Loading
Loading
Loading
Loading
+15 −6
Original line number Diff line number Diff line
@@ -21,7 +21,7 @@ import urllib3
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_type
from device.service.driver_api._Driver import _Driver
from .cm.cm_connection import CmConnection
from .cm.cm_connection import CmConnection, ConsistencyMode
from .cm import tf

# Don't complain about non-verified SSL certificate. This driver is demo only
@@ -43,13 +43,22 @@ class XrDriver(_Driver):
        self.__hub_module_name = settings["hub_module_name"]

        tls_verify = False # Currently using self signed certificates
        username = settings["username"] if "username" in settings else "xr-user-1"
        password = settings["password"] if "password" in settings else "xr-user-1"

        self.__cm_connection = CmConnection(address, int(port), username, password, self.__timeout, tls_verify = tls_verify)
        username = settings.get("username", "xr-user-1")
        password = settings.get("password", "xr-user-1")
        
        # Options are:
        #    asynchronous --> operation considered complete when IPM responds with suitable status code,
        #                     including "accepted", that only means request is semantically good and queued.
        #    synchronous  --> operation is considered complete once result is also reflected in GETs in REST API.
        #    lifecycle    --> operation is considered successfull once IPM has completed pluggaable configuration
        #                     or failed in it. This is typically unsuitable for production use
        #                     (as some optics may be transiently unreachable), but is convenient for demos and testin.
        consistency_mode = ConsistencyMode.from_str(settings.get("consistency-mode", "asynchronous"))

        self.__cm_connection = CmConnection(address, int(port), username, password, self.__timeout, tls_verify = tls_verify, consistency_mode=consistency_mode)
        self.__constellation = None

        LOGGER.info(f"XrDriver instantiated, cm {address}:{port}, {settings=}")
        LOGGER.info(f"XrDriver instantiated, cm {address}:{port}, consistency mode {str(consistency_mode)}, {settings=}")

    def __str__(self):
        return f"{self.__hub_module_name}@{self.__cm_address}"
+19 −2
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ import argparse
import logging
import traceback
from typing import Tuple
from cm.cm_connection import CmConnection
from cm.cm_connection import CmConnection, ConsistencyMode
from cm.tf_service import TFService
from cm.transport_capacity import TransportCapacity
from cm.connection import Connection
@@ -43,6 +43,8 @@ parser.add_argument('--delete-connection', nargs='?', type=str, help="connection
parser.add_argument('--list-transport-capacities', action='store_true')
parser.add_argument('--create-transport-capacity', nargs='?', type=str, help="uuid;ifname;ifname;capacity")
parser.add_argument('--emulate-tf-set-config-service', nargs='?', type=str, help="hubmodule;uuid;ifname;ifname;capacity or hubmodule;uuid;ifname;ifname;capacity;FORCE-VTI-ON")
parser.add_argument('--consistency-mode', nargs='?', type=str, help="asynchronous|synchronous|lifecycle;RETRY_INTERVAL_FLOAT_AS_S")
parser.add_argument('--timeout', help='REST call timeout in seconds (per request and total for consistency validation)', type=int, default=60)

args = parser.parse_args()

@@ -66,7 +68,22 @@ def cli_modify_string_to_tf_service(cli_create_str: str) -> Tuple[str, TFService
    print("Invalid object create arguments. Expecting \"href;oid;ifname1;ifname2;bandwidthgbits\" or \"href;oid;ifname1;ifname2\", where ifname is form \"MODULE|PORT\"")
    exit(-1)

cm = CmConnection(args.ip, args.port, args.username, args.password, tls_verify=False)
if args.consistency_mode:
    ca = args.consistency_mode.split(";")
    if 2 != len(ca):
        print("Invalid consistency mode specification. Expecting \"asynchronous|synchronous|lifecycle;RETRY_INTERVAL_FLOAT_AS_S\"")
        exit(-1)
    consistency_mode = ConsistencyMode.from_str(ca[0])
    try:
        retry_interval = float(ca[1])
    except ValueError:
        print("Invalid consistency mode retry interval (non-float)")
        exit(-1)
else:
    consistency_mode = ConsistencyMode.lifecycle
    retry_interval = 0.2

cm = CmConnection(args.ip, args.port, args.username, args.password, timeout=args.timeout, tls_verify=False, consistency_mode=consistency_mode, retry_interval=retry_interval)
if not cm.Connect():
    exit(-1)

+131 −4
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
import collections.abc
import logging
import json
@@ -21,6 +22,7 @@ from typing import Optional, List, Dict, Union
import re
import requests
import urllib3
from enum import Enum
from .connection import Connection
from .transport_capacity import TransportCapacity
from .constellation import Constellation
@@ -49,6 +51,22 @@ class ExpiringValue:
class UnexpectedEmptyBody(Exception):
    pass

# This is enum, not a regular class, see https://docs.python.org/3/library/enum.html
# String based enums require python 3.11, so use nunber based and custom parser
class ConsistencyMode(Enum):
    asynchronous = 0
    synchronous = 1
    lifecycle = 2

    @staticmethod
    def from_str(s: str) -> ConsistencyMode:
        if "synchronous" == s:
            return ConsistencyMode.synchronous
        elif "lifecycle" == s:
            return ConsistencyMode.lifecycle
        # Async is the default
        return ConsistencyMode.asynchronous

class HttpResult:
    def __init__(self, method: str, url: str, params: Dict[str, any] = None):
        self.method = method
@@ -71,7 +89,7 @@ class HttpResult:
        return f"{self.method} {self.url} {self.params},  status {status_code}, body {body_text}"

    def process_http_response(self, response: requests.Response, permit_empty_body:bool = False):
        LOGGER.info(f"process_http_response(): {self.method}: {self.url} qparams={self.params} ==> {response.status_code}") # FIXME: params
        LOGGER.info(f"process_http_response(): {self.method}: {self.url} qparams={self.params} ==> {response.status_code}")
        self.status_code  = response.status_code
        if  response.content != b'null' and len(response.text):
            self.text = response.text
@@ -117,12 +135,19 @@ class HttpResult:
        return True

class CmConnection:
    def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True) -> None:
    CONSISTENCY_WAIT_LOG_INTERVAL = 1.0

    def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True, consistency_mode: ConsistencyMode = ConsistencyMode.asynchronous, retry_interval: float=0.2, max_consistency_tries:int = 100_000) -> None:
        self.__tls_verify = tls_verify
        if not tls_verify:
            urllib3.disable_warnings()

        self.__consistency_mode = consistency_mode
        self.__timeout = timeout
        self.__retry_interval = retry_interval if retry_interval > 0.01 else 0.01
        # Consistency tries limit is mostly useful for testing where it can be use to make
        # test cases faster without timing dependency
        self.__max_consistency_tries = max_consistency_tries
        self.__username = username
        self.__password = password
        self.__cm_root = 'https://' + address + ':' + str(port)
@@ -275,6 +300,101 @@ class CmConnection:
            LOGGER.info(f"Deleting transport-capacity {href=} failed, status {resp.status_code}")
            return False

    def apply_create_consistency(self, obj, get_fn):
        # Asynchronous, no validation
        if self.__consistency_mode == ConsistencyMode.asynchronous:
            return obj

        ts_start = time.perf_counter()
        log_ts = ts_start
        get_result = get_fn()
        valid = False
        limit = self.__max_consistency_tries
        while True:
            if get_result:
                if self.__consistency_mode == ConsistencyMode.synchronous:
                    valid = True
                    break
                if get_result.life_cycle_info.is_terminal_state():
                    valid = True
                    break
                else:
                    ts = time.perf_counter()
                    if ts - log_ts >= self.CONSISTENCY_WAIT_LOG_INTERVAL:
                        log_ts = ts
                        LOGGER.info(f"apply_create_consistency(): waiting for life cycle state progress for {get_result}, current: {str(get_result.life_cycle_info)}, ellapsed time {ts-ts_start} seconds")
            else:
                ts = time.perf_counter()
                if ts - log_ts >= self.CONSISTENCY_WAIT_LOG_INTERVAL:
                    log_ts = ts
                    LOGGER.info(f"apply_create_consistency(): waiting for REST API object for {obj}, ellapsed time {ts-ts_start} seconds")
            limit -= 1
            if limit < 0 or time.perf_counter() - ts_start > self.__timeout:
                break
            time.sleep(self.__retry_interval)
            get_result = get_fn()

        duration = time.perf_counter() - ts_start
        if not valid:
            if get_result:
                LOGGER.info(f"Failed to apply create consistency for {get_result}, insufficient life-cycle-state progress ({str(get_result.life_cycle_info)}), duration {duration} seconds")
            else:
                LOGGER.info(f"Failed to apply create consistency for {obj}, REST object did not appear, duration {duration} seconds")
            return None
        else:
            LOGGER.info(f"Applied create consistency for {get_result}, final life-cycle-state {str(get_result.life_cycle_info)}, duration {duration} seconds")

        return get_result

    def apply_delete_consistency(self, href: str, get_fn):
        # Asynchronous, no validation
        if self.__consistency_mode == ConsistencyMode.asynchronous:
            return None

        ts_start = time.perf_counter()
        log_ts = ts_start
        get_result = get_fn()
        valid = False
        limit = self.__max_consistency_tries
        while True:
            if not get_result:
                # Object no longer exist, so this is completely successful operation
                valid = True
                break
            else:
                # In delete, treat terminal life cycle state as criteria for ConsistencyMode.synchronous:
                # This is unobvious, but in delete non-existence is stronger guarantee than just lifecycle
                # (so this is exact opposite )
                if get_result.life_cycle_info.is_terminal_state() and self.__consistency_mode == ConsistencyMode.synchronous:
                    valid = True
                    break
                else:
                    ts = time.perf_counter()
                    if ts - log_ts >= self.CONSISTENCY_WAIT_LOG_INTERVAL:
                        log_ts = ts
                        if get_result.life_cycle_info.is_terminal_state():
                            LOGGER.info(f"apply_delete_consistency(): waiting for delete to be reflected in REST API for {get_result}, current life-cycle-state: {str(get_result.life_cycle_info)}, ellapsed time {ts-ts_start} seconds")
                        else:
                            LOGGER.info(f"apply_delete_consistency(): waiting for life cycle state progress for {get_result}, current: {str(get_result.life_cycle_info)}, ellapsed time {ts-ts_start} seconds")

            limit -= 1
            if limit < 0 or time.perf_counter() - ts_start > self.__timeout:
                break
            time.sleep(self.__retry_interval)
            get_result = get_fn()

        duration = time.perf_counter() - ts_start
        if not valid:
            if get_result:
                if not get_result.life_cycle_info.is_terminal_state():
                    LOGGER.info(f"Failed to apply create delete for {get_result}, insufficient life-cycle-state progress ({str(get_result.life_cycle_info)}), duration {duration} seconds")
                else:
                    LOGGER.info(f"Failed to apply delete consistency for {get_result}, REST object did not dissappear, duration {duration} seconds")
        else:
            LOGGER.info(f"Applied delete consistency for {href}, duration {duration} seconds")

        return get_result

    def create_connection(self, connection: Connection) -> Optional[str]:
        # Create wants a list, so wrap connection to list
        cfg = [connection.create_config()]
@@ -282,8 +402,14 @@ class CmConnection:
        resp = self.__post("/api/v1/ncs/network-connections", cfg)
        if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
            connection.href = resp.json[0]["href"]
            LOGGER.info(f"Created connection {connection}")
            return connection.href
            LOGGER.info(f"IPM accepted create request for connection {connection}")
            new_connection = self.apply_create_consistency(connection, lambda: self.get_connection_by_href(connection.href))
            if new_connection:
                LOGGER.info(f"Created connection {new_connection}")
                return new_connection.href
            else:
                LOGGER.error(f"Consistency failure for connection {connection}, result {resp}")
                return None
        else:
            LOGGER.error(f"Create failure for connection {connection}, result {resp}")
            return None
@@ -344,6 +470,7 @@ class CmConnection:
        #print(resp)
        # Returns empty body
        if resp.is_valid_with_status_ignore_body(202):
            self.apply_delete_consistency(href, lambda: self.get_connection_by_href(href))
            LOGGER.info(f"Deleted connection {href=}")
            return True
        else:
+43 −1
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from typing import Dict, Optional
from dataclasses import dataclass
from .tf_service import TFService
@@ -33,7 +34,6 @@ class CEndpoint:
    capacity: int
    href: Optional[str]


    def ifname(self) -> str:
        if self.vlan is None:
            return self.module + "|" + self.port
@@ -56,6 +56,45 @@ class CEndpoint:

        return cfg

@dataclass
class LifeCycleInfo:
    # State is None (if not known), or one of the following (in future there might be more, so lets not assuem too much)
    #     'pendingConfiguration': This state occurs when one of the network connection modules is pending configuration or pending deletion.
    #     'configured': This state occurs when all network connection modules are configured.
    #     'configurationFailed': This state may occur when at least a configuration of a module from this network connection failed or timeout.
    #     'pendingDeletion': This state may occur when a request to delete this network connection is being processed.
    #     'deletionFailed': This state may occur when at least a removal of a module from this network connection failed or timeout.
    #     'networkConflict': This state may occur when there is a conflict in a network connection module configuration.
    #     'deleted': This state occurs when a network connection is removed.
    state: Optional[str]
    reason: Optional[str]

    def is_terminal_state(self) -> bool:
        if self.state is None:
            return True
        if self.state.endswith('Failed') or self.state.endswith('Conflict'):
            return True
        if self.state == "configured" or self.state == "deleted":
            return True
        return False

    def __str__(self):
        state_str = "unknown" if self.state is None else self.state
        if self.reason:
            return f"({state_str} (reason: {self.reason})"
        return state_str

    @staticmethod
    def new_from_top_level_json(json_dict: Dict[str, any]) -> LifeCycleInfo:
        if "state" not in json_dict:
            return LifeCycleInfo(None, None)
        state = json_dict["state"]
        return LifeCycleInfo(state.get("lifecycleState", None), state.get("lifecycleReason", None))

    @staticmethod
    def new_unknown() -> LifeCycleInfo:
        return LifeCycleInfo(None, "not yet communicated with IPM")

class ConnectionDeserializationError(Exception):
    pass

@@ -91,6 +130,8 @@ class Connection:
                    ep_mod_aip = get_endpoint_mod_aid(ep)
                    if ep_mod_aip:
                        self.endpoints.append(CEndpoint(*ep_mod_aip, None, get_endpoint_capacity(ep), ep["href"]))

                self.life_cycle_info = LifeCycleInfo.new_from_top_level_json(from_json)
                self.cm_data = from_json
            except KeyError as e:
                raise ConnectionDeserializationError(f"Missing mandatory key {str(e)}") from e
@@ -113,6 +154,7 @@ class Connection:
                # String "none" has a special meaning for implicitTransportCapacity
                self.implicitTransportCapacity ="none"

            self.life_cycle_info = LifeCycleInfo.new_unknown()
            self.cm_data = None
        else:
            # May support other initializations in future
+4 −0
Original line number Diff line number Diff line
@@ -30,6 +30,8 @@ def test_connection_json():

        assert connection.name == "FooBar123"
        assert "name: FooBar123, id: /network-connections/4505d5d3-b2f3-40b8-8ec2-4a5b28523c03, service-mode: XR-L1, end-points: [(XR LEAF 1|XR-T1, 0), (XR HUB 1|XR-T1, 0)]" == str(connection)
        assert "configured" == str(connection.life_cycle_info)
        assert connection.life_cycle_info.is_terminal_state()

        config = connection.create_config()
        expected_config = {'name': 'FooBar123', 'serviceMode': 'XR-L1', 'implicitTransportCapacity': 'portMode', 'endpoints': [{'selector': {'moduleIfSelectorByModuleName': {'moduleName': 'XR LEAF 1', 'moduleClientIfAid': 'XR-T1'}}}, {'selector': {'moduleIfSelectorByModuleName': {'moduleName': 'XR HUB 1', 'moduleClientIfAid': 'XR-T1'}}}]}
@@ -94,6 +96,8 @@ def test_connection_from_service():
    # Port mode
    connection = Connection(from_tf_service=TFService("FooBar123", "XR LEAF 1|XR-T1", "XR HUB 1|XR-T1", 0))
    assert connection.create_config() == {'name': 'TF:FooBar123', 'serviceMode': 'XR-L1', 'implicitTransportCapacity': 'portMode', 'endpoints': [{'selector': {'moduleIfSelectorByModuleName': {'moduleName': 'XR LEAF 1', 'moduleClientIfAid': 'XR-T1'}}}, {'selector': {'moduleIfSelectorByModuleName': {'moduleName': 'XR HUB 1', 'moduleClientIfAid': 'XR-T1'}}}]}
    assert '(unknown (reason: not yet communicated with IPM)' == str(connection.life_cycle_info)
    assert connection.life_cycle_info.is_terminal_state()

    # VTI mode
    connection = Connection(from_tf_service=TFService("FooBar123", "XR LEAF 1|XR-T1.A", "XR HUB 1|XR-T1.100", 0))
Loading