From 35df1abb2b8910d8fa9a5ce02a6ce4d52046b402 Mon Sep 17 00:00:00 2001 From: Javier Mateos Najari <javier.mateos@naudit.es> Date: Mon, 3 Mar 2025 10:34:23 +0100 Subject: [PATCH 01/14] feat(drivers): add morpheus driver --- .../drivers/morpheus/MorpheusApiDriver.py | 254 ++++++++++++++++++ 1 file changed, 254 insertions(+) create mode 100644 src/device/service/drivers/morpheus/MorpheusApiDriver.py diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py new file mode 100644 index 000000000..f9b033cc3 --- /dev/null +++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py @@ -0,0 +1,254 @@ +import logging, requests, threading, json +from typing import Any, Iterator, List, Optional, Tuple, Union, Dict +from queue import Queue +from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method +from device.service.driver_api._Driver import _Driver + +LOGGER = logging.getLogger(__name__) + +DRIVER_NAME = 'morpheus' +METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) + +class MorpheusApiDriver(_Driver): + def __init__(self, address: str, port: int, **settings) -> None: + super().__init__(DRIVER_NAME, address, port, **settings) + self.__lock = threading.Lock() + self.__started = threading.Event() + self.__terminate = threading.Event() + scheme = self.settings.get('scheme', 'http') + self.__morpheus_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port)) + self.__timeout = int(self.settings.get('timeout', 120)) + self.__headers = {'Accept': 'application/yang-data+json', 'Content-Type': 'application/yang-data+json'} + + self.__detection_thread = None + self.__pipeline_error_thread = None + + size = self.settings.get('queue_events_size', 10) + self.__pipeline_error_queue = Queue(maxsize=size) + self.__detection_queue = Queue(maxsize=size) + + def Connect(self) -> bool: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus' + with self.__lock: + if self.__started.is_set(): return True + try: + requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + self.__started.set() + return True + except requests.exceptions.Timeout: + LOGGER.exception('Timeout connecting {:s}'.format(str(self.__morpheus_root))) + return False + except Exception: # pylint: disable=broad-except + LOGGER.exception('Exception connecting {:s}'.format(str(self.__morpheus_root))) + return False + + def Disconnect(self) -> bool: + with self.__lock: + try: + if self.__detection_thread and self.__detection_thread.is_alive(): + self.UnsubscribeDetectionEvent() + + if self.__pipeline_thread and self.__pipeline_thread.is_alive(): + self.UnsubscribePipelineError() + except Exception as e: + LOGGER.exception(f'Error during disconnect: {str(e)}') + + self.__terminate.set() + return True + + @metered_subclass_method(METRICS_POOL) + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + config = response.json() + result = [] + + for key, value in config.items(): + result.append((key, value)) + + return result + except Exception as e: + LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root))) + return [] + + @metered_subclass_method(METRICS_POOL) + def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + config = response.json() + results = [] + + if not resource_keys: + for key, value in config.items(): + results.append((key, value)) + return results + + for key in resource_keys: + try: + results.append(config[key]) + except KeyError: + results.append(None) + except Exception as e: + results.append(e) + + return results + return [(key, None) for key in resource_keys] + except Exception as e: + LOGGER.exception(f'Error getting config: {str(e)}') + return [(key, e) for key in resource_keys] + + @metered_subclass_method(METRICS_POOL) + def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + results = [] + with self.__lock: + config = dict(resources) + try: + response = requests.put(url, + headers=self.__headers, + json=config, + timeout=self.__timeout, + verify=False) + results.append(response.ok) + except Exception as e: + results.append(e) + return results + + def GetState(self) -> List[Tuple[str, Any]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + state = response.json() + result = [] + for key, value in state.items(): + result.append((key, value)) + return result + return [] + except Exception as e: + LOGGER.exception(f'Error getting state: {str(e)}') + return [] + + def StartPipeline(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start' + with self.__lock: + try: + response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False) + response.raise_for_status() + return True + except Exception as e: + LOGGER.exception(f'Error starting pipeline: {e}') + return e + + def StopPipeline(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop' + with self.__lock: + try: + response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False) + response.raise_for_status() + return True + except Exception as e: + LOGGER.exception(f'Error stopping pipeline: {e}') + return False + + @metered_subclass_method(METRICS_POOL) + def SubscribeDetectionEvent(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event' + with self.__lock: + try: + self.__detection_thread = threading.Thread( + target=self.__handle_notification_stream, + args=(url, self.__detection_queue), + daemon=True + ) + self.__detection_thread.start() + return True + except Exception as e: + LOGGER.exception(f'Error subscribing to detection events: {str(e)}') + return e + + @metered_subclass_method(METRICS_POOL) + def UnsubscribeDetectionEvent(self) -> Union[bool, Exception]: + try: + if self.__detection_thread and self.__detection_thread.is_alive(): + self.__detection_thread.join(timeout=5) + return True + except Exception as e: + LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}') + return e + + def GetDetectionEvent(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Dict]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break + try: + event = self.__detection_queue.get(block=blocking, timeout=0.1) + if event is not None: + yield event + except queue.Empty: + if blocking: + continue + return + + def SubscribePipelineError(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error' + with self.__lock: + try: + self.__pipeline_error_thread = threading.Thread( + target=self.__handle_notification_stream, + args=(url, self.__pipeline_error_queue), + daemon=True + ) + self.__pipeline_error_thread.start() + return True + except Exception as e: + LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}') + return e + + def UnsubscribePipelineError(self) -> Union[bool, Exception]: + try: + if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive(): + self.__pipeline_error_thread.join(timeout=5) + return True + except Exception as e: + LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}') + return e + + def GetPipelineError(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Dict]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break + try: + error = self.__pipeline_error_queue.get(block=blocking, timeout=0.1) + if error is not None: + yield error + except queue.Empty: + if blocking: + continue + return + + def __handle_notification_stream(self, url: str, queue: Queue[Any]) -> None: + try: + with requests.get(url, + headers=self.__headers, + stream=True, + verify=False) as response: + + if not response.ok: + LOGGER.error(f'Error connecting to event stream: {response.status_code}') + return + + try: + event = response.json() + queue.put(event['data']['ietf-restconf:notification']) + except json.JSONDecodeError as e: + LOGGER.error(f'Error parsing event: {e}') + except Exception as e: + LOGGER.exception(f'Error in notification stream handler: {str(e)}') -- GitLab From 54f5199807a18f02d64f30d92485becf762594d8 Mon Sep 17 00:00:00 2001 From: Javier Mateos Najari <javier.mateos@naudit.es> Date: Tue, 4 Mar 2025 13:35:22 +0100 Subject: [PATCH 02/14] test(device): add morpheus driver tests --- scripts/run_tests_locally-device-morpheus.sh | 10 +++ src/device/tests/test_unitary_morpheus.py | 91 ++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100755 scripts/run_tests_locally-device-morpheus.sh create mode 100644 src/device/tests/test_unitary_morpheus.py diff --git a/scripts/run_tests_locally-device-morpheus.sh b/scripts/run_tests_locally-device-morpheus.sh new file mode 100755 index 000000000..1af88264e --- /dev/null +++ b/scripts/run_tests_locally-device-morpheus.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc + +# Run unitary tests and analyze coverage of code at same time +coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \ + device/tests/test_unitary_morpheus.py diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py new file mode 100644 index 000000000..cf754e091 --- /dev/null +++ b/src/device/tests/test_unitary_morpheus.py @@ -0,0 +1,91 @@ +import os +os.environ['DEVICE_EMULATED_ONLY'] = 'YES' + +# pylint: disable=wrong-import-position +import json +import logging, pytest, time +from typing import Dict, List +from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver + +logging.basicConfig(level=logging.DEBUG) +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +##### DRIVER FIXTURE ################################################################################################### + +DRIVER_SETTING_ADDRESS = '127.0.0.1' +DRIVER_SETTING_PORT = 8090 + +@pytest.fixture(scope='session') +def driver() -> MorpheusApiDriver: + _driver = MorpheusApiDriver( + DRIVER_SETTING_ADDRESS, DRIVER_SETTING_PORT, + ) + _driver.Connect() + yield _driver + time.sleep(1) + _driver.Disconnect() + + +##### TEST METHODS ##################################################################################################### + +def print_data(label, data): + print(f"{label}: {json.dumps(data, indent=2)}") + +def test_initial_config_retrieval(driver: MorpheusApiDriver): + config = driver.GetInitialConfig() + + assert isinstance(config, list), "Expected a list for initial config" + assert len(config) > 0, "Initial config should not be empty" + + print_data("Initial Config", config) + +def test_retrieve_config(driver: MorpheusApiDriver): + config = driver.GetConfig(None) + + assert isinstance(config, list), "Expected a list for config" + assert len(config) > 0, "Config should not be empty" + + print_data("Config", config) + +def test_set_config(driver: MorpheusApiDriver): + results = driver.SetConfig([('traffic_type', 'UDP')]) + + assert len(results) == 1, "Expected only one result" + assert results[0] is True, "Expected a succesfull result" + +def test_retrieve_state(driver: MorpheusApiDriver): + state = driver.GetState() + + assert isinstance(state, list), "Expected a a list for initial config" + assert len(state) > 0, " State should not be empty" + + print_data("State", state) + +def test_pipeline(driver: MorpheusApiDriver): + result = driver.StartPipeline() + + assert result is True + + result = driver.StopPipeline() + + assert result is True + +def test_subscription_detection(driver: MorpheusApiDriver): + result = driver.SubscribeDetectionEvent() + + assert result is True + + result = driver.UnsubscribeDetectionEvent() + + assert result is True + +def test_subscription_error(driver: MorpheusApiDriver): + result = driver.SubscribePipelineError() + + assert result is True + + result = driver.UnsubscribePipelineError() + + assert result is True -- GitLab From 6043618872050a2e377d6eb721bb8f5f3ef11b43 Mon Sep 17 00:00:00 2001 From: Javier Mateos Najari <javier.mateos@naudit.es> Date: Mon, 5 May 2025 09:47:37 +0200 Subject: [PATCH 03/14] refactor(drivers): respect the driver's interface --- .../drivers/morpheus/MorpheusApiDriver.py | 136 +++++++++++------- src/device/tests/test_unitary_morpheus.py | 36 ++--- 2 files changed, 95 insertions(+), 77 deletions(-) diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py index f9b033cc3..fd4d2a72f 100644 --- a/src/device/service/drivers/morpheus/MorpheusApiDriver.py +++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py @@ -1,4 +1,4 @@ -import logging, requests, threading, json +import logging, requests, threading, json, time from typing import Any, Iterator, List, Optional, Tuple, Union, Dict from queue import Queue from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method @@ -46,10 +46,10 @@ class MorpheusApiDriver(_Driver): with self.__lock: try: if self.__detection_thread and self.__detection_thread.is_alive(): - self.UnsubscribeDetectionEvent() + self.__unsubscribe_detection_event() if self.__pipeline_thread and self.__pipeline_thread.is_alive(): - self.UnsubscribePipelineError() + self.__unsubscribe_pipeline_error() except Exception as e: LOGGER.exception(f'Error during disconnect: {str(e)}') @@ -94,7 +94,7 @@ class MorpheusApiDriver(_Driver): results.append(config[key]) except KeyError: results.append(None) - except Exception as e: + except Exception as e: results.append(e) return results @@ -120,23 +120,72 @@ class MorpheusApiDriver(_Driver): results.append(e) return results - def GetState(self) -> List[Tuple[str, Any]]: - url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state' - with self.__lock: + def GetState(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Tuple[float, str, Any]]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break + + internal_state = self.__get_state() + if internal_state is not None: + timestamp = time.time() + yield(timestamp, 'state', internal_state) + + pipeline_error_empty = False + detection_event_empty = False + try: - response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) - if response.ok: - state = response.json() - result = [] - for key, value in state.items(): - result.append((key, value)) - return result - return [] - except Exception as e: - LOGGER.exception(f'Error getting state: {str(e)}') - return [] + error = self.__pipeline_error_queue.get(block=False, timeout=0.1) + if error is not None: + yield (error.get('eventTime', time.time()), 'pipeline_error', error.get('event'),) + except queue.Empty: + pipeline_error_empty = True - def StartPipeline(self) -> Union[bool, Exception]: + try: + event = self.__detection_queue.get(block=False, timeout=0.1) + if event is not None: + yield (event.get('eventTime', time.time()), 'detection_event', error.get('event'),) + except queue.Empty: + detection_event_empty = True + + if pipeline_error_empty and detection_event_empty: + if blocking: + continue + return + + @metered_subclass_method(METRICS_POOL) + def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool,Exception]]: + results = [] + rollback_stack = [] + operations = [ + (self.__subscribe_detection_event, self.__unsubscribe_detection_event), + (self.__subscribe_pipeline_error, self.__unsubscribe_pipeline_error), + (self.__start_pipeline, self.__stop_pipeline), + ] + for i, (sub_op, unsub_op) in enumerate(operations): + result = sub_op() + reuslts.append(result) + if isinstance(result, Exception): + while rollback_stack: + rollback_op = rollback_stack.pop() + try: + rollback_op() + except Exception as e: + LOGGER.exception(f'Error during subscription rollback operation: {e}') + + return results + + rollback_stack.append(unsub_op) + return results + + @metered_subclass_method(METRICS_POOL) + def UnsubscribeState(self, subscriptions: List[Tuple[str,float,float]]) -> List[Union[bool, Exception]]: + results = [] + results.append(self.__stop_pipeline()) + results.append(self.__unsubscribe_pipeline_error()) + results.append(self.__unsubscribe_detection_event()) + return results + + def __start_pipeline(self) -> Union[bool, Exception]: url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start' with self.__lock: try: @@ -147,7 +196,7 @@ class MorpheusApiDriver(_Driver): LOGGER.exception(f'Error starting pipeline: {e}') return e - def StopPipeline(self) -> Union[bool, Exception]: + def __stop_pipeline(self) -> Union[bool, Exception]: url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop' with self.__lock: try: @@ -156,10 +205,9 @@ class MorpheusApiDriver(_Driver): return True except Exception as e: LOGGER.exception(f'Error stopping pipeline: {e}') - return False + return e - @metered_subclass_method(METRICS_POOL) - def SubscribeDetectionEvent(self) -> Union[bool, Exception]: + def __subscribe_detection_event(self) -> Union[bool, Exception]: url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event' with self.__lock: try: @@ -174,8 +222,7 @@ class MorpheusApiDriver(_Driver): LOGGER.exception(f'Error subscribing to detection events: {str(e)}') return e - @metered_subclass_method(METRICS_POOL) - def UnsubscribeDetectionEvent(self) -> Union[bool, Exception]: + def __unsubscribe_detection_event(self) -> Union[bool, Exception]: try: if self.__detection_thread and self.__detection_thread.is_alive(): self.__detection_thread.join(timeout=5) @@ -184,20 +231,7 @@ class MorpheusApiDriver(_Driver): LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}') return e - def GetDetectionEvent(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Dict]: - while True: - if self.__terminate.is_set(): break - if terminate is not None and terminate.is_set(): break - try: - event = self.__detection_queue.get(block=blocking, timeout=0.1) - if event is not None: - yield event - except queue.Empty: - if blocking: - continue - return - - def SubscribePipelineError(self) -> Union[bool, Exception]: + def __subscribe_pipeline_error(self) -> Union[bool, Exception]: url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error' with self.__lock: try: @@ -212,7 +246,7 @@ class MorpheusApiDriver(_Driver): LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}') return e - def UnsubscribePipelineError(self) -> Union[bool, Exception]: + def __unsubscribe_pipeline_error(self) -> Union[bool, Exception]: try: if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive(): self.__pipeline_error_thread.join(timeout=5) @@ -221,18 +255,18 @@ class MorpheusApiDriver(_Driver): LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}') return e - def GetPipelineError(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Dict]: - while True: - if self.__terminate.is_set(): break - if terminate is not None and terminate.is_set(): break + def __get_state(self) -> Dict: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state' + with self.__lock: try: - error = self.__pipeline_error_queue.get(block=blocking, timeout=0.1) - if error is not None: - yield error - except queue.Empty: - if blocking: - continue - return + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + state = response.json() + return state + except Exception as e: + LOGGER.exception(f'Error getting internal state: {e}') + return None + def __handle_notification_stream(self, url: str, queue: Queue[Any]) -> None: try: diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py index cf754e091..3fa1f8688 100644 --- a/src/device/tests/test_unitary_morpheus.py +++ b/src/device/tests/test_unitary_morpheus.py @@ -3,6 +3,7 @@ os.environ['DEVICE_EMULATED_ONLY'] = 'YES' # pylint: disable=wrong-import-position import json +import threading import logging, pytest, time from typing import Dict, List from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver @@ -56,36 +57,19 @@ def test_set_config(driver: MorpheusApiDriver): assert results[0] is True, "Expected a succesfull result" def test_retrieve_state(driver: MorpheusApiDriver): + results = driver.SubscribeState() + + assert all(isinstance(result, bool) and result for result in results), \ + f"Subscription error: {results}" + state = driver.GetState() - assert isinstance(state, list), "Expected a a list for initial config" + assert isinstance(state, iter), "Expected an iterator for state" assert len(state) > 0, " State should not be empty" print_data("State", state) -def test_pipeline(driver: MorpheusApiDriver): - result = driver.StartPipeline() - - assert result is True - - result = driver.StopPipeline() - - assert result is True - -def test_subscription_detection(driver: MorpheusApiDriver): - result = driver.SubscribeDetectionEvent() - - assert result is True - - result = driver.UnsubscribeDetectionEvent() - - assert result is True - -def test_subscription_error(driver: MorpheusApiDriver): - result = driver.SubscribePipelineError() - - assert result is True - - result = driver.UnsubscribePipelineError() + results = driver.UnsubscribeState() - assert result is True + assert all(isinstance(result, bool) and result for result in results), \ + f"Unsubscription error: {results}" -- GitLab From 6b03c8fc01c6e73ce3f93205b4470fcc8b26ac05 Mon Sep 17 00:00:00 2001 From: Javier Mateos Najari <javier.mateos@naudit.es> Date: Mon, 5 May 2025 09:50:35 +0200 Subject: [PATCH 04/14] feat(morpheus): add morpheus driver to controller --- proto/context.proto | 1 + src/common/DeviceTypes.py | 1 + .../service/database/models/enums/DeviceDriver.py | 1 + src/device/service/drivers/__init__.py | 11 +++++++++++ src/webui/service/device/forms.py | 1 + src/webui/service/device/routes.py | 2 ++ src/webui/service/templates/device/add.html | 3 ++- 7 files changed, 19 insertions(+), 1 deletion(-) diff --git a/proto/context.proto b/proto/context.proto index 01e096233..42b8a69cc 100644 --- a/proto/context.proto +++ b/proto/context.proto @@ -226,6 +226,7 @@ enum DeviceDriverEnum { DEVICEDRIVER_IETF_L3VPN = 13; DEVICEDRIVER_IETF_SLICE = 14; DEVICEDRIVER_NCE = 15; + DEVICEDRIVER_MORPHEUS = 16; } enum DeviceOperationalStatusEnum { diff --git a/src/common/DeviceTypes.py b/src/common/DeviceTypes.py index 9a982d1eb..b917524a6 100644 --- a/src/common/DeviceTypes.py +++ b/src/common/DeviceTypes.py @@ -50,6 +50,7 @@ class DeviceTypeEnum(Enum): XR_CONSTELLATION = 'xr-constellation' QKD_NODE = 'qkd-node' OPEN_ROADM = 'openroadm' + MORPHEUS = 'morpheus' # ETSI TeraFlowSDN controller TERAFLOWSDN_CONTROLLER = 'teraflowsdn' diff --git a/src/context/service/database/models/enums/DeviceDriver.py b/src/context/service/database/models/enums/DeviceDriver.py index fe0d83fb1..216cd4461 100644 --- a/src/context/service/database/models/enums/DeviceDriver.py +++ b/src/context/service/database/models/enums/DeviceDriver.py @@ -38,6 +38,7 @@ class ORM_DeviceDriverEnum(enum.Enum): IETF_SLICE = DeviceDriverEnum.DEVICEDRIVER_IETF_SLICE OC = DeviceDriverEnum.DEVICEDRIVER_OC QKD = DeviceDriverEnum.DEVICEDRIVER_QKD + MORPHEUS = DeviceDriverEnum.DEVICEDRIVER_MORPHEUS grpc_to_enum__device_driver = functools.partial( grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum) diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index e3102cdf5..fdbf4739a 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -217,3 +217,14 @@ if LOAD_ALL_DEVICE_DRIVERS: FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_QKD, } ])) + +if LOAD_ALL_DEVICE_DRIVERS: + from .morpheus.MorpheusApiDriver import MorpheusAPIDriver + DRIVERS.append( + (MorpheusAPIDriver, [ + { + # Close enough, it does optical switching + FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MORPHEUS, + FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_MORPHEUS, + } + ])) diff --git a/src/webui/service/device/forms.py b/src/webui/service/device/forms.py index 4fa6829e7..3005c4cb3 100644 --- a/src/webui/service/device/forms.py +++ b/src/webui/service/device/forms.py @@ -34,6 +34,7 @@ class AddDeviceForm(FlaskForm): device_drivers_optical_tfs = BooleanField('OPTICAL TFS') device_drivers_ietf_actn = BooleanField('IETF ACTN') device_drivers_qkd = BooleanField('QKD') + device_drivers_morpheus = BooleanField('MORPHEUS') device_config_address = StringField('connect/address',default='127.0.0.1',validators=[DataRequired(), Length(min=5)]) device_config_port = StringField('connect/port',default='0',validators=[DataRequired(), Length(min=1)]) diff --git a/src/webui/service/device/routes.py b/src/webui/service/device/routes.py index 16b86c769..a7dbd5e02 100644 --- a/src/webui/service/device/routes.py +++ b/src/webui/service/device/routes.py @@ -135,6 +135,8 @@ def add(): device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_IETF_ACTN) if form.device_drivers_qkd.data: device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_QKD) + if form.device_drivers_morpheus.data: + device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_MORPHEUS) device_obj.device_drivers.extend(device_drivers) # pylint: disable=no-member try: diff --git a/src/webui/service/templates/device/add.html b/src/webui/service/templates/device/add.html index b5628ab59..b39ce33ad 100644 --- a/src/webui/service/templates/device/add.html +++ b/src/webui/service/templates/device/add.html @@ -96,6 +96,7 @@ {{ form.device_drivers_optical_tfs }} {{ form.device_drivers_optical_tfs.label(class="col-sm-3 col-form-label") }} {{ form.device_drivers_ietf_actn }} {{ form.device_drivers_ietf_actn.label(class="col-sm-3 col-form-label") }} {{ form.device_drivers_qkd }} {{ form.device_drivers_qkd.label(class="col-sm-3 col-form-label") }} + {{ form.device_drivers_morpheus }} {{ form.device_drivers_morpheus.label(class="col-sm-3 col-form-label") }} {% endif %} </div> </div> @@ -159,4 +160,4 @@ </div> </fieldset> </form> -{% endblock %} \ No newline at end of file +{% endblock %} -- GitLab From fa23a381ec0ef4edc9a84e6b12b8978ebd2ea25f Mon Sep 17 00:00:00 2001 From: Javier Mateos Najari <javier.mateos@naudit.es> Date: Mon, 5 May 2025 09:51:10 +0200 Subject: [PATCH 05/14] fix(morpheus): add copyright --- scripts/run_tests_locally-device-morpheus.sh | 13 +++++++++++++ .../service/drivers/morpheus/MorpheusApiDriver.py | 14 ++++++++++++++ src/device/tests/test_unitary_morpheus.py | 14 ++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/scripts/run_tests_locally-device-morpheus.sh b/scripts/run_tests_locally-device-morpheus.sh index 1af88264e..b96fb6f05 100755 --- a/scripts/run_tests_locally-device-morpheus.sh +++ b/scripts/run_tests_locally-device-morpheus.sh @@ -1,4 +1,17 @@ #!/bin/bash +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. PROJECTDIR=`pwd` diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py index fd4d2a72f..3bef23a50 100644 --- a/src/device/service/drivers/morpheus/MorpheusApiDriver.py +++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py @@ -1,3 +1,17 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging, requests, threading, json, time from typing import Any, Iterator, List, Optional, Tuple, Union, Dict from queue import Queue diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py index 3fa1f8688..b909dddb9 100644 --- a/src/device/tests/test_unitary_morpheus.py +++ b/src/device/tests/test_unitary_morpheus.py @@ -1,3 +1,17 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import os os.environ['DEVICE_EMULATED_ONLY'] = 'YES' -- GitLab From da3611de12c56c844464bb37c72a62a9536f5eb6 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Mon, 5 May 2025 09:45:54 +0000 Subject: [PATCH 06/14] Pre-merge code cleanup --- src/device/service/drivers/__init__.py | 21 +++--- .../drivers/morpheus/MorpheusApiDriver.py | 71 +++++++++---------- .../service/drivers/morpheus/__init__.py | 13 ++++ 3 files changed, 56 insertions(+), 49 deletions(-) create mode 100644 src/device/service/drivers/morpheus/__init__.py diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index cf163474c..fec407df9 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -173,6 +173,16 @@ if LOAD_ALL_DEVICE_DRIVERS: } ])) +if LOAD_ALL_DEVICE_DRIVERS: + from .morpheus.MorpheusApiDriver import MorpheusApiDriver + DRIVERS.append( + (MorpheusApiDriver, [ + { + # Close enough, it does optical switching + FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MORPHEUS, + FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_MORPHEUS, + } + ])) if LOAD_ALL_DEVICE_DRIVERS: from .microwave.IETFApiDriver import IETFApiDriver # pylint: disable=wrong-import-position @@ -232,14 +242,3 @@ if LOAD_ALL_DEVICE_DRIVERS: FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_QKD, } ])) - -if LOAD_ALL_DEVICE_DRIVERS: - from .morpheus.MorpheusApiDriver import MorpheusAPIDriver - DRIVERS.append( - (MorpheusAPIDriver, [ - { - # Close enough, it does optical switching - FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MORPHEUS, - FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_MORPHEUS, - } - ])) diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py index 3bef23a50..10181171f 100644 --- a/src/device/service/drivers/morpheus/MorpheusApiDriver.py +++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, requests, threading, json, time +import json, logging, queue, requests, threading, time from typing import Any, Iterator, List, Optional, Tuple, Union, Dict -from queue import Queue from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from device.service.driver_api._Driver import _Driver @@ -38,8 +37,8 @@ class MorpheusApiDriver(_Driver): self.__pipeline_error_thread = None size = self.settings.get('queue_events_size', 10) - self.__pipeline_error_queue = Queue(maxsize=size) - self.__detection_queue = Queue(maxsize=size) + self.__pipeline_error_queue = queue.Queue(maxsize=size) + self.__detection_queue = queue.Queue(maxsize=size) def Connect(self) -> bool: url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus' @@ -62,10 +61,10 @@ class MorpheusApiDriver(_Driver): if self.__detection_thread and self.__detection_thread.is_alive(): self.__unsubscribe_detection_event() - if self.__pipeline_thread and self.__pipeline_thread.is_alive(): + if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive(): self.__unsubscribe_pipeline_error() - except Exception as e: - LOGGER.exception(f'Error during disconnect: {str(e)}') + except Exception: # pylint: disable=broad-except + LOGGER.exception('Error during disconnect') self.__terminate.set() return True @@ -84,7 +83,7 @@ class MorpheusApiDriver(_Driver): result.append((key, value)) return result - except Exception as e: + except Exception: # pylint: disable=broad-except LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root))) return [] @@ -105,16 +104,12 @@ class MorpheusApiDriver(_Driver): for key in resource_keys: try: - results.append(config[key]) - except KeyError: - results.append(None) - except Exception as e: - results.append(e) - + results.append((key, config[key])) + except Exception as e: # pylint: disable=broad-except + results.append((key, e)) return results - return [(key, None) for key in resource_keys] - except Exception as e: - LOGGER.exception(f'Error getting config: {str(e)}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error getting config') return [(key, e) for key in resource_keys] @metered_subclass_method(METRICS_POOL) @@ -175,16 +170,16 @@ class MorpheusApiDriver(_Driver): (self.__subscribe_pipeline_error, self.__unsubscribe_pipeline_error), (self.__start_pipeline, self.__stop_pipeline), ] - for i, (sub_op, unsub_op) in enumerate(operations): + for _, (sub_op, unsub_op) in enumerate(operations): result = sub_op() - reuslts.append(result) + results.append(result) if isinstance(result, Exception): while rollback_stack: rollback_op = rollback_stack.pop() try: rollback_op() - except Exception as e: - LOGGER.exception(f'Error during subscription rollback operation: {e}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error during subscription rollback operation') return results @@ -206,8 +201,8 @@ class MorpheusApiDriver(_Driver): response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False) response.raise_for_status() return True - except Exception as e: - LOGGER.exception(f'Error starting pipeline: {e}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error starting pipeline') return e def __stop_pipeline(self) -> Union[bool, Exception]: @@ -217,8 +212,8 @@ class MorpheusApiDriver(_Driver): response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False) response.raise_for_status() return True - except Exception as e: - LOGGER.exception(f'Error stopping pipeline: {e}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error stopping pipeline') return e def __subscribe_detection_event(self) -> Union[bool, Exception]: @@ -232,8 +227,8 @@ class MorpheusApiDriver(_Driver): ) self.__detection_thread.start() return True - except Exception as e: - LOGGER.exception(f'Error subscribing to detection events: {str(e)}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error subscribing to detection events') return e def __unsubscribe_detection_event(self) -> Union[bool, Exception]: @@ -241,8 +236,8 @@ class MorpheusApiDriver(_Driver): if self.__detection_thread and self.__detection_thread.is_alive(): self.__detection_thread.join(timeout=5) return True - except Exception as e: - LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error unsubscribing from detection events') return e def __subscribe_pipeline_error(self) -> Union[bool, Exception]: @@ -256,8 +251,8 @@ class MorpheusApiDriver(_Driver): ) self.__pipeline_error_thread.start() return True - except Exception as e: - LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error subscribing to pipeline errors') return e def __unsubscribe_pipeline_error(self) -> Union[bool, Exception]: @@ -265,8 +260,8 @@ class MorpheusApiDriver(_Driver): if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive(): self.__pipeline_error_thread.join(timeout=5) return True - except Exception as e: - LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error unsubscribing from pipeline errors') return e def __get_state(self) -> Dict: @@ -277,12 +272,12 @@ class MorpheusApiDriver(_Driver): if response.ok: state = response.json() return state - except Exception as e: - LOGGER.exception(f'Error getting internal state: {e}') + except Exception: # pylint: disable=broad-except + LOGGER.exception('Error getting internal state') return None - def __handle_notification_stream(self, url: str, queue: Queue[Any]) -> None: + def __handle_notification_stream(self, url: str, queue: queue.Queue[Any]) -> None: try: with requests.get(url, headers=self.__headers, @@ -298,5 +293,5 @@ class MorpheusApiDriver(_Driver): queue.put(event['data']['ietf-restconf:notification']) except json.JSONDecodeError as e: LOGGER.error(f'Error parsing event: {e}') - except Exception as e: - LOGGER.exception(f'Error in notification stream handler: {str(e)}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error in notification stream handler') diff --git a/src/device/service/drivers/morpheus/__init__.py b/src/device/service/drivers/morpheus/__init__.py new file mode 100644 index 000000000..023830645 --- /dev/null +++ b/src/device/service/drivers/morpheus/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -- GitLab From 26b8ec13e0a9e3fcef48cb61fb4d3cc3fe1d2d09 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Mon, 5 May 2025 10:06:51 +0000 Subject: [PATCH 07/14] Pre-merge code cleanup --- src/device/.gitlab-ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/device/.gitlab-ci.yml b/src/device/.gitlab-ci.yml index 0c093b570..c216b14bf 100644 --- a/src/device/.gitlab-ci.yml +++ b/src/device/.gitlab-ci.yml @@ -20,6 +20,8 @@ build device: stage: build before_script: - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - docker ps -aq | xargs -r docker rm -f + - containerlab destroy --all --cleanup || true script: - docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile . - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" -- GitLab From 58af26b9e5486d9f667dd697a913caea781e3f7f Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Mon, 5 May 2025 15:49:49 +0000 Subject: [PATCH 08/14] Pre-merge code cleanup --- src/device/tests/test_unitary_morpheus.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py index b909dddb9..518cea9ed 100644 --- a/src/device/tests/test_unitary_morpheus.py +++ b/src/device/tests/test_unitary_morpheus.py @@ -71,7 +71,7 @@ def test_set_config(driver: MorpheusApiDriver): assert results[0] is True, "Expected a succesfull result" def test_retrieve_state(driver: MorpheusApiDriver): - results = driver.SubscribeState() + results = driver.SubscribeState([]) assert all(isinstance(result, bool) and result for result in results), \ f"Subscription error: {results}" @@ -83,7 +83,7 @@ def test_retrieve_state(driver: MorpheusApiDriver): print_data("State", state) - results = driver.UnsubscribeState() + results = driver.UnsubscribeState([]) assert all(isinstance(result, bool) and result for result in results), \ f"Unsubscription error: {results}" -- GitLab From 4f227e51bd4fd51f588cee03f917807444abab1d Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Mon, 5 May 2025 15:58:58 +0000 Subject: [PATCH 09/14] Pre-merge code cleanup --- scripts/run_tests_locally-device-morpheus.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scripts/run_tests_locally-device-morpheus.sh b/scripts/run_tests_locally-device-morpheus.sh index b96fb6f05..f9666e424 100755 --- a/scripts/run_tests_locally-device-morpheus.sh +++ b/scripts/run_tests_locally-device-morpheus.sh @@ -17,6 +17,11 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc +COVERAGEFILE=$PROJECTDIR/coverage/.coverage + +# Destroy old coverage file and configure the correct folder on the .coveragerc file +rm -f $COVERAGEFILE +cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/tfs-ctrl+$PROJECTDIR+g > $RCFILE # Run unitary tests and analyze coverage of code at same time coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \ -- GitLab From 7f2d5e26917c1f87344b88ea13bd2ed382add304 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Mon, 5 May 2025 16:09:28 +0000 Subject: [PATCH 10/14] Pre-merge code cleanup --- src/device/tests/test_unitary_morpheus.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py index 518cea9ed..c3c2788d1 100644 --- a/src/device/tests/test_unitary_morpheus.py +++ b/src/device/tests/test_unitary_morpheus.py @@ -16,10 +16,7 @@ import os os.environ['DEVICE_EMULATED_ONLY'] = 'YES' # pylint: disable=wrong-import-position -import json -import threading -import logging, pytest, time -from typing import Dict, List +import json, logging, pytest, time, types from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver logging.basicConfig(level=logging.DEBUG) @@ -78,10 +75,11 @@ def test_retrieve_state(driver: MorpheusApiDriver): state = driver.GetState() - assert isinstance(state, iter), "Expected an iterator for state" - assert len(state) > 0, " State should not be empty" - - print_data("State", state) + assert isinstance(state, types.GeneratorType), "Expected an iterator for state" + for item in state: + #assert len(state) > 0, " State should not be empty" + print_data("State Item", item) + break results = driver.UnsubscribeState([]) -- GitLab From 98b68808068ba818bed77ad9bf0a6119215f342b Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Mon, 5 May 2025 16:19:00 +0000 Subject: [PATCH 11/14] Pre-merge code cleanup --- src/device/tests/test_unitary_morpheus.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py index c3c2788d1..bb96289b6 100644 --- a/src/device/tests/test_unitary_morpheus.py +++ b/src/device/tests/test_unitary_morpheus.py @@ -73,13 +73,13 @@ def test_retrieve_state(driver: MorpheusApiDriver): assert all(isinstance(result, bool) and result for result in results), \ f"Subscription error: {results}" - state = driver.GetState() + state = driver.GetState(blocking=True) assert isinstance(state, types.GeneratorType), "Expected an iterator for state" - for item in state: + for i, item in enumerate(state): #assert len(state) > 0, " State should not be empty" print_data("State Item", item) - break + if i > 3: break results = driver.UnsubscribeState([]) -- GitLab From 4295fcc41e282f1f050caa71d350f584fc10bbff Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Mon, 5 May 2025 16:23:43 +0000 Subject: [PATCH 12/14] Pre-merge code cleanup --- src/device/tests/test_unitary_morpheus.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py index bb96289b6..6e4af77a2 100644 --- a/src/device/tests/test_unitary_morpheus.py +++ b/src/device/tests/test_unitary_morpheus.py @@ -76,9 +76,14 @@ def test_retrieve_state(driver: MorpheusApiDriver): state = driver.GetState(blocking=True) assert isinstance(state, types.GeneratorType), "Expected an iterator for state" - for i, item in enumerate(state): - #assert len(state) > 0, " State should not be empty" - print_data("State Item", item) + i = 0 + for item in state: + if i == 0: + print_data("State Item", item) + else: + if item[1] == 'state': continue + print_data("Other Item", item) + i += 1 if i > 3: break results = driver.UnsubscribeState([]) -- GitLab From aa2859d14059d12e8b3964050a2de7e73954a877 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Mon, 5 May 2025 16:26:42 +0000 Subject: [PATCH 13/14] Pre-merge code cleanup --- src/device/tests/test_unitary_morpheus.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py index 6e4af77a2..27de889e5 100644 --- a/src/device/tests/test_unitary_morpheus.py +++ b/src/device/tests/test_unitary_morpheus.py @@ -80,6 +80,7 @@ def test_retrieve_state(driver: MorpheusApiDriver): for item in state: if i == 0: print_data("State Item", item) + i += 1 else: if item[1] == 'state': continue print_data("Other Item", item) -- GitLab From 4ed9cff39c6f1f04022626ef581f6a5a52b5ae94 Mon Sep 17 00:00:00 2001 From: Javier Mateos Najari <javier.mateos@naudit.es> Date: Tue, 6 May 2025 11:21:45 +0200 Subject: [PATCH 14/14] Pre-merge code cleanup --- .../drivers/morpheus/MorpheusApiDriver.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py index 10181171f..1864e810e 100644 --- a/src/device/service/drivers/morpheus/MorpheusApiDriver.py +++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py @@ -152,7 +152,7 @@ class MorpheusApiDriver(_Driver): try: event = self.__detection_queue.get(block=False, timeout=0.1) if event is not None: - yield (event.get('eventTime', time.time()), 'detection_event', error.get('event'),) + yield (event.get('eventTime', time.time()), 'detection_event', event.get('event'),) except queue.Empty: detection_event_empty = True @@ -282,16 +282,25 @@ class MorpheusApiDriver(_Driver): with requests.get(url, headers=self.__headers, stream=True, - verify=False) as response: + verify=False, + timeout=(3.05, None)) as response: if not response.ok: LOGGER.error(f'Error connecting to event stream: {response.status_code}') return - try: - event = response.json() - queue.put(event['data']['ietf-restconf:notification']) - except json.JSONDecodeError as e: - LOGGER.error(f'Error parsing event: {e}') + for line in response.iter_lines(decode_unicode=True): + if not line: + continue + if line.startswith('data:'): + data = line[5:].strip() + LOGGER.error(f'Data received: {data}') + try: + event = json.loads(data) + queue.put(event['ietf-restconf:notification']) + except json.JSONDecodeError as e: + LOGGER.error(f'Error parsing event: {e}') + except KeyError as e: + LOGGER.error(f'Missing expected key in event: {e}') except Exception as e: # pylint: disable=broad-except LOGGER.exception('Error in notification stream handler') -- GitLab