Commit 741d929b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/xr_driver_ipm_error_subscription_and_reporting' into 'develop'

feat/xr_driver_ipm_error_subscription_and_reporting

See merge request !74
parents f8cbab70 0d158725
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ xmltodict==0.12.0
tabulate
ipaddress
macaddress
websockets==10.4

# pip's dependency resolver does not take into account installed packages.
# p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one
+1 −1
Original line number Diff line number Diff line
@@ -107,7 +107,7 @@ This will make imports to work properly in all cases.
Run the deploy script to build the Docker containers and then instantiate them on the configured K8s cluster. The deploy script must be sourced for this to work!

```bash
./deploy.sh
./deploy/all.sh
```

If protobuf definitions have changed, regenerate version controlled Java files manually
+2 −1
Original line number Diff line number Diff line
@@ -88,6 +88,7 @@ class XrDriver(_Driver):
    def Disconnect(self) -> bool:
        """Shut the driver down.

        While holding the driver lock, stops error monitoring on the CM
        connection and then sets the driver's terminate event.

        Returns:
            Always ``True``.
        """
        LOGGER.info(f"Disconnect[{self}]")
        with self.__lock:
            # Stop the CM error monitoring first, then signal termination.
            self.__cm_connection.stop_monitoring_errors()
            self.__terminate.set()
        return True

+120 −93
Original line number Diff line number Diff line
@@ -16,16 +16,22 @@

# Test program for CmConnection
import argparse
import signal
import logging
import traceback
import threading
from typing import Tuple
from cm.cm_connection import CmConnection, ConsistencyMode
from cm.cm_connection import CmConnection, ConsistencyMode, ErrorFromIpm
from cm.tf_service import TFService
from cm.transport_capacity import TransportCapacity
from cm.connection import Connection
import cm.tf as tf
import asyncio
import websockets
import ssl
import time

logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.WARNING)

parser = argparse.ArgumentParser(description='CM Connectin Test Utility')
parser.add_argument('ip', help='CM IP address or domain name')
@@ -33,6 +39,7 @@ parser.add_argument('port', help='CM port', type=int)
parser.add_argument('username', help='Username')
parser.add_argument('password', help='Password')

parser.add_argument('--monitor-errors', action='store_true')
parser.add_argument('--list-constellations', action='store_true')
parser.add_argument('--show-constellation-by-hub-name', nargs='?', type=str)
parser.add_argument('--create-connection', nargs='?', type=str, help="uuid;ifname;ifname;capacity")
@@ -84,6 +91,15 @@ else:
    retry_interval = 0.2

cm = CmConnection(args.ip, args.port, args.username, args.password, timeout=args.timeout, tls_verify=False, consistency_mode=consistency_mode, retry_interval=retry_interval)

terminate = threading.Event()
def signal_handler(sig, frame):
    """Signal handler: stop CM error monitoring and set the ``terminate`` event.

    ``sig`` and ``frame`` are the standard handler arguments and are not used.
    """
    cm.stop_monitoring_errors()
    terminate.set()

signal.signal(signal.SIGINT, signal_handler)

try:
    if not cm.Connect():
        exit(-1)

@@ -104,11 +120,14 @@ if args.show_constellation_by_hub_name:
    if args.create_connection:
        tf_service = cli_create_string_to_tf_service(args.create_connection)
        connection = Connection(from_tf_service=tf_service)
        try:
            created_service = cm.create_connection(connection)
            if created_service:
                print(f"Created {created_service} for {connection}")
            else:
                print(f"Failed to create {connection}")
        except ErrorFromIpm as ipm_err:
            print(f"Failed to create {connection}: {str(ipm_err)}")

    if args.modify_connection:
        href, tf_service = cli_modify_string_to_tf_service(args.modify_connection)
@@ -179,3 +198,11 @@ if args.emulate_tf_set_config_service:
        print(f"Emulated SetConfig() for service result: {result}")
        if isinstance(result, Exception):
            traceback.print_exception(result)

    if args.monitor_errors:
        cm.print_received_errors = True
        terminate.wait()

