From d3131c44cd341006b2a2a91acca5346164153379 Mon Sep 17 00:00:00 2001 From: Ville Hallivuori Date: Wed, 1 Mar 2023 16:01:11 +0200 Subject: [PATCH] Improved XR Device Driver IPM error reporting --- src/device/requirements.in | 1 + src/device/service/drivers/xr/README_XR.md | 2 +- src/device/service/drivers/xr/XrDriver.py | 3 +- src/device/service/drivers/xr/cm-cli.py | 213 ++++++++------- .../service/drivers/xr/cm/cm_connection.py | 242 ++++++++++++++++-- .../drivers/xr/cm/tests/test_cm_connection.py | 10 +- .../xr/cm/tests/test_xr_service_set_config.py | 25 +- src/device/service/drivers/xr/cm/tf.py | 20 +- 8 files changed, 378 insertions(+), 138 deletions(-) diff --git a/src/device/requirements.in b/src/device/requirements.in index ec29fc7a3..50b941160 100644 --- a/src/device/requirements.in +++ b/src/device/requirements.in @@ -29,6 +29,7 @@ xmltodict==0.12.0 tabulate ipaddress macaddress +websockets==10.4 # pip's dependency resolver does not take into account installed packages. # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one diff --git a/src/device/service/drivers/xr/README_XR.md b/src/device/service/drivers/xr/README_XR.md index fa1bc9440..9c64cdef1 100644 --- a/src/device/service/drivers/xr/README_XR.md +++ b/src/device/service/drivers/xr/README_XR.md @@ -107,7 +107,7 @@ This will make imports to work properly in all cases. Run deploy script to build in docker containers and then instantiate to configured K8s cluster. Deploy script must be sources for this to work! ```bash -./deploy.sh +./deploy/all.sh ``` If protobuf definitions have changed, regenerate version controlled Java files manually diff --git a/src/device/service/drivers/xr/XrDriver.py b/src/device/service/drivers/xr/XrDriver.py index 605f4ce8d..3114bcc2c 100644 --- a/src/device/service/drivers/xr/XrDriver.py +++ b/src/device/service/drivers/xr/XrDriver.py @@ -45,7 +45,7 @@ class XrDriver(_Driver): tls_verify = False # Currently using self signed certificates username = settings.get("username", "xr-user-1") password = settings.get("password", "xr-user-1") - + # Options are: # asynchronous --> operation considered complete when IPM responds with suitable status code, # including "accepted", that only means request is semantically good and queued. @@ -77,6 +77,7 @@ class XrDriver(_Driver): def Disconnect(self) -> bool: LOGGER.info(f"Disconnect[{self}]") with self.__lock: + self.__cm_connection.stop_monitoring_errors() self.__terminate.set() return True diff --git a/src/device/service/drivers/xr/cm-cli.py b/src/device/service/drivers/xr/cm-cli.py index 924ca0c96..6e9dc8387 100755 --- a/src/device/service/drivers/xr/cm-cli.py +++ b/src/device/service/drivers/xr/cm-cli.py @@ -16,16 +16,22 @@ # Test program for CmConnection import argparse +import signal import logging import traceback +import threading from typing import Tuple -from cm.cm_connection import CmConnection, ConsistencyMode +from cm.cm_connection import CmConnection, ConsistencyMode, ErrorFromIpm from cm.tf_service import TFService from cm.transport_capacity import TransportCapacity from cm.connection import Connection import cm.tf as tf +import asyncio +import websockets +import ssl +import time -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.WARNING) parser = argparse.ArgumentParser(description='CM Connectin Test Utility') parser.add_argument('ip', help='CM IP address or domain name') @@ -33,6 +39,7 @@ parser.add_argument('port', help='CM port', type=int) parser.add_argument('username', help='Username') parser.add_argument('password', help='Password') +parser.add_argument('--monitor-errors', action='store_true') parser.add_argument('--list-constellations', action='store_true') parser.add_argument('--show-constellation-by-hub-name', nargs='?', type=str) parser.add_argument('--create-connection', nargs='?', type=str, help="uuid;ifname;ifname;capacity") @@ -84,98 +91,118 @@ else: retry_interval = 0.2 cm = CmConnection(args.ip, args.port, args.username, args.password, timeout=args.timeout, tls_verify=False, consistency_mode=consistency_mode, retry_interval=retry_interval) -if not cm.Connect(): - exit(-1) - -if args.list_constellations: - constellations = cm.list_constellations() - for constellation in constellations: - print("Constellation:", constellation.constellation_id) - for if_name in constellation.ifnames(): - print(f" {if_name}") - -if args.show_constellation_by_hub_name: - constellation = cm.get_constellation_by_hub_name(args.show_constellation_by_hub_name) - if constellation: - print(f"Constellation: {constellation.constellation_id}, traffic-mode: {constellation.traffic_mode}") - for if_name in constellation.ifnames(): - print(f" {if_name}") - -if args.create_connection: - tf_service = cli_create_string_to_tf_service(args.create_connection) - connection = Connection(from_tf_service=tf_service) - created_service = cm.create_connection(connection) - if created_service: - print(f"Created {created_service} for {connection}") - else: - print(f"Failed to create {connection}") - -if args.modify_connection: - href, tf_service = cli_modify_string_to_tf_service(args.modify_connection) - mc_args = args.modify_connection.split(";") - connection = Connection(from_tf_service=tf_service) - result = cm.update_connection(href, connection) - if result: - print(f"Updated {href} for {connection}") - else: - print(f"Failed to update {href} for {connection}") - -if args.show_connection_by_name: - connection = cm.get_connection_by_name(args.show_connection_by_name) - if connection: - print(str(connection)) - -if args.list_connections: - connections = cm.get_connections() - for c in connections: - print(str(c)) - -if args.delete_connection: - was_deleted = cm.delete_connection(args.delete_connection) - if was_deleted: - print(f"Successfully deleted {args.delete_connection}") - else: - print(f"Failed to delete {args.delete_connection}") - -if args.list_transport_capacities: - tcs = cm.get_transport_capacities() - for tc in tcs: - print(str(tc)) - -if args.create_transport_capacity: - tf_service = cli_create_string_to_tf_service(args.create_transport_capacity) - tc = TransportCapacity(from_tf_service=tf_service) - created_service = cm.create_transport_capacity(tc) - if created_service: - print(f"Created {created_service} for {tc}") - else: - print(f"Failed to create {tc}") - -if args.emulate_tf_set_config_service: - eargs = args.emulate_tf_set_config_service.split(";") - if len(eargs) < 5: - print("Mandatory tokens missing for --emulate-tf-set-config-service") - exit(-1) - hub_module_name, uuid, input_sip, output_sip, capacity_value = eargs[0:5] - capacity_value = int(capacity_value) - config = { - "input_sip": input_sip, - "output_sip": output_sip, - "capacity_value": capacity_value, - "capacity_unit": "gigabit" - } +terminate = threading.Event() +def signal_handler(sig, frame): + cm.stop_monitoring_errors() + terminate.set() - constellation = cm.get_constellation_by_hub_name(hub_module_name) +signal.signal(signal.SIGINT, signal_handler) - # Allow testing some of the VTI code before we have CM that has VTI - if len(eargs) > 5 and eargs[5] == "FORCE-VTI-ON": - constellation.traffic_mode = "VTIMode" - - if constellation is None: - print(f"Unable to find constellation for hub-module {hub_module_name}") +try: + if not cm.Connect(): exit(-1) - result = tf.set_config_for_service(cm, constellation, uuid, config) - print(f"Emulated SetConfig() for service result: {result}") - if isinstance(result, Exception): - traceback.print_exception(result) + + if args.list_constellations: + constellations = cm.list_constellations() + for constellation in constellations: + print("Constellation:", constellation.constellation_id) + for if_name in constellation.ifnames(): + print(f" {if_name}") + + if args.show_constellation_by_hub_name: + constellation = cm.get_constellation_by_hub_name(args.show_constellation_by_hub_name) + if constellation: + print(f"Constellation: {constellation.constellation_id}, traffic-mode: {constellation.traffic_mode}") + for if_name in constellation.ifnames(): + print(f" {if_name}") + + if args.create_connection: + tf_service = cli_create_string_to_tf_service(args.create_connection) + connection = Connection(from_tf_service=tf_service) + try: + created_service = cm.create_connection(connection) + if created_service: + print(f"Created {created_service} for {connection}") + else: + print(f"Failed to create {connection}") + except ErrorFromIpm as ipm_err: + print(f"Failed to create {connection}: {str(ipm_err)}") + + if args.modify_connection: + href, tf_service = cli_modify_string_to_tf_service(args.modify_connection) + mc_args = args.modify_connection.split(";") + connection = Connection(from_tf_service=tf_service) + result = cm.update_connection(href, connection) + if result: + print(f"Updated {href} for {connection}") + else: + print(f"Failed to update {href} for {connection}") + + if args.show_connection_by_name: + connection = cm.get_connection_by_name(args.show_connection_by_name) + if connection: + print(str(connection)) + + if args.list_connections: + connections = cm.get_connections() + for c in connections: + print(str(c)) + + if args.delete_connection: + was_deleted = cm.delete_connection(args.delete_connection) + if was_deleted: + print(f"Successfully deleted {args.delete_connection}") + else: + print(f"Failed to delete {args.delete_connection}") + + if args.list_transport_capacities: + tcs = cm.get_transport_capacities() + for tc in tcs: + print(str(tc)) + + if args.create_transport_capacity: + tf_service = cli_create_string_to_tf_service(args.create_transport_capacity) + tc = TransportCapacity(from_tf_service=tf_service) + created_service = cm.create_transport_capacity(tc) + if created_service: + print(f"Created {created_service} for {tc}") + else: + print(f"Failed to create {tc}") + + if args.emulate_tf_set_config_service: + eargs = args.emulate_tf_set_config_service.split(";") + if len(eargs) < 5: + print("Mandatory tokens missing for --emulate-tf-set-config-service") + exit(-1) + + hub_module_name, uuid, input_sip, output_sip, capacity_value = eargs[0:5] + capacity_value = int(capacity_value) + config = { + "input_sip": input_sip, + "output_sip": output_sip, + "capacity_value": capacity_value, + "capacity_unit": "gigabit" + } + + constellation = cm.get_constellation_by_hub_name(hub_module_name) + + # Allow testing some of the VTI code before we have CM that has VTI + if len(eargs) > 5 and eargs[5] == "FORCE-VTI-ON": + constellation.traffic_mode = "VTIMode" + + if constellation is None: + print(f"Unable to find constellation for hub-module {hub_module_name}") + exit(-1) + result = tf.set_config_for_service(cm, constellation, uuid, config) + print(f"Emulated SetConfig() for service result: {result}") + if isinstance(result, Exception): + traceback.print_exception(result) + + if args.monitor_errors: + cm.print_received_errors = True + terminate.wait() + +finally: +# Delete subscriptions. It will end monitoring thread and ensure that program terminates normally + cm.stop_monitoring_errors() diff --git a/src/device/service/drivers/xr/cm/cm_connection.py b/src/device/service/drivers/xr/cm/cm_connection.py index 712849451..bcd62862d 100644 --- a/src/device/service/drivers/xr/cm/cm_connection.py +++ b/src/device/service/drivers/xr/cm/cm_connection.py @@ -15,9 +15,13 @@ from __future__ import annotations import collections.abc +import threading import logging import json import time +import asyncio +import websockets +import ssl from typing import Optional, List, Dict, Union import re import requests @@ -51,6 +55,55 @@ class ExpiringValue: class UnexpectedEmptyBody(Exception): pass +class ExternalError(Exception): + pass + +class ApiErrorFromIpm(Exception): + pass + +class ErrorFromIpm(ExternalError): + def __init__(self, err_dict): + msg = str(err_dict) + # Try to extract a short error message + try: + # Only look at first message + err_messages = err_dict["errors"]["errors"][0]["messages"] + for err_msg in err_messages: + if err_msg["lang"] == "en": + msg = err_msg["message"] + except KeyError: + pass + except IndexError: + pass + super().__init__(msg) + +class CreateConsistencyError(Exception): + pass + +class ErrorStore: + def __init__(self): + self.__lock = threading.Lock() + self.__db={} + self.__enabled=False + + def get_error(self, uri: str) -> Optional[dict]: + with self.__lock: + return self.__db.pop(uri, None) + + def set_error(self, uri: str, err_dict: dict): + with self.__lock: + if self.__enabled: + self.__db[uri] = err_dict + + def enable(self): + with self.__lock: + self.__enabled = True + + def disable(self): + with self.__lock: + self.__enabled = False + self.__db.clear() + # This is enum, not a regular class, see https://docs.python.org/3/library/enum.html # String based enums require python 3.11, so use nunber based and custom parser class ConsistencyMode(Enum): @@ -134,10 +187,25 @@ class HttpResult: return True + def raise_as_exception(self): + if self.exception is not None: + raise ExternalError(f"Failure for request {str(self)}") from self.exception + + status_code = self.status_code if self.status_code is not None else "" + + # Try to get error message from IPM + if self.json is not None and "errors" in self.json: + err_list = self.json["errors"] + if len(err_list) > 0 and "message" in err_list[0]: + err_msg = err_list[0]["message"] + raise ApiErrorFromIpm(f"{self.method} {self.url} {self.params}, status {status_code}, IPM reported error: {err_msg}") + + raise ExternalError(str(self)) + class CmConnection: CONSISTENCY_WAIT_LOG_INTERVAL = 1.0 - def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True, consistency_mode: ConsistencyMode = ConsistencyMode.asynchronous, retry_interval: float=0.2, max_consistency_tries:int = 100_000) -> None: + def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True, consistency_mode: ConsistencyMode = ConsistencyMode.asynchronous, retry_interval: float=0.2, max_consistency_tries:int = 100_000, monitor_error_stream: bool = True) -> None: self.__tls_verify = tls_verify if not tls_verify: urllib3.disable_warnings() @@ -151,7 +219,18 @@ class CmConnection: self.__username = username self.__password = password self.__cm_root = 'https://' + address + ':' + str(port) + self.__cm_ws_root = 'wss://' + address + ':' + str(port) self.__access_token = None + self.__monitor_error_stream = monitor_error_stream + self.__err_store=ErrorStore() + self.__err_monitor_thread = None + self.__err_monitor_connected = threading.Event() + self.__err_monitor_sub_id = None + self.__err_monitor_terminate = threading.Event() + self.print_received_errors = False + + def __del__(self): + self.stop_monitoring_errors() def __perform_request(self, http_result: HttpResult, permit_empty_body: bool, fn, *args, **kwargs): try: @@ -238,7 +317,121 @@ class CmConnection: self.__acquire_access_token() def Connect(self) -> bool: - return self.__acquire_access_token() + if not self.__acquire_access_token(): + return False + return self.monitor_errors() if self.__monitor_error_stream else True + + def subscribe_errors(self): + sub = [ + { + "subscriptionName": "TfXrDriverErrorMonitopr", + "subscriptionFilters": [ + { + "requestedNotificationTypes": [ "Error" ], + "requestedResources": [ + { + "resourceType": "cm.network-connection", + } + ] + }, + ] + } + ] + + r = self.__post("/api/v1/subscriptions/events", sub) + #print(r.status_code, r.text) + if not r.is_valid_json_list_with_status(201) or len(r.json) != 1: + return None, None + try: + return self.__cm_ws_root + r.json[0]["notificationChannel"]["streamAddress"], r.json[0]["subscriptionId"] + except KeyError: + return None, None + + def unsubscribe(self, sub_id: str): + resp = self.__delete(f"/api/v1/subscriptions/events/{sub_id}") + if resp.is_valid_with_status_ignore_body(202): + LOGGER.info(f"Deleted subscription {sub_id=}") + return True + else: + LOGGER.info(f"Deleting subscription {sub_id=} failed, status {resp.status_code}") + return False + + def monitor_errors(self) -> bool: + uri, sub_id = self.subscribe_errors() + if not uri or not sub_id: + return False + self.__err_monitor_sub_id = sub_id + + def err_monitor_thread(): + LOGGER.info(f"Listening errors via {uri}") + + ctx = ssl.create_default_context() + if not self.__tls_verify: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + async def receive_websock(uri, ssl_ctx): + while not self.__err_monitor_terminate.is_set(): + try: + async with websockets.connect(uri, ssl=ssl_ctx) as websocket: + LOGGER.info(f"err_monitor_thread(): WebSock connected to {uri}") + self.__err_monitor_connected.set() + while not self.__err_monitor_terminate.is_set(): + # 5s timeout is used for forced checking of terminate flag + # In normal termmination timeout is not experienced, as + # we unsubscribe and that will trigger server to close the + # connection. This timeout exists purely as backup + # in case unsubscribe fails + try: + msg = await asyncio.wait_for(websocket.recv(), timeout=5.0) + except asyncio.exceptions.TimeoutError: + continue + if self.print_received_errors: + print(f"RX: {msg}") + try: + msg_json = json.loads(msg) + href = msg_json["href"] + LOGGER.debug(f"err_monitor_thread(): RX [{href}]: {msg}") + self.__err_store.set_error(href, msg_json) + except json.JSONDecodeError as json_err: + LOGGER.error(f"err_monitor_thread(): Invalid message received: {msg}, JSON decode error {str(json_err)}") + except KeyError: + LOGGER.error(f"err_monitor_thread(): Missing href in message: {msg}") + except asyncio.CancelledError as e: + LOGGER.debug("err_monitor_thread(): monitoring cancelled") + raise e + except Exception as e: + if not self.__err_monitor_terminate.is_set(): + LOGGER.error(f"err_monitor_thread(): exception {str(e)}, reconnecting") + time.sleep(1) + + asyncio.run(receive_websock(uri, ctx)) + LOGGER.debug("err_monitor_thread(): thread terminating") + + assert self.__err_monitor_thread is None + self.__err_monitor_terminate.clear() + self.__err_monitor_thread = threading.Thread(target=err_monitor_thread) + self.__err_monitor_thread.start() + # If we can get connection soon, wait for it, otherwise proceed without delay + # Not waiting for connection may miss some errors (-->timeout later), waiting too long + # makes for bad experience + self.__err_monitor_connected.wait(0.5) + + return True + + def stop_monitoring_errors(self): + self.__err_monitor_terminate.set() + + if self.__err_monitor_sub_id: + LOGGER.debug(f"Disabling error subscribtion {self.__err_monitor_sub_id }") + self.unsubscribe(self.__err_monitor_sub_id) + self.__err_monitor_sub_id = None + + if self.__err_monitor_thread is not None: + LOGGER.debug("Terminating error monitoring thread") + self.__err_monitor_thread.join() + LOGGER.info("Error monitoring thread terminated") + self.__err_monitor_thread = None def list_constellations(self) -> List[Constellation]: r = self.__get("/api/v1/xr-networks?content=expanded") @@ -246,7 +439,6 @@ class CmConnection: return [] return [Constellation(c) for c in r.json] - def get_constellation_by_hub_name(self, hub_module_name: str) -> Optional[Constellation]: qparams = [ ('content', 'expanded'), @@ -324,6 +516,11 @@ class CmConnection: log_ts = ts LOGGER.info(f"apply_create_consistency(): waiting for life cycle state progress for {get_result}, current: {str(get_result.life_cycle_info)}, ellapsed time {ts-ts_start} seconds") else: + err_info = self.__err_store.get_error(obj.href) + if err_info is not None: + LOGGER.info(f"apply_create_consistency(): asynchronous error reported for {obj}: {str(err_info)}") + raise ErrorFromIpm(err_info) + ts = time.perf_counter() if ts - log_ts >= self.CONSISTENCY_WAIT_LOG_INTERVAL: log_ts = ts @@ -337,10 +534,13 @@ class CmConnection: duration = time.perf_counter() - ts_start if not valid: if get_result: - LOGGER.info(f"Failed to apply create consistency for {get_result}, insufficient life-cycle-state progress ({str(get_result.life_cycle_info)}), duration {duration} seconds") + msg = f"Failed to apply create consistency for {get_result}, insufficient life-cycle-state progress ({str(get_result.life_cycle_info)}), duration {duration} seconds" + LOGGER.info(msg) + raise CreateConsistencyError(msg) else: - LOGGER.info(f"Failed to apply create consistency for {obj}, REST object did not appear, duration {duration} seconds") - return None + msg = f"Failed to apply create consistency for {obj}, REST object did not appear, duration {duration} seconds" + LOGGER.info(msg) + raise CreateConsistencyError(msg) else: LOGGER.info(f"Applied create consistency for {get_result}, final life-cycle-state {str(get_result.life_cycle_info)}, duration {duration} seconds") @@ -399,20 +599,24 @@ class CmConnection: # Create wants a list, so wrap connection to list cfg = [connection.create_config()] - resp = self.__post("/api/v1/network-connections", cfg) - if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]: - connection.href = resp.json[0]["href"] - LOGGER.info(f"IPM accepted create request for connection {connection}") - new_connection = self.apply_create_consistency(connection, lambda: self.get_connection_by_href(connection.href)) - if new_connection: - LOGGER.info(f"Created connection {new_connection}") - return new_connection.href + self.__err_store.enable() + try: + resp = self.__post("/api/v1/network-connections", cfg) + if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]: + connection.href = resp.json[0]["href"] + LOGGER.info(f"IPM accepted create request for connection {connection}") + new_connection = self.apply_create_consistency(connection, lambda: self.get_connection_by_href(connection.href)) + if new_connection: + LOGGER.info(f"Created connection {new_connection}") + return new_connection.href + else: + LOGGER.error(f"Consistency failure for connection {connection}, result {resp}") + return None else: - LOGGER.error(f"Consistency failure for connection {connection}, result {resp}") - return None - else: - LOGGER.error(f"Create failure for connection {connection}, result {resp}") - return None + LOGGER.error(f"Create failure for connection {connection}, result {resp}") + resp.raise_as_exception() + finally: + self.__err_store.disable() def update_connection(self, href: str, connection: Connection, existing_connection: Optional[Connection]=None) -> Optional[str]: cfg = connection.create_config() diff --git a/src/device/service/drivers/xr/cm/tests/test_cm_connection.py b/src/device/service/drivers/xr/cm/tests/test_cm_connection.py index a7944ed22..22b74f36a 100644 --- a/src/device/service/drivers/xr/cm/tests/test_cm_connection.py +++ b/src/device/service/drivers/xr/cm/tests/test_cm_connection.py @@ -37,30 +37,30 @@ def test_cmc_connect(): # Valid access token with requests_mock.Mocker() as m: m.post('https://127.0.0.1:9999/realms/xr-cm/protocol/openid-connect/token', text=access_token) - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert cm.Connect() # Valid JSON but no access token with requests_mock.Mocker() as m: m.post('https://127.0.0.1:9999/realms/xr-cm/protocol/openid-connect/token', text=r'{"a": "b"}') - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert not cm.Connect() # Invalid JSON with requests_mock.Mocker() as m: m.post('https://127.0.0.1:9999/realms/xr-cm/protocol/openid-connect/token', text=r'}}}') - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert not cm.Connect() with requests_mock.Mocker() as m: # No mock present for the destination - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert not cm.Connect() def test_cmc_get_constellations(): with mock_cm_connectivity() as m: m.get("https://127.0.0.1:9999/api/v1/xr-networks?content=expanded", text=res_constellations) - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert cm.Connect() # List all constellations diff --git a/src/device/service/drivers/xr/cm/tests/test_xr_service_set_config.py b/src/device/service/drivers/xr/cm/tests/test_xr_service_set_config.py index e9b16b620..bea9b4c98 100644 --- a/src/device/service/drivers/xr/cm/tests/test_xr_service_set_config.py +++ b/src/device/service/drivers/xr/cm/tests/test_xr_service_set_config.py @@ -20,7 +20,7 @@ import traceback import copy import requests_mock -from ..cm_connection import CmConnection, ConsistencyMode +from ..cm_connection import CmConnection, ConsistencyMode, CreateConsistencyError from ..tf import set_config_for_service access_token = r'{"access_token":"eyI3...","expires_in":3600,"refresh_expires_in":0,"refresh_token":"ey...","token_type":"Bearer","not-before-policy":0,"session_state":"f6e235c4-4ca4-4258-bede-4f2b7125adfb","scope":"profile email offline_access"}' @@ -51,13 +51,16 @@ config = { } def _validate_result(result, expect): - if isinstance(result, Exception): - traceback.print_exception(result) - assert result is expect # Not, "is", not ==, we want type checking in this case, as also an exception can be returned (as return value) + if isinstance(expect, Exception): + assert type(result) == type(expect) + else: + if isinstance(result, Exception): + traceback.print_exception(result) + assert result is expect # Not, "is", not ==, we want type checking in this case, as also an exception can be returned (as return value) def test_xr_set_config(): with mock_cm() as m: - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") @@ -86,7 +89,7 @@ def repeat_last_expected(expected: list[tuple], called: list[tuple]) -> list[tup def test_xr_set_config_consistency_lifecycle(): with mock_cm() as m: - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.lifecycle, retry_interval=0, timeout=1, max_consistency_tries=3) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.lifecycle, retry_interval=0, timeout=1, max_consistency_tries=3, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") @@ -125,7 +128,7 @@ def test_xr_set_config_consistency_lifecycle(): { 'json': json_non_terminal, 'status_code': 200 }]) result = set_config_for_service(cm, constellation, uuid, config) - _validate_result(result, False) # Service creation failure due to insufficient progress + _validate_result(result, CreateConsistencyError("")) # Service creation failure due to insufficient progress called_mocks = [(r._request.method, r._request.url) for r in m._adapter.request_history] expected_mocks_no_connect = [ @@ -139,7 +142,7 @@ def test_xr_set_config_consistency_lifecycle(): ################################################################################ # Same as before, but CmConnection no longer requiring lifcycle progress m.reset_mock() - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.synchronous, retry_interval=0, timeout=1, max_consistency_tries=3) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.synchronous, retry_interval=0, timeout=1, max_consistency_tries=3, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") assert constellation @@ -154,21 +157,21 @@ def test_xr_set_config_consistency_lifecycle(): ################################################################################ # Same as above, but without REST object appearing m.reset_mock() - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.synchronous, retry_interval=0, timeout=1, max_consistency_tries=3) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.synchronous, retry_interval=0, timeout=1, max_consistency_tries=3, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") assert constellation m.get("https://127.0.0.1:9999/api/v1/network-connections/c3b31608-0bb7-4a4f-9f9a-88b24a059432", [{'text': '', 'status_code': 401}]) result = set_config_for_service(cm, constellation, uuid, config) - _validate_result(result, False) + _validate_result(result, CreateConsistencyError("")) called_mocks = [(r._request.method, r._request.url) for r in m._adapter.request_history] assert called_mocks == repeat_last_expected(expected_mocks[:2] + expected_mocks_no_connect, called_mocks) def test_xr_set_config_update_case(): with mock_cm() as m: - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") diff --git a/src/device/service/drivers/xr/cm/tf.py b/src/device/service/drivers/xr/cm/tf.py index c44cb0c9f..0177346a0 100644 --- a/src/device/service/drivers/xr/cm/tf.py +++ b/src/device/service/drivers/xr/cm/tf.py @@ -15,7 +15,7 @@ from typing import Dict, Union import logging -from .cm_connection import CmConnection +from .cm_connection import CmConnection, ExternalError from .constellation import Constellation from .tf_service import TFService from .transport_capacity import TransportCapacity @@ -57,13 +57,17 @@ def set_config_for_service(cm_connection: CmConnection, constellation: Constella LOGGER.error(f"set_config_for_service: Failed to create Transport Capacity ({desired_tc=})") return False connection = Connection(from_tf_service=service) - href = cm_connection.create_or_update_connection(connection) - if href: - LOGGER.info(f"set_config_for_service: Created service {uuid} as {href} (connection={str(connection)})") - return True - else: - LOGGER.error(f"set_config_for_service: Service creation failure for {uuid} (connection={str(connection)})") - return False + try: + href = cm_connection.create_or_update_connection(connection) + if href: + LOGGER.info(f"set_config_for_service: Created service {uuid} as {href} (connection={str(connection)})") + return True + else: + LOGGER.error(f"set_config_for_service: Service creation failure for {uuid} (connection={str(connection)})") + return False + except ExternalError as e: + LOGGER.error(f"set_config_for_service: Service creation failure for {uuid} (connection={str(connection)}): {str(e)}") + return e # Intentionally catching all exceptions, as they are stored in a list as return values # by the caller # pylint: disable=broad-except -- GitLab