diff --git a/src/device/requirements.in b/src/device/requirements.in index ec29fc7a30278625e950f3eed608281f8c7c5cb8..50b941160937aa09976dd3dda4afab6c69d309bb 100644 --- a/src/device/requirements.in +++ b/src/device/requirements.in @@ -29,6 +29,7 @@ xmltodict==0.12.0 tabulate ipaddress macaddress +websockets==10.4 # pip's dependency resolver does not take into account installed packages. # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one diff --git a/src/device/service/drivers/xr/README_XR.md b/src/device/service/drivers/xr/README_XR.md index fa1bc944035d27769cd9c16e0c29318e554e9489..9c64cdef1b773e84153c0d27a58e71af8bdf238f 100644 --- a/src/device/service/drivers/xr/README_XR.md +++ b/src/device/service/drivers/xr/README_XR.md @@ -107,7 +107,7 @@ This will make imports to work properly in all cases. Run deploy script to build in docker containers and then instantiate to configured K8s cluster. Deploy script must be sources for this to work! ```bash -./deploy.sh +./deploy/all.sh ``` If protobuf definitions have changed, regenerate version controlled Java files manually diff --git a/src/device/service/drivers/xr/XrDriver.py b/src/device/service/drivers/xr/XrDriver.py index 2b53de8e71f84aada460f444a3663238290f6ea5..c1471a8136b0e5cd7791e019bb0bdafd2252f591 100644 --- a/src/device/service/drivers/xr/XrDriver.py +++ b/src/device/service/drivers/xr/XrDriver.py @@ -48,7 +48,7 @@ class XrDriver(_Driver): tls_verify = False # Currently using self signed certificates username = settings.get("username", "xr-user-1") password = settings.get("password", "xr-user-1") - + # Options are: # disabled --> just import endpoints as usual # devices --> imports sub-devices but not links connecting them. @@ -88,6 +88,7 @@ class XrDriver(_Driver): def Disconnect(self) -> bool: LOGGER.info(f"Disconnect[{self}]") with self.__lock: + self.__cm_connection.stop_monitoring_errors() self.__terminate.set() return True diff --git a/src/device/service/drivers/xr/cm-cli.py b/src/device/service/drivers/xr/cm-cli.py index 14c6d24b6da05a6f506152d0099d4739c2858e2c..9aefe969c0549819568882a6215ae3dd86b7df3b 100755 --- a/src/device/service/drivers/xr/cm-cli.py +++ b/src/device/service/drivers/xr/cm-cli.py @@ -16,16 +16,22 @@ # Test program for CmConnection import argparse +import signal import logging import traceback +import threading from typing import Tuple -from cm.cm_connection import CmConnection, ConsistencyMode +from cm.cm_connection import CmConnection, ConsistencyMode, ErrorFromIpm from cm.tf_service import TFService from cm.transport_capacity import TransportCapacity from cm.connection import Connection import cm.tf as tf +import asyncio +import websockets +import ssl +import time -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.WARNING) parser = argparse.ArgumentParser(description='CM Connectin Test Utility') parser.add_argument('ip', help='CM IP address or domain name') @@ -33,6 +39,7 @@ parser.add_argument('port', help='CM port', type=int) parser.add_argument('username', help='Username') parser.add_argument('password', help='Password') +parser.add_argument('--monitor-errors', action='store_true') parser.add_argument('--list-constellations', action='store_true') parser.add_argument('--show-constellation-by-hub-name', nargs='?', type=str) parser.add_argument('--create-connection', nargs='?', type=str, help="uuid;ifname;ifname;capacity") @@ -84,98 +91,118 @@ else: retry_interval = 0.2 cm = CmConnection(args.ip, args.port, args.username, args.password, timeout=args.timeout, tls_verify=False, consistency_mode=consistency_mode, retry_interval=retry_interval) -if not cm.Connect(): - exit(-1) - -if args.list_constellations: - constellations = cm.list_constellations() - for constellation in constellations: - print("Constellation:", constellation.constellation_id) - for if_name in constellation.ifnames(): - print(f" {if_name}") - -if args.show_constellation_by_hub_name: - constellation = cm.get_constellation_by_hub_name(args.show_constellation_by_hub_name) - if constellation: - print(f"Constellation: {constellation.constellation_id}, traffic-mode: {constellation.traffic_mode}") - for if_name in constellation.ifnames(): - print(f" {if_name}") - -if args.create_connection: - tf_service = cli_create_string_to_tf_service(args.create_connection) - connection = Connection(from_tf_service=tf_service) - created_service = cm.create_connection(connection) - if created_service: - print(f"Created {created_service} for {connection}") - else: - print(f"Failed to create {connection}") - -if args.modify_connection: - href, tf_service = cli_modify_string_to_tf_service(args.modify_connection) - mc_args = args.modify_connection.split(";") - connection = Connection(from_tf_service=tf_service) - result = cm.update_connection(href, connection) - if result: - print(f"Updated {href} for {connection}") - else: - print(f"Failed to update {href} for {connection}") - -if args.show_connection_by_name: - connection = cm.get_connection_by_name(args.show_connection_by_name) - if connection: - print(str(connection)) - -if args.list_connections: - connections = cm.get_connections() - for c in connections: - print(str(c)) - -if args.delete_connection: - was_deleted = cm.delete_connection(args.delete_connection) - if was_deleted: - print(f"Successfully deleted {args.delete_connection}") - else: - print(f"Failed to delete {args.delete_connection}") - -if args.list_transport_capacities: - tcs = cm.get_transport_capacities() - for tc in tcs: - print(str(tc)) - -if args.create_transport_capacity: - tf_service = cli_create_string_to_tf_service(args.create_transport_capacity) - tc = TransportCapacity(from_tf_service=tf_service) - created_service = cm.create_transport_capacity(tc) - if created_service: - print(f"Created {created_service} for {tc}") - else: - print(f"Failed to create {tc}") - -if args.emulate_tf_set_config_service: - eargs = args.emulate_tf_set_config_service.split(";") - if len(eargs) < 5: - print("Mandatory tokens missing for --emulate-tf-set-config-service") - exit(-1) - hub_module_name, uuid, input_sip, output_sip, capacity_value = eargs[0:5] - capacity_value = int(capacity_value) - config = { - "input_sip_name": input_sip, - "output_sip_name": output_sip, - "capacity_value": capacity_value, - "capacity_unit": "gigabit" - } +terminate = threading.Event() +def signal_handler(sig, frame): + cm.stop_monitoring_errors() + terminate.set() - constellation = cm.get_constellation_by_hub_name(hub_module_name) +signal.signal(signal.SIGINT, signal_handler) - # Allow testing some of the VTI code before we have CM that has VTI - if len(eargs) > 5 and eargs[5] == "FORCE-VTI-ON": - constellation.traffic_mode = "VTIMode" - - if constellation is None: - print(f"Unable to find constellation for hub-module {hub_module_name}") +try: + if not cm.Connect(): exit(-1) - result = tf.set_config_for_service(cm, constellation, uuid, config) - print(f"Emulated SetConfig() for service result: {result}") - if isinstance(result, Exception): - traceback.print_exception(result) + + if args.list_constellations: + constellations = cm.list_constellations() + for constellation in constellations: + print("Constellation:", constellation.constellation_id) + for if_name in constellation.ifnames(): + print(f" {if_name}") + + if args.show_constellation_by_hub_name: + constellation = cm.get_constellation_by_hub_name(args.show_constellation_by_hub_name) + if constellation: + print(f"Constellation: {constellation.constellation_id}, traffic-mode: {constellation.traffic_mode}") + for if_name in constellation.ifnames(): + print(f" {if_name}") + + if args.create_connection: + tf_service = cli_create_string_to_tf_service(args.create_connection) + connection = Connection(from_tf_service=tf_service) + try: + created_service = cm.create_connection(connection) + if created_service: + print(f"Created {created_service} for {connection}") + else: + print(f"Failed to create {connection}") + except ErrorFromIpm as ipm_err: + print(f"Failed to create {connection}: {str(ipm_err)}") + + if args.modify_connection: + href, tf_service = cli_modify_string_to_tf_service(args.modify_connection) + mc_args = args.modify_connection.split(";") + connection = Connection(from_tf_service=tf_service) + result = cm.update_connection(href, connection) + if result: + print(f"Updated {href} for {connection}") + else: + print(f"Failed to update {href} for {connection}") + + if args.show_connection_by_name: + connection = cm.get_connection_by_name(args.show_connection_by_name) + if connection: + print(str(connection)) + + if args.list_connections: + connections = cm.get_connections() + for c in connections: + print(str(c)) + + if args.delete_connection: + was_deleted = cm.delete_connection(args.delete_connection) + if was_deleted: + print(f"Successfully deleted {args.delete_connection}") + else: + print(f"Failed to delete {args.delete_connection}") + + if args.list_transport_capacities: + tcs = cm.get_transport_capacities() + for tc in tcs: + print(str(tc)) + + if args.create_transport_capacity: + tf_service = cli_create_string_to_tf_service(args.create_transport_capacity) + tc = TransportCapacity(from_tf_service=tf_service) + created_service = cm.create_transport_capacity(tc) + if created_service: + print(f"Created {created_service} for {tc}") + else: + print(f"Failed to create {tc}") + + if args.emulate_tf_set_config_service: + eargs = args.emulate_tf_set_config_service.split(";") + if len(eargs) < 5: + print("Mandatory tokens missing for --emulate-tf-set-config-service") + exit(-1) + + hub_module_name, uuid, input_sip, output_sip, capacity_value = eargs[0:5] + capacity_value = int(capacity_value) + config = { + "input_sip_name": input_sip, + "output_sip_name": output_sip, + "capacity_value": capacity_value, + "capacity_unit": "gigabit" + } + + constellation = cm.get_constellation_by_hub_name(hub_module_name) + + # Allow testing some of the VTI code before we have CM that has VTI + if len(eargs) > 5 and eargs[5] == "FORCE-VTI-ON": + constellation.traffic_mode = "VTIMode" + + if constellation is None: + print(f"Unable to find constellation for hub-module {hub_module_name}") + exit(-1) + result = tf.set_config_for_service(cm, constellation, uuid, config) + print(f"Emulated SetConfig() for service result: {result}") + if isinstance(result, Exception): + traceback.print_exception(result) + + if args.monitor_errors: + cm.print_received_errors = True + terminate.wait() + +finally: +# Delete subscriptions. It will end monitoring thread and ensure that program terminates normally + cm.stop_monitoring_errors() diff --git a/src/device/service/drivers/xr/cm/cm_connection.py b/src/device/service/drivers/xr/cm/cm_connection.py index 7128494510f40914917d2c3981158b6dd3571c70..bcd62862de82f115c7c1ef7e98039e6398e62891 100644 --- a/src/device/service/drivers/xr/cm/cm_connection.py +++ b/src/device/service/drivers/xr/cm/cm_connection.py @@ -15,9 +15,13 @@ from __future__ import annotations import collections.abc +import threading import logging import json import time +import asyncio +import websockets +import ssl from typing import Optional, List, Dict, Union import re import requests @@ -51,6 +55,55 @@ class ExpiringValue: class UnexpectedEmptyBody(Exception): pass +class ExternalError(Exception): + pass + +class ApiErrorFromIpm(Exception): + pass + +class ErrorFromIpm(ExternalError): + def __init__(self, err_dict): + msg = str(err_dict) + # Try to extract a short error message + try: + # Only look at first message + err_messages = err_dict["errors"]["errors"][0]["messages"] + for err_msg in err_messages: + if err_msg["lang"] == "en": + msg = err_msg["message"] + except KeyError: + pass + except IndexError: + pass + super().__init__(msg) + +class CreateConsistencyError(Exception): + pass + +class ErrorStore: + def __init__(self): + self.__lock = threading.Lock() + self.__db={} + self.__enabled=False + + def get_error(self, uri: str) -> Optional[dict]: + with self.__lock: + return self.__db.pop(uri, None) + + def set_error(self, uri: str, err_dict: dict): + with self.__lock: + if self.__enabled: + self.__db[uri] = err_dict + + def enable(self): + with self.__lock: + self.__enabled = True + + def disable(self): + with self.__lock: + self.__enabled = False + self.__db.clear() + # This is enum, not a regular class, see https://docs.python.org/3/library/enum.html # String based enums require python 3.11, so use nunber based and custom parser class ConsistencyMode(Enum): @@ -134,10 +187,25 @@ class HttpResult: return True + def raise_as_exception(self): + if self.exception is not None: + raise ExternalError(f"Failure for request {str(self)}") from self.exception + + status_code = self.status_code if self.status_code is not None else "" + + # Try to get error message from IPM + if self.json is not None and "errors" in self.json: + err_list = self.json["errors"] + if len(err_list) > 0 and "message" in err_list[0]: + err_msg = err_list[0]["message"] + raise ApiErrorFromIpm(f"{self.method} {self.url} {self.params}, status {status_code}, IPM reported error: {err_msg}") + + raise ExternalError(str(self)) + class CmConnection: CONSISTENCY_WAIT_LOG_INTERVAL = 1.0 - def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True, consistency_mode: ConsistencyMode = ConsistencyMode.asynchronous, retry_interval: float=0.2, max_consistency_tries:int = 100_000) -> None: + def __init__(self, address: str, port: int, username: str, password: str, timeout=30, tls_verify=True, consistency_mode: ConsistencyMode = ConsistencyMode.asynchronous, retry_interval: float=0.2, max_consistency_tries:int = 100_000, monitor_error_stream: bool = True) -> None: self.__tls_verify = tls_verify if not tls_verify: urllib3.disable_warnings() @@ -151,7 +219,18 @@ class CmConnection: self.__username = username self.__password = password self.__cm_root = 'https://' + address + ':' + str(port) + self.__cm_ws_root = 'wss://' + address + ':' + str(port) self.__access_token = None + self.__monitor_error_stream = monitor_error_stream + self.__err_store=ErrorStore() + self.__err_monitor_thread = None + self.__err_monitor_connected = threading.Event() + self.__err_monitor_sub_id = None + self.__err_monitor_terminate = threading.Event() + self.print_received_errors = False + + def __del__(self): + self.stop_monitoring_errors() def __perform_request(self, http_result: HttpResult, permit_empty_body: bool, fn, *args, **kwargs): try: @@ -238,7 +317,121 @@ class CmConnection: self.__acquire_access_token() def Connect(self) -> bool: - return self.__acquire_access_token() + if not self.__acquire_access_token(): + return False + return self.monitor_errors() if self.__monitor_error_stream else True + + def subscribe_errors(self): + sub = [ + { + "subscriptionName": "TfXrDriverErrorMonitopr", + "subscriptionFilters": [ + { + "requestedNotificationTypes": [ "Error" ], + "requestedResources": [ + { + "resourceType": "cm.network-connection", + } + ] + }, + ] + } + ] + + r = self.__post("/api/v1/subscriptions/events", sub) + #print(r.status_code, r.text) + if not r.is_valid_json_list_with_status(201) or len(r.json) != 1: + return None, None + try: + return self.__cm_ws_root + r.json[0]["notificationChannel"]["streamAddress"], r.json[0]["subscriptionId"] + except KeyError: + return None, None + + def unsubscribe(self, sub_id: str): + resp = self.__delete(f"/api/v1/subscriptions/events/{sub_id}") + if resp.is_valid_with_status_ignore_body(202): + LOGGER.info(f"Deleted subscription {sub_id=}") + return True + else: + LOGGER.info(f"Deleting subscription {sub_id=} failed, status {resp.status_code}") + return False + + def monitor_errors(self) -> bool: + uri, sub_id = self.subscribe_errors() + if not uri or not sub_id: + return False + self.__err_monitor_sub_id = sub_id + + def err_monitor_thread(): + LOGGER.info(f"Listening errors via {uri}") + + ctx = ssl.create_default_context() + if not self.__tls_verify: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + async def receive_websock(uri, ssl_ctx): + while not self.__err_monitor_terminate.is_set(): + try: + async with websockets.connect(uri, ssl=ssl_ctx) as websocket: + LOGGER.info(f"err_monitor_thread(): WebSock connected to {uri}") + self.__err_monitor_connected.set() + while not self.__err_monitor_terminate.is_set(): + # 5s timeout is used for forced checking of terminate flag + # In normal termmination timeout is not experienced, as + # we unsubscribe and that will trigger server to close the + # connection. This timeout exists purely as backup + # in case unsubscribe fails + try: + msg = await asyncio.wait_for(websocket.recv(), timeout=5.0) + except asyncio.exceptions.TimeoutError: + continue + if self.print_received_errors: + print(f"RX: {msg}") + try: + msg_json = json.loads(msg) + href = msg_json["href"] + LOGGER.debug(f"err_monitor_thread(): RX [{href}]: {msg}") + self.__err_store.set_error(href, msg_json) + except json.JSONDecodeError as json_err: + LOGGER.error(f"err_monitor_thread(): Invalid message received: {msg}, JSON decode error {str(json_err)}") + except KeyError: + LOGGER.error(f"err_monitor_thread(): Missing href in message: {msg}") + except asyncio.CancelledError as e: + LOGGER.debug("err_monitor_thread(): monitoring cancelled") + raise e + except Exception as e: + if not self.__err_monitor_terminate.is_set(): + LOGGER.error(f"err_monitor_thread(): exception {str(e)}, reconnecting") + time.sleep(1) + + asyncio.run(receive_websock(uri, ctx)) + LOGGER.debug("err_monitor_thread(): thread terminating") + + assert self.__err_monitor_thread is None + self.__err_monitor_terminate.clear() + self.__err_monitor_thread = threading.Thread(target=err_monitor_thread) + self.__err_monitor_thread.start() + # If we can get connection soon, wait for it, otherwise proceed without delay + # Not waiting for connection may miss some errors (-->timeout later), waiting too long + # makes for bad experience + self.__err_monitor_connected.wait(0.5) + + return True + + def stop_monitoring_errors(self): + self.__err_monitor_terminate.set() + + if self.__err_monitor_sub_id: + LOGGER.debug(f"Disabling error subscribtion {self.__err_monitor_sub_id }") + self.unsubscribe(self.__err_monitor_sub_id) + self.__err_monitor_sub_id = None + + if self.__err_monitor_thread is not None: + LOGGER.debug("Terminating error monitoring thread") + self.__err_monitor_thread.join() + LOGGER.info("Error monitoring thread terminated") + self.__err_monitor_thread = None def list_constellations(self) -> List[Constellation]: r = self.__get("/api/v1/xr-networks?content=expanded") @@ -246,7 +439,6 @@ class CmConnection: return [] return [Constellation(c) for c in r.json] - def get_constellation_by_hub_name(self, hub_module_name: str) -> Optional[Constellation]: qparams = [ ('content', 'expanded'), @@ -324,6 +516,11 @@ class CmConnection: log_ts = ts LOGGER.info(f"apply_create_consistency(): waiting for life cycle state progress for {get_result}, current: {str(get_result.life_cycle_info)}, ellapsed time {ts-ts_start} seconds") else: + err_info = self.__err_store.get_error(obj.href) + if err_info is not None: + LOGGER.info(f"apply_create_consistency(): asynchronous error reported for {obj}: {str(err_info)}") + raise ErrorFromIpm(err_info) + ts = time.perf_counter() if ts - log_ts >= self.CONSISTENCY_WAIT_LOG_INTERVAL: log_ts = ts @@ -337,10 +534,13 @@ class CmConnection: duration = time.perf_counter() - ts_start if not valid: if get_result: - LOGGER.info(f"Failed to apply create consistency for {get_result}, insufficient life-cycle-state progress ({str(get_result.life_cycle_info)}), duration {duration} seconds") + msg = f"Failed to apply create consistency for {get_result}, insufficient life-cycle-state progress ({str(get_result.life_cycle_info)}), duration {duration} seconds" + LOGGER.info(msg) + raise CreateConsistencyError(msg) else: - LOGGER.info(f"Failed to apply create consistency for {obj}, REST object did not appear, duration {duration} seconds") - return None + msg = f"Failed to apply create consistency for {obj}, REST object did not appear, duration {duration} seconds" + LOGGER.info(msg) + raise CreateConsistencyError(msg) else: LOGGER.info(f"Applied create consistency for {get_result}, final life-cycle-state {str(get_result.life_cycle_info)}, duration {duration} seconds") @@ -399,20 +599,24 @@ class CmConnection: # Create wants a list, so wrap connection to list cfg = [connection.create_config()] - resp = self.__post("/api/v1/network-connections", cfg) - if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]: - connection.href = resp.json[0]["href"] - LOGGER.info(f"IPM accepted create request for connection {connection}") - new_connection = self.apply_create_consistency(connection, lambda: self.get_connection_by_href(connection.href)) - if new_connection: - LOGGER.info(f"Created connection {new_connection}") - return new_connection.href + self.__err_store.enable() + try: + resp = self.__post("/api/v1/network-connections", cfg) + if resp.is_valid_json_list_with_status(202, 1, 1) and "href" in resp.json[0]: + connection.href = resp.json[0]["href"] + LOGGER.info(f"IPM accepted create request for connection {connection}") + new_connection = self.apply_create_consistency(connection, lambda: self.get_connection_by_href(connection.href)) + if new_connection: + LOGGER.info(f"Created connection {new_connection}") + return new_connection.href + else: + LOGGER.error(f"Consistency failure for connection {connection}, result {resp}") + return None else: - LOGGER.error(f"Consistency failure for connection {connection}, result {resp}") - return None - else: - LOGGER.error(f"Create failure for connection {connection}, result {resp}") - return None + LOGGER.error(f"Create failure for connection {connection}, result {resp}") + resp.raise_as_exception() + finally: + self.__err_store.disable() def update_connection(self, href: str, connection: Connection, existing_connection: Optional[Connection]=None) -> Optional[str]: cfg = connection.create_config() diff --git a/src/device/service/drivers/xr/cm/tests/test_cm_connection.py b/src/device/service/drivers/xr/cm/tests/test_cm_connection.py index a7944ed220c6d68aad2f122a0bb0d2c1f83fdd06..22b74f36a60e3eda4a0d08d9791cae112b7fd605 100644 --- a/src/device/service/drivers/xr/cm/tests/test_cm_connection.py +++ b/src/device/service/drivers/xr/cm/tests/test_cm_connection.py @@ -37,30 +37,30 @@ def test_cmc_connect(): # Valid access token with requests_mock.Mocker() as m: m.post('https://127.0.0.1:9999/realms/xr-cm/protocol/openid-connect/token', text=access_token) - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert cm.Connect() # Valid JSON but no access token with requests_mock.Mocker() as m: m.post('https://127.0.0.1:9999/realms/xr-cm/protocol/openid-connect/token', text=r'{"a": "b"}') - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert not cm.Connect() # Invalid JSON with requests_mock.Mocker() as m: m.post('https://127.0.0.1:9999/realms/xr-cm/protocol/openid-connect/token', text=r'}}}') - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert not cm.Connect() with requests_mock.Mocker() as m: # No mock present for the destination - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert not cm.Connect() def test_cmc_get_constellations(): with mock_cm_connectivity() as m: m.get("https://127.0.0.1:9999/api/v1/xr-networks?content=expanded", text=res_constellations) - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert cm.Connect() # List all constellations diff --git a/src/device/service/drivers/xr/cm/tests/test_xr_service_set_config.py b/src/device/service/drivers/xr/cm/tests/test_xr_service_set_config.py index 05f55d0df471115ea2e5ace7c3014333aa94e0fe..42785caad79f4ba6000877e81d3caf403c463c1a 100644 --- a/src/device/service/drivers/xr/cm/tests/test_xr_service_set_config.py +++ b/src/device/service/drivers/xr/cm/tests/test_xr_service_set_config.py @@ -20,7 +20,7 @@ import traceback import copy import requests_mock -from ..cm_connection import CmConnection, ConsistencyMode +from ..cm_connection import CmConnection, ConsistencyMode, CreateConsistencyError from ..tf import set_config_for_service access_token = r'{"access_token":"eyI3...","expires_in":3600,"refresh_expires_in":0,"refresh_token":"ey...","token_type":"Bearer","not-before-policy":0,"session_state":"f6e235c4-4ca4-4258-bede-4f2b7125adfb","scope":"profile email offline_access"}' @@ -51,13 +51,16 @@ config = { } def _validate_result(result, expect): - if isinstance(result, Exception): - traceback.print_exception(result) - assert result is expect # Not, "is", not ==, we want type checking in this case, as also an exception can be returned (as return value) + if isinstance(expect, Exception): + assert type(result) == type(expect) + else: + if isinstance(result, Exception): + traceback.print_exception(result) + assert result is expect # Not, "is", not ==, we want type checking in this case, as also an exception can be returned (as return value) def test_xr_set_config(): with mock_cm() as m: - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") @@ -86,7 +89,7 @@ def repeat_last_expected(expected: list[tuple], called: list[tuple]) -> list[tup def test_xr_set_config_consistency_lifecycle(): with mock_cm() as m: - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.lifecycle, retry_interval=0, timeout=1, max_consistency_tries=3) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.lifecycle, retry_interval=0, timeout=1, max_consistency_tries=3, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") @@ -125,7 +128,7 @@ def test_xr_set_config_consistency_lifecycle(): { 'json': json_non_terminal, 'status_code': 200 }]) result = set_config_for_service(cm, constellation, uuid, config) - _validate_result(result, False) # Service creation failure due to insufficient progress + _validate_result(result, CreateConsistencyError("")) # Service creation failure due to insufficient progress called_mocks = [(r._request.method, r._request.url) for r in m._adapter.request_history] expected_mocks_no_connect = [ @@ -139,7 +142,7 @@ def test_xr_set_config_consistency_lifecycle(): ################################################################################ # Same as before, but CmConnection no longer requiring lifcycle progress m.reset_mock() - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.synchronous, retry_interval=0, timeout=1, max_consistency_tries=3) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.synchronous, retry_interval=0, timeout=1, max_consistency_tries=3, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") assert constellation @@ -154,21 +157,21 @@ def test_xr_set_config_consistency_lifecycle(): ################################################################################ # Same as above, but without REST object appearing m.reset_mock() - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.synchronous, retry_interval=0, timeout=1, max_consistency_tries=3) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, consistency_mode=ConsistencyMode.synchronous, retry_interval=0, timeout=1, max_consistency_tries=3, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") assert constellation m.get("https://127.0.0.1:9999/api/v1/network-connections/c3b31608-0bb7-4a4f-9f9a-88b24a059432", [{'text': '', 'status_code': 401}]) result = set_config_for_service(cm, constellation, uuid, config) - _validate_result(result, False) + _validate_result(result, CreateConsistencyError("")) called_mocks = [(r._request.method, r._request.url) for r in m._adapter.request_history] assert called_mocks == repeat_last_expected(expected_mocks[:2] + expected_mocks_no_connect, called_mocks) def test_xr_set_config_update_case(): with mock_cm() as m: - cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False) + cm = CmConnection("127.0.0.1", 9999, "xr-user", "xr-password", tls_verify=False, monitor_error_stream=False) assert cm.Connect() constellation = cm.get_constellation_by_hub_name("XR HUB 1") diff --git a/src/device/service/drivers/xr/cm/tf.py b/src/device/service/drivers/xr/cm/tf.py index ace3cd2888809e2b498a82b1ade9983419033c20..4b1352216d79aea46beca8c5383f64f39869f91b 100644 --- a/src/device/service/drivers/xr/cm/tf.py +++ b/src/device/service/drivers/xr/cm/tf.py @@ -15,7 +15,7 @@ from typing import Dict, Union import logging -from .cm_connection import CmConnection +from .cm_connection import CmConnection, ExternalError from .constellation import Constellation from .tf_service import TFService from .transport_capacity import TransportCapacity @@ -57,13 +57,17 @@ def set_config_for_service(cm_connection: CmConnection, constellation: Constella LOGGER.error(f"set_config_for_service: Failed to create Transport Capacity ({desired_tc=})") return False connection = Connection(from_tf_service=service) - href = cm_connection.create_or_update_connection(connection) - if href: - LOGGER.info(f"set_config_for_service: Created service {uuid} as {href} (connection={str(connection)})") - return True - else: - LOGGER.error(f"set_config_for_service: Service creation failure for {uuid} (connection={str(connection)})") - return False + try: + href = cm_connection.create_or_update_connection(connection) + if href: + LOGGER.info(f"set_config_for_service: Created service {uuid} as {href} (connection={str(connection)})") + return True + else: + LOGGER.error(f"set_config_for_service: Service creation failure for {uuid} (connection={str(connection)})") + return False + except ExternalError as e: + LOGGER.error(f"set_config_for_service: Service creation failure for {uuid} (connection={str(connection)}): {str(e)}") + return e # Intentionally catching all exceptions, as they are stored in a list as return values # by the caller # pylint: disable=broad-except