finally:
# Delete subscriptions. It will end monitoring thread and ensure that program terminates normally
    cm.stop_monitoring_errors()
+223 −19
Original line number Diff line number Diff line
@@ -15,9 +15,13 @@

from __future__ import annotations
import collections.abc
import threading
import logging
import json
import time
import asyncio
import websockets
import ssl
from typing import Optional, List, Dict, Union
import re
import requests
@@ -51,6 +55,55 @@ class ExpiringValue:
class UnexpectedEmptyBody(Exception):
    pass

class ExternalError(Exception):
    """Failure reported for a request made to the external system.

    Raised by ``HttpResult.raise_as_exception()`` and used as the base class
    of ``ErrorFromIpm``.
    """

class ApiErrorFromIpm(Exception):
    """Error message returned by IPM in the body of a failed REST request.

    Note that this derives directly from ``Exception``, not from
    ``ExternalError``.
    """

class ErrorFromIpm(ExternalError):
    """Error reported by IPM through the error event stream.

    The exception message is taken from the English (``"lang": "en"``)
    message of the first error entry when ``err_dict`` has the shape
    ``{"errors": {"errors": [{"messages": [{"lang": ..., "message": ...}]}]}}``.
    If several English messages are present the last one is used. For any
    other payload shape the message falls back to ``str(err_dict)``.
    """

    def __init__(self, err_dict):
        msg = str(err_dict)
        # Try to extract a short error message
        try:
            # Only look at first message
            err_messages = err_dict["errors"]["errors"][0]["messages"]
            for err_msg in err_messages:
                if err_msg["lang"] == "en":
                    msg = err_msg["message"]
        except (KeyError, IndexError, TypeError):
            # TypeError covers payloads whose nesting differs from the expected
            # one (e.g. "errors" being a list, or a message entry that is not
            # a dict). Building the exception must never fail itself, so keep
            # whatever message has been determined so far.
            pass
        super().__init__(msg)

class CreateConsistencyError(Exception):
    """Raised when create consistency could not be applied for an object.

    Used when the created REST object did not appear or did not make
    sufficient life-cycle-state progress.
    """

class ErrorStore:
    """Lock-protected store of error reports keyed by URI.

    Reports are only recorded while the store is enabled. Reading a report
    removes it, and disabling the store discards everything still pending.
    Every method takes the internal lock for the duration of its work.
    """

    def __init__(self):
        self.__enabled = False
        self.__pending = {}
        self.__lock = threading.Lock()

    def enable(self):
        """Start recording reports passed to set_error()."""
        with self.__lock:
            self.__enabled = True

    def disable(self):
        """Stop recording and drop all reports that were not fetched."""
        with self.__lock:
            self.__pending.clear()
            self.__enabled = False

    def set_error(self, uri: str, err_dict: dict):
        """Record err_dict for uri, replacing any earlier report; no-op while disabled."""
        with self.__lock:
            if not self.__enabled:
                return
            self.__pending[uri] = err_dict

    def get_error(self, uri: str) -> Optional[dict]:
        """Remove and return the report stored for uri, or None if there is none."""
        with self.__lock:
            try:
                return self.__pending.pop(uri)
            except KeyError:
                return None

# This is enum, not a regular class, see https://docs.python.org/3/library/enum.html
# String based enums require python 3.11, so use number based and custom parser
class ConsistencyMode(Enum):
@@ -134,10 +187,25 @@ class HttpResult:

        return True

    def raise_as_exception(self):
        """Raise an exception describing this request result; never returns.

        Raises:
            ExternalError: chained from ``self.exception`` when the request
                itself raised; otherwise carrying ``str(self)`` when no IPM
                error message can be extracted from the response body.
            ApiErrorFromIpm: when the JSON body is an object whose ``"errors"``
                value is a non-empty list and the first entry has a
                ``"message"``.
        """
        if self.exception is not None:
            raise ExternalError(f"Failure for request {str(self)}") from self.exception

        status_code = self.status_code if self.status_code is not None else "<not executed>"

        # Try to get error message from IPM. The body comes from the server, so
        # check its shape explicitly: an unexpected structure must fall through
        # to the generic ExternalError below instead of raising TypeError/KeyError.
        body = self.json
        if isinstance(body, dict):
            err_list = body.get("errors")
            if isinstance(err_list, list) and err_list:
                first_err = err_list[0]
                if isinstance(first_err, dict) and "message" in first_err:
                    err_msg = first_err["message"]
                    raise ApiErrorFromIpm(f"{self.method} {self.url} {self.params},  status {status_code}, IPM reported error: {err_msg}")

        raise ExternalError(str(self))

class CmConnection:
    CONSISTENCY_WAIT_LOG_INTERVAL = 1.0

    def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True, consistency_mode: ConsistencyMode = ConsistencyMode.asynchronous, retry_interval: float=0.2, max_consistency_tries:int = 100_000) -> None:
    def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True, consistency_mode: ConsistencyMode = ConsistencyMode.asynchronous, retry_interval: float=0.2, max_consistency_tries:int = 100_000, monitor_error_stream: bool = True) -> None:
        self.__tls_verify = tls_verify
        if not tls_verify:
            urllib3.disable_warnings()
@@ -151,7 +219,18 @@ class CmConnection:
        self.__username = username
        self.__password = password
        self.__cm_root = 'https://' + address + ':' + str(port)
        self.__cm_ws_root = 'wss://' + address + ':' + str(port)
        self.__access_token = None
        self.__monitor_error_stream = monitor_error_stream
        self.__err_store=ErrorStore()
        self.__err_monitor_thread = None
        self.__err_monitor_connected = threading.Event()
        self.__err_monitor_sub_id = None
        self.__err_monitor_terminate = threading.Event()
        self.print_received_errors = False

    def __del__(self):
        # Finalizer: delegate to stop_monitoring_errors() so that the error
        # subscription is removed and the monitoring thread is joined when
        # the connection object is destroyed.
        self.stop_monitoring_errors()

    def __perform_request(self, http_result: HttpResult, permit_empty_body: bool,  fn, *args, **kwargs):
        try:
@@ -238,7 +317,121 @@ class CmConnection:
            self.__acquire_access_token()

    def Connect(self) -> bool:
        return self.__acquire_access_token()
        if not self.__acquire_access_token():
            return False
        return self.monitor_errors() if self.__monitor_error_stream else True

    def subscribe_errors(self):
        """Create an "Error" event subscription for network connections.

        Posts a single subscription request for resources of type
        ``cm.network-connection``.

        Returns:
            A ``(websocket_uri, subscription_id)`` tuple built from the one
            entry of the response, or ``(None, None)`` when the response is
            not a valid single-entry JSON list with status 201 or lacks the
            expected keys.
        """
        resource_filter = {
            "requestedNotificationTypes": [ "Error" ],
            "requestedResources": [
                {"resourceType": "cm.network-connection"},
            ],
        }
        subscription = {
            "subscriptionName": "TfXrDriverErrorMonitopr",
            "subscriptionFilters": [resource_filter],
        }

        resp = self.__post("/api/v1/subscriptions/events", [subscription])
        if not resp.is_valid_json_list_with_status(201):
            return None, None
        if len(resp.json) != 1:
            return None, None

        try:
            entry = resp.json[0]
            # The stream address is relative; prefix it with the wss:// root.
            stream_uri = self.__cm_ws_root + entry["notificationChannel"]["streamAddress"]
            subscription_id = entry["subscriptionId"]
        except KeyError:
            return None, None
        return stream_uri, subscription_id

    def unsubscribe(self, sub_id: str):
        """Delete the event subscription identified by sub_id.

        Returns:
            True when the delete request completed with status 202,
            False otherwise. The outcome is logged in both cases.
        """
        resp = self.__delete(f"/api/v1/subscriptions/events/{sub_id}")
        if not resp.is_valid_with_status_ignore_body(202):
            LOGGER.info(f"Deleting subscription {sub_id=} failed, status {resp.status_code}")
            return False

        LOGGER.info(f"Deleted subscription {sub_id=}")
        return True

    def monitor_errors(self) -> bool:
        """Subscribe to IPM error events and start the listener thread.

        A thread is started that runs its own asyncio event loop, connects to
        the subscription's websocket and stores every received JSON message in
        the error store under the message's ``href``. The thread reconnects
        after connection failures until stop_monitoring_errors() is called.

        Returns:
            True when the subscription was created and the thread started,
            False when the subscription could not be created.

        Raises:
            RuntimeError: if monitoring has already been started and not yet
                stopped with stop_monitoring_errors().
        """
        # Check before subscribing: creating a second subscription would
        # overwrite the stored subscription id and leak the first one.
        if self.__err_monitor_thread is not None:
            raise RuntimeError("Error monitoring is already running, call stop_monitoring_errors() first")

        uri, sub_id = self.subscribe_errors()
        if not uri or not sub_id:
            return False
        self.__err_monitor_sub_id = sub_id

        def err_monitor_thread():
            LOGGER.info(f"Listening errors via {uri}")

            ctx = ssl.create_default_context()
            if not self.__tls_verify:
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE

            async def receive_websock(uri, ssl_ctx):
                while not self.__err_monitor_terminate.is_set():
                    try:
                        async with websockets.connect(uri, ssl=ssl_ctx) as websocket:
                            LOGGER.info(f"err_monitor_thread(): WebSock connected to {uri}")
                            self.__err_monitor_connected.set()
                            while not self.__err_monitor_terminate.is_set():
                                # 5s timeout is used for forced checking of terminate flag.
                                # In normal termination the timeout is not experienced, as
                                # we unsubscribe and that will trigger server to close the
                                # connection. This timeout exists purely as backup
                                # in case unsubscribe fails
                                try:
                                    msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
                                except asyncio.TimeoutError:
                                    continue
                                if self.print_received_errors:
                                    print(f"RX: {msg}")
                                try:
                                    msg_json = json.loads(msg)
                                    href = msg_json["href"]
                                    LOGGER.debug(f"err_monitor_thread(): RX [{href}]: {msg}")
                                    self.__err_store.set_error(href, msg_json)
                                except json.JSONDecodeError as json_err:
                                    LOGGER.error(f"err_monitor_thread(): Invalid message received: {msg}, JSON decode error {str(json_err)}")
                                except KeyError:
                                    LOGGER.error(f"err_monitor_thread(): Missing href in message: {msg}")
                                except TypeError:
                                    # Valid JSON that is not an object (or an unhashable href):
                                    # drop the message, but keep the connection open instead of
                                    # falling into the reconnect handler below.
                                    LOGGER.error(f"err_monitor_thread(): Unexpected message structure: {msg}")
                    except asyncio.CancelledError:
                        LOGGER.debug("err_monitor_thread(): monitoring cancelled")
                        raise
                    except Exception as e:
                        if not self.__err_monitor_terminate.is_set():
                            LOGGER.error(f"err_monitor_thread(): exception {str(e)}, reconnecting")
                            # Back off before reconnecting without blocking the event loop
                            await asyncio.sleep(1)

            asyncio.run(receive_websock(uri, ctx))
            LOGGER.debug("err_monitor_thread(): thread terminating")

        self.__err_monitor_terminate.clear()
        # Reset the flag so that the wait below reflects this run's connection
        # and not one left set by an earlier monitoring session.
        self.__err_monitor_connected.clear()
        self.__err_monitor_thread = threading.Thread(target=err_monitor_thread)
        self.__err_monitor_thread.start()
        # If we can get connection soon, wait for it, otherwise proceed without delay
        # Not waiting for connection may miss some errors (-->timeout later), waiting too long
        # makes for bad experience
        self.__err_monitor_connected.wait(0.5)

        return True

    def stop_monitoring_errors(self):
        """Stop error monitoring started by monitor_errors().

        Sets the terminate flag, deletes the error subscription if one is
        recorded and joins the monitoring thread if one exists. Calling this
        when nothing is being monitored only sets the terminate flag.
        """
        self.__err_monitor_terminate.set()

        active_sub = self.__err_monitor_sub_id
        if active_sub:
            LOGGER.debug(f"Disabling error subscribtion {active_sub}")
            self.unsubscribe(active_sub)
            self.__err_monitor_sub_id = None

        worker = self.__err_monitor_thread
        if worker is None:
            return

        LOGGER.debug("Terminating error monitoring thread")
        worker.join()
        LOGGER.info("Error monitoring thread terminated")
        self.__err_monitor_thread = None

    def list_constellations(self) -> List[Constellation]:
        r = self.__get("/api/v1/xr-networks?content=expanded")
@@ -246,7 +439,6 @@ class CmConnection:
            return []
        return [Constellation(c) for c in r.json]


    def get_constellation_by_hub_name(self, hub_module_name: str) -> Optional[Constellation]:
        qparams = [
            ('content', 'expanded'),
@@ -324,6 +516,11 @@ class CmConnection:
                        log_ts = ts
                        LOGGER.info(f"apply_create_consistency(): waiting for life cycle state progress for {get_result}, current: {str(get_result.life_cycle_info)}, ellapsed time {ts-ts_start} seconds")
            else:
                err_info = self.__err_store.get_error(obj.href)
                if err_info is not None:
                    LOGGER.info(f"apply_create_consistency(): asynchronous error reported for {obj}: {str(err_info)}")
                    raise ErrorFromIpm(err_info)

                ts = time.perf_counter()
                if ts - log_ts >= self.CONSISTENCY_WAIT_LOG_INTERVAL:
                    log_ts = ts
@@ -337,10 +534,13 @@ class CmConnection:
        duration = time.perf_counter() - ts_start
        if not valid:
            if get_result:
                LOGGER.info(f"Failed to apply create consistency for {get_result}, insufficient life-cycle-state progress ({str(get_result.life_cycle_info)}), duration {duration} seconds")
                msg = f"Failed to apply create consistency for {get_result}, insufficient life-cycle-state progress ({str(get_result.life_cycle_info)}), duration {duration} seconds"
                LOGGER.info(msg)
                raise CreateConsistencyError(msg)
            else:
                LOGGER.info(f"Failed to apply create consistency for {obj}, REST object did not appear, duration {duration} seconds")
            return None
                msg = f"Failed to apply create consistency for {obj}, REST object did not appear, duration {duration} seconds"
                LOGGER.info(msg)
                raise CreateConsistencyError(msg)
        else:
            LOGGER.info(f"Applied create consistency for {get_result}, final life-cycle-state {str(get_result.life_cycle_info)}, duration {duration} seconds")

@@ -399,6 +599,8 @@ class CmConnection:
        # Create wants a list, so wrap connection to list
        cfg = [connection.create_config()]

        self.__err_store.enable()
        try:
            resp = self.__post("/api/v1/network-connections", cfg)
            if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]:
                connection.href = resp.json[0]["href"]
@@ -412,7 +614,9 @@ class CmConnection:
                    return None
            else:
                LOGGER.error(f"Create failure for connection {connection}, result {resp}")
            return None
                resp.raise_as_exception()
        finally:
            self.__err_store.disable()

    def update_connection(self, href: str, connection: Connection, existing_connection: Optional[Connection]=None) -> Optional[str]:
        cfg = connection.create_config()
Loading