From 35df1abb2b8910d8fa9a5ce02a6ce4d52046b402 Mon Sep 17 00:00:00 2001
From: Javier Mateos Najari <javier.mateos@naudit.es>
Date: Mon, 3 Mar 2025 10:34:23 +0100
Subject: [PATCH 01/14] feat(drivers): add morpheus driver

---
 .../drivers/morpheus/MorpheusApiDriver.py     | 254 ++++++++++++++++++
 1 file changed, 254 insertions(+)
 create mode 100644 src/device/service/drivers/morpheus/MorpheusApiDriver.py

diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
new file mode 100644
index 000000000..f9b033cc3
--- /dev/null
+++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
@@ -0,0 +1,254 @@
+import logging, requests, threading, json
+from typing import Any, Iterator, List, Optional, Tuple, Union, Dict
+from queue import Queue
+from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
+from device.service.driver_api._Driver import _Driver
+
+LOGGER = logging.getLogger(__name__)
+
+DRIVER_NAME = 'morpheus'
+METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
+
+class MorpheusApiDriver(_Driver):
+    def __init__(self, address: str, port: int, **settings) -> None:
+        super().__init__(DRIVER_NAME, address, port, **settings)
+        self.__lock = threading.Lock()
+        self.__started = threading.Event()
+        self.__terminate = threading.Event()
+        scheme = self.settings.get('scheme', 'http')
+        self.__morpheus_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
+        self.__timeout = int(self.settings.get('timeout', 120))
+        self.__headers = {'Accept': 'application/yang-data+json', 'Content-Type': 'application/yang-data+json'}
+
+        self.__detection_thread = None
+        self.__pipeline_error_thread = None
+
+        size = self.settings.get('queue_events_size', 10)
+        self.__pipeline_error_queue = Queue(maxsize=size)
+        self.__detection_queue = Queue(maxsize=size)
+
+    def Connect(self) -> bool:
+        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus'
+        with self.__lock:
+            if self.__started.is_set(): return True
+            try:
+                requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
+                self.__started.set()
+                return True
+            except requests.exceptions.Timeout:
+                LOGGER.exception('Timeout connecting {:s}'.format(str(self.__morpheus_root)))
+                return False
+            except Exception:  # pylint: disable=broad-except
+                LOGGER.exception('Exception connecting {:s}'.format(str(self.__morpheus_root)))
+                return False
+
+    def Disconnect(self) -> bool:
+        with self.__lock:
+            try:
+                if self.__detection_thread and self.__detection_thread.is_alive():
+                    self.UnsubscribeDetectionEvent()
+
+                if self.__pipeline_thread and self.__pipeline_thread.is_alive():
+                    self.UnsubscribePipelineError()
+            except Exception as e:
+                LOGGER.exception(f'Error during disconnect: {str(e)}')
+
+            self.__terminate.set()
+            return True
+
+    @metered_subclass_method(METRICS_POOL)
+    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
+        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
+        with self.__lock:
+            try:
+                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
+                if response.ok:
+                    config = response.json()
+                    result = []
+
+                    for key, value in config.items():
+                        result.append((key, value))
+
+                    return  result
+            except Exception as e:
+                LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root)))
+            return []
+
+    @metered_subclass_method(METRICS_POOL)
+    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
+        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
+        with self.__lock:
+            try:
+                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
+                if response.ok:
+                    config = response.json()
+                    results = []
+
+                    if not resource_keys:
+                        for key, value in config.items():
+                            results.append((key, value))
+                        return results
+
+                    for key in resource_keys:
+                        try:
+                            results.append(config[key])
+                        except KeyError:
+                            results.append(None)
+                        except Exception as e:
+                            results.append(e)
+
+                    return results
+                return [(key, None) for key in resource_keys]
+            except Exception as e:
+                LOGGER.exception(f'Error getting config: {str(e)}')
+                return [(key, e) for key in resource_keys]
+
+    @metered_subclass_method(METRICS_POOL)
+    def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
+        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
+        results = []
+        with self.__lock:
+            config = dict(resources)
+            try:
+                response = requests.put(url,
+                                        headers=self.__headers,
+                                        json=config,
+                                        timeout=self.__timeout,
+                                        verify=False)
+                results.append(response.ok)
+            except Exception as e:
+                results.append(e)
+        return results
+
+    def GetState(self) -> List[Tuple[str, Any]]:
+        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state'
+        with self.__lock:
+            try:
+                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
+                if response.ok:
+                    state = response.json()
+                    result = []
+                    for key, value in state.items():
+                        result.append((key, value))
+                    return result
+                return []
+            except Exception as e:
+                LOGGER.exception(f'Error getting state: {str(e)}')
+                return []
+
+    def StartPipeline(self) -> Union[bool, Exception]:
+        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start'
+        with self.__lock:
+            try:
+                response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
+                response.raise_for_status()
+                return True
+            except Exception as e:
+                LOGGER.exception(f'Error starting pipeline: {e}')
+                return e
+
+    def StopPipeline(self) -> Union[bool, Exception]:
+        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop'
+        with self.__lock:
+            try:
+                response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
+                response.raise_for_status()
+                return True
+            except Exception as e:
+                LOGGER.exception(f'Error stopping pipeline: {e}')
+                return False
+
+    @metered_subclass_method(METRICS_POOL)
+    def SubscribeDetectionEvent(self) -> Union[bool, Exception]:
+        url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event'
+        with self.__lock:
+            try:
+                self.__detection_thread = threading.Thread(
+                        target=self.__handle_notification_stream,
+                        args=(url, self.__detection_queue),
+                        daemon=True
+                        )
+                self.__detection_thread.start()
+                return True
+            except Exception as e:
+                LOGGER.exception(f'Error subscribing to detection events: {str(e)}')
+                return e
+
+    @metered_subclass_method(METRICS_POOL)
+    def UnsubscribeDetectionEvent(self) -> Union[bool, Exception]:
+        try:
+            if self.__detection_thread and self.__detection_thread.is_alive():
+                self.__detection_thread.join(timeout=5)
+            return True
+        except Exception as e:
+            LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}')
+            return e
+
+    def GetDetectionEvent(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Dict]:
+        while True:
+            if self.__terminate.is_set(): break
+            if terminate is not None and terminate.is_set(): break
+            try:
+                event = self.__detection_queue.get(block=blocking, timeout=0.1)
+                if event is not None:
+                    yield event
+            except queue.Empty:
+                if blocking:
+                    continue
+                return
+
+    def SubscribePipelineError(self) -> Union[bool, Exception]:
+        url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error'
+        with self.__lock:
+            try:
+                self.__pipeline_error_thread = threading.Thread(
+                        target=self.__handle_notification_stream,
+                        args=(url, self.__pipeline_error_queue),
+                        daemon=True
+                        )
+                self.__pipeline_error_thread.start()
+                return True
+            except Exception as e:
+                LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}')
+                return e
+
+    def UnsubscribePipelineError(self) -> Union[bool, Exception]:
+        try:
+            if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
+                self.__pipeline_error_thread.join(timeout=5)
+            return True
+        except Exception as e:
+            LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}')
+            return e
+
+    def GetPipelineError(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Dict]:
+        while True:
+            if self.__terminate.is_set(): break
+            if terminate is not None and terminate.is_set(): break
+            try:
+                error = self.__pipeline_error_queue.get(block=blocking, timeout=0.1)
+                if error is not None:
+                    yield error
+            except queue.Empty:
+                if blocking:
+                    continue
+                return
+
+    def __handle_notification_stream(self, url: str, queue: Queue[Any]) -> None:
+        try:
+            with requests.get(url,
+                              headers=self.__headers,
+                              stream=True,
+                              verify=False) as response:
+
+                if not response.ok:
+                    LOGGER.error(f'Error connecting to event stream: {response.status_code}')
+                    return
+
+                try:
+                    event = response.json()
+                    queue.put(event['data']['ietf-restconf:notification'])
+                except json.JSONDecodeError as e:
+                    LOGGER.error(f'Error parsing event: {e}')
+        except Exception as e:
+            LOGGER.exception(f'Error in notification stream handler: {str(e)}')
-- 
GitLab


From 54f5199807a18f02d64f30d92485becf762594d8 Mon Sep 17 00:00:00 2001
From: Javier Mateos Najari <javier.mateos@naudit.es>
Date: Tue, 4 Mar 2025 13:35:22 +0100
Subject: [PATCH 02/14] test(device): add morpheus driver tests

---
 scripts/run_tests_locally-device-morpheus.sh | 10 +++
 src/device/tests/test_unitary_morpheus.py    | 91 ++++++++++++++++++++
 2 files changed, 101 insertions(+)
 create mode 100755 scripts/run_tests_locally-device-morpheus.sh
 create mode 100644 src/device/tests/test_unitary_morpheus.py

diff --git a/scripts/run_tests_locally-device-morpheus.sh b/scripts/run_tests_locally-device-morpheus.sh
new file mode 100755
index 000000000..1af88264e
--- /dev/null
+++ b/scripts/run_tests_locally-device-morpheus.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+PROJECTDIR=`pwd`
+
+cd $PROJECTDIR/src
+RCFILE=$PROJECTDIR/coverage/.coveragerc
+
+# Run unitary tests and analyze coverage of code at same time
+coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \
+    device/tests/test_unitary_morpheus.py
diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py
new file mode 100644
index 000000000..cf754e091
--- /dev/null
+++ b/src/device/tests/test_unitary_morpheus.py
@@ -0,0 +1,91 @@
+import os
+os.environ['DEVICE_EMULATED_ONLY'] = 'YES'
+
+# pylint: disable=wrong-import-position
+import json
+import logging, pytest, time
+from typing import Dict, List
+from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver
+
+logging.basicConfig(level=logging.DEBUG)
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.DEBUG)
+
+
+##### DRIVER FIXTURE ###################################################################################################
+
+DRIVER_SETTING_ADDRESS  = '127.0.0.1'
+DRIVER_SETTING_PORT     = 8090
+
+@pytest.fixture(scope='session')
+def driver() -> MorpheusApiDriver:
+    _driver = MorpheusApiDriver(
+        DRIVER_SETTING_ADDRESS, DRIVER_SETTING_PORT,
+    )
+    _driver.Connect()
+    yield _driver
+    time.sleep(1)
+    _driver.Disconnect()
+
+
+##### TEST METHODS #####################################################################################################
+
+def print_data(label, data):
+    print(f"{label}: {json.dumps(data, indent=2)}")
+
+def test_initial_config_retrieval(driver: MorpheusApiDriver):
+    config = driver.GetInitialConfig()
+
+    assert isinstance(config, list), "Expected a list for initial config"
+    assert len(config) > 0, "Initial config should not be empty"
+
+    print_data("Initial Config", config)
+
+def test_retrieve_config(driver: MorpheusApiDriver):
+    config = driver.GetConfig(None)
+
+    assert isinstance(config, list), "Expected a list for config"
+    assert len(config) > 0, "Config should not be empty"
+
+    print_data("Config", config)
+
+def test_set_config(driver: MorpheusApiDriver):
+    results = driver.SetConfig([('traffic_type', 'UDP')])
+
+    assert len(results) == 1, "Expected only one result"
+    assert results[0] is True, "Expected a succesfull result"
+
+def test_retrieve_state(driver: MorpheusApiDriver):
+    state = driver.GetState()
+
+    assert isinstance(state, list), "Expected a a list for initial config"
+    assert len(state) > 0, " State should not be empty"
+
+    print_data("State", state)
+
+def test_pipeline(driver: MorpheusApiDriver):
+    result = driver.StartPipeline()
+
+    assert result is True
+
+    result = driver.StopPipeline()
+
+    assert result is True
+
+def test_subscription_detection(driver: MorpheusApiDriver):
+    result = driver.SubscribeDetectionEvent()
+
+    assert result is True
+
+    result = driver.UnsubscribeDetectionEvent()
+
+    assert result is True
+
+def test_subscription_error(driver: MorpheusApiDriver):
+    result = driver.SubscribePipelineError()
+
+    assert result is True
+
+    result = driver.UnsubscribePipelineError()
+
+    assert result is True
-- 
GitLab


From 6043618872050a2e377d6eb721bb8f5f3ef11b43 Mon Sep 17 00:00:00 2001
From: Javier Mateos Najari <javier.mateos@naudit.es>
Date: Mon, 5 May 2025 09:47:37 +0200
Subject: [PATCH 03/14] refactor(drivers): respect the driver's interface

---
 .../drivers/morpheus/MorpheusApiDriver.py     | 136 +++++++++++-------
 src/device/tests/test_unitary_morpheus.py     |  36 ++---
 2 files changed, 95 insertions(+), 77 deletions(-)

diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
index f9b033cc3..fd4d2a72f 100644
--- a/src/device/service/drivers/morpheus/MorpheusApiDriver.py
+++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
@@ -1,4 +1,4 @@
-import logging, requests, threading, json
+import logging, requests, threading, json, time
 from typing import Any, Iterator, List, Optional, Tuple, Union, Dict
 from queue import Queue
 from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
@@ -46,10 +46,10 @@ class MorpheusApiDriver(_Driver):
         with self.__lock:
             try:
                 if self.__detection_thread and self.__detection_thread.is_alive():
-                    self.UnsubscribeDetectionEvent()
+                    self.__unsubscribe_detection_event()
 
                 if self.__pipeline_thread and self.__pipeline_thread.is_alive():
-                    self.UnsubscribePipelineError()
+                    self.__unsubscribe_pipeline_error()
             except Exception as e:
                 LOGGER.exception(f'Error during disconnect: {str(e)}')
 
@@ -94,7 +94,7 @@ class MorpheusApiDriver(_Driver):
                             results.append(config[key])
                         except KeyError:
                             results.append(None)
-                        except Exception as e:
+                except Exception as e:
                             results.append(e)
 
                     return results
@@ -120,23 +120,72 @@ class MorpheusApiDriver(_Driver):
                 results.append(e)
         return results
 
-    def GetState(self) -> List[Tuple[str, Any]]:
-        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state'
-        with self.__lock:
+    def GetState(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Tuple[float, str, Any]]:
+        while True:
+            if self.__terminate.is_set(): break
+            if terminate is not None and terminate.is_set(): break
+
+            internal_state = self.__get_state()
+            if internal_state is not None:
+                timestamp = time.time()
+                yield(timestamp, 'state', internal_state)
+
+            pipeline_error_empty = False
+            detection_event_empty = False
+
             try:
-                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
-                if response.ok:
-                    state = response.json()
-                    result = []
-                    for key, value in state.items():
-                        result.append((key, value))
-                    return result
-                return []
-            except Exception as e:
-                LOGGER.exception(f'Error getting state: {str(e)}')
-                return []
+                error = self.__pipeline_error_queue.get(block=False, timeout=0.1)
+                if error is not None:
+                    yield (error.get('eventTime', time.time()), 'pipeline_error', error.get('event'),)
+            except queue.Empty:
+                pipeline_error_empty = True
 
-    def StartPipeline(self) -> Union[bool, Exception]:
+            try:
+                event = self.__detection_queue.get(block=False, timeout=0.1)
+                if event is not None:
+                    yield (event.get('eventTime', time.time()), 'detection_event', error.get('event'),)
+            except queue.Empty:
+                detection_event_empty = True
+
+            if pipeline_error_empty and detection_event_empty:
+                if blocking:
+                    continue
+                return
+
+    @metered_subclass_method(METRICS_POOL)
+    def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool,Exception]]:
+        results = []
+        rollback_stack = []
+        operations = [
+                (self.__subscribe_detection_event, self.__unsubscribe_detection_event),
+                (self.__subscribe_pipeline_error, self.__unsubscribe_pipeline_error),
+                (self.__start_pipeline, self.__stop_pipeline),
+        ]
+        for i, (sub_op, unsub_op) in enumerate(operations):
+            result = sub_op()
+            reuslts.append(result)
+            if isinstance(result, Exception):
+                while rollback_stack:
+                    rollback_op = rollback_stack.pop()
+                    try:
+                        rollback_op()
+                    except Exception as e:
+                        LOGGER.exception(f'Error during subscription rollback operation: {e}')
+
+                return results
+
+            rollback_stack.append(unsub_op)
+        return results
+
+    @metered_subclass_method(METRICS_POOL)
+    def UnsubscribeState(self, subscriptions: List[Tuple[str,float,float]]) -> List[Union[bool, Exception]]:
+        results = []
+        results.append(self.__stop_pipeline())
+        results.append(self.__unsubscribe_pipeline_error())
+        results.append(self.__unsubscribe_detection_event())
+        return results
+
+    def __start_pipeline(self) -> Union[bool, Exception]:
         url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start'
         with self.__lock:
             try:
@@ -147,7 +196,7 @@ class MorpheusApiDriver(_Driver):
                 LOGGER.exception(f'Error starting pipeline: {e}')
                 return e
 
-    def StopPipeline(self) -> Union[bool, Exception]:
+    def __stop_pipeline(self) -> Union[bool, Exception]:
         url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop'
         with self.__lock:
             try:
@@ -156,10 +205,9 @@ class MorpheusApiDriver(_Driver):
                 return True
             except Exception as e:
                 LOGGER.exception(f'Error stopping pipeline: {e}')
-                return False
+                return e
 
-    @metered_subclass_method(METRICS_POOL)
-    def SubscribeDetectionEvent(self) -> Union[bool, Exception]:
+    def __subscribe_detection_event(self) -> Union[bool, Exception]:
         url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event'
         with self.__lock:
             try:
@@ -174,8 +222,7 @@ class MorpheusApiDriver(_Driver):
                 LOGGER.exception(f'Error subscribing to detection events: {str(e)}')
                 return e
 
-    @metered_subclass_method(METRICS_POOL)
-    def UnsubscribeDetectionEvent(self) -> Union[bool, Exception]:
+    def __unsubscribe_detection_event(self) -> Union[bool, Exception]:
         try:
             if self.__detection_thread and self.__detection_thread.is_alive():
                 self.__detection_thread.join(timeout=5)
@@ -184,20 +231,7 @@ class MorpheusApiDriver(_Driver):
             LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}')
             return e
 
-    def GetDetectionEvent(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Dict]:
-        while True:
-            if self.__terminate.is_set(): break
-            if terminate is not None and terminate.is_set(): break
-            try:
-                event = self.__detection_queue.get(block=blocking, timeout=0.1)
-                if event is not None:
-                    yield event
-            except queue.Empty:
-                if blocking:
-                    continue
-                return
-
-    def SubscribePipelineError(self) -> Union[bool, Exception]:
+    def __subscribe_pipeline_error(self) -> Union[bool, Exception]:
         url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error'
         with self.__lock:
             try:
@@ -212,7 +246,7 @@ class MorpheusApiDriver(_Driver):
                 LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}')
                 return e
 
-    def UnsubscribePipelineError(self) -> Union[bool, Exception]:
+    def __unsubscribe_pipeline_error(self) -> Union[bool, Exception]:
         try:
             if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
                 self.__pipeline_error_thread.join(timeout=5)
@@ -221,18 +255,18 @@ class MorpheusApiDriver(_Driver):
             LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}')
             return e
 
-    def GetPipelineError(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Dict]:
-        while True:
-            if self.__terminate.is_set(): break
-            if terminate is not None and terminate.is_set(): break
+    def __get_state(self) -> Dict:
+        url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state'
+        with self.__lock:
             try:
-                error = self.__pipeline_error_queue.get(block=blocking, timeout=0.1)
-                if error is not None:
-                    yield error
-            except queue.Empty:
-                if blocking:
-                    continue
-                return
+                response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
+                if response.ok:
+                    state = response.json()
+                    return state
+            except Exception as e:
+                LOGGER.exception(f'Error getting internal state: {e}')
+        return None
+
 
     def __handle_notification_stream(self, url: str, queue: Queue[Any]) -> None:
         try:
diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py
index cf754e091..3fa1f8688 100644
--- a/src/device/tests/test_unitary_morpheus.py
+++ b/src/device/tests/test_unitary_morpheus.py
@@ -3,6 +3,7 @@ os.environ['DEVICE_EMULATED_ONLY'] = 'YES'
 
 # pylint: disable=wrong-import-position
 import json
+import threading
 import logging, pytest, time
 from typing import Dict, List
 from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver
@@ -56,36 +57,19 @@ def test_set_config(driver: MorpheusApiDriver):
     assert results[0] is True, "Expected a succesfull result"
 
 def test_retrieve_state(driver: MorpheusApiDriver):
+    results = driver.SubscribeState()
+
+    assert all(isinstance(result, bool) and result for result in results), \
+            f"Subscription error: {results}"
+
     state = driver.GetState()
 
-    assert isinstance(state, list), "Expected a a list for initial config"
+    assert isinstance(state, iter), "Expected an iterator for state"
     assert len(state) > 0, " State should not be empty"
 
     print_data("State", state)
 
-def test_pipeline(driver: MorpheusApiDriver):
-    result = driver.StartPipeline()
-
-    assert result is True
-
-    result = driver.StopPipeline()
-
-    assert result is True
-
-def test_subscription_detection(driver: MorpheusApiDriver):
-    result = driver.SubscribeDetectionEvent()
-
-    assert result is True
-
-    result = driver.UnsubscribeDetectionEvent()
-
-    assert result is True
-
-def test_subscription_error(driver: MorpheusApiDriver):
-    result = driver.SubscribePipelineError()
-
-    assert result is True
-
-    result = driver.UnsubscribePipelineError()
+    results = driver.UnsubscribeState()
 
-    assert result is True
+    assert all(isinstance(result, bool) and result for result in results), \
+            f"Unsubscription error: {results}"
-- 
GitLab


From 6b03c8fc01c6e73ce3f93205b4470fcc8b26ac05 Mon Sep 17 00:00:00 2001
From: Javier Mateos Najari <javier.mateos@naudit.es>
Date: Mon, 5 May 2025 09:50:35 +0200
Subject: [PATCH 04/14] feat(morpheus): add morpheus driver to controller

---
 proto/context.proto                                   |  1 +
 src/common/DeviceTypes.py                             |  1 +
 .../service/database/models/enums/DeviceDriver.py     |  1 +
 src/device/service/drivers/__init__.py                | 11 +++++++++++
 src/webui/service/device/forms.py                     |  1 +
 src/webui/service/device/routes.py                    |  2 ++
 src/webui/service/templates/device/add.html           |  3 ++-
 7 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/proto/context.proto b/proto/context.proto
index 01e096233..42b8a69cc 100644
--- a/proto/context.proto
+++ b/proto/context.proto
@@ -226,6 +226,7 @@ enum DeviceDriverEnum {
   DEVICEDRIVER_IETF_L3VPN = 13;
   DEVICEDRIVER_IETF_SLICE = 14;
   DEVICEDRIVER_NCE = 15;
+  DEVICEDRIVER_MORPHEUS = 16;
 }
 
 enum DeviceOperationalStatusEnum {
diff --git a/src/common/DeviceTypes.py b/src/common/DeviceTypes.py
index 9a982d1eb..b917524a6 100644
--- a/src/common/DeviceTypes.py
+++ b/src/common/DeviceTypes.py
@@ -50,6 +50,7 @@ class DeviceTypeEnum(Enum):
     XR_CONSTELLATION                = 'xr-constellation'
     QKD_NODE                        = 'qkd-node'
     OPEN_ROADM                      = 'openroadm'
+    MORPHEUS                        = 'morpheus'
 
     # ETSI TeraFlowSDN controller
     TERAFLOWSDN_CONTROLLER          = 'teraflowsdn'
diff --git a/src/context/service/database/models/enums/DeviceDriver.py b/src/context/service/database/models/enums/DeviceDriver.py
index fe0d83fb1..216cd4461 100644
--- a/src/context/service/database/models/enums/DeviceDriver.py
+++ b/src/context/service/database/models/enums/DeviceDriver.py
@@ -38,6 +38,7 @@ class ORM_DeviceDriverEnum(enum.Enum):
     IETF_SLICE            = DeviceDriverEnum.DEVICEDRIVER_IETF_SLICE
     OC                    = DeviceDriverEnum.DEVICEDRIVER_OC
     QKD                   = DeviceDriverEnum.DEVICEDRIVER_QKD
+    MORPHEUS              = DeviceDriverEnum.DEVICEDRIVER_MORPHEUS
 
 grpc_to_enum__device_driver = functools.partial(
     grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum)
diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py
index e3102cdf5..fdbf4739a 100644
--- a/src/device/service/drivers/__init__.py
+++ b/src/device/service/drivers/__init__.py
@@ -217,3 +217,14 @@ if LOAD_ALL_DEVICE_DRIVERS:
                 FilterFieldEnum.DRIVER     : DeviceDriverEnum.DEVICEDRIVER_QKD,
             }
         ]))
+
+if LOAD_ALL_DEVICE_DRIVERS:
+    from .morpheus.MorpheusApiDriver import MorpheusAPIDriver
+    DRIVERS.append(
+        (MorpheusAPIDriver, [
+            {
+                # Close enough, it does optical switching
+                FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MORPHEUS,
+                FilterFieldEnum.DRIVER     : DeviceDriverEnum.DEVICEDRIVER_MORPHEUS,
+            }
+        ]))
diff --git a/src/webui/service/device/forms.py b/src/webui/service/device/forms.py
index 4fa6829e7..3005c4cb3 100644
--- a/src/webui/service/device/forms.py
+++ b/src/webui/service/device/forms.py
@@ -34,6 +34,7 @@ class AddDeviceForm(FlaskForm):
     device_drivers_optical_tfs = BooleanField('OPTICAL TFS')
     device_drivers_ietf_actn = BooleanField('IETF ACTN')
     device_drivers_qkd = BooleanField('QKD')
+    device_drivers_morpheus = BooleanField('MORPHEUS')
 
     device_config_address = StringField('connect/address',default='127.0.0.1',validators=[DataRequired(), Length(min=5)])
     device_config_port = StringField('connect/port',default='0',validators=[DataRequired(), Length(min=1)])
diff --git a/src/webui/service/device/routes.py b/src/webui/service/device/routes.py
index 16b86c769..a7dbd5e02 100644
--- a/src/webui/service/device/routes.py
+++ b/src/webui/service/device/routes.py
@@ -135,6 +135,8 @@ def add():
             device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_IETF_ACTN)
         if form.device_drivers_qkd.data:
             device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_QKD)
+        if form.device_drivers_morpheus.data:
+            device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_MORPHEUS)
         device_obj.device_drivers.extend(device_drivers) # pylint: disable=no-member
 
         try:
diff --git a/src/webui/service/templates/device/add.html b/src/webui/service/templates/device/add.html
index b5628ab59..b39ce33ad 100644
--- a/src/webui/service/templates/device/add.html
+++ b/src/webui/service/templates/device/add.html
@@ -96,6 +96,7 @@
                 {{ form.device_drivers_optical_tfs }} {{ form.device_drivers_optical_tfs.label(class="col-sm-3 col-form-label") }}
                 {{ form.device_drivers_ietf_actn }} {{ form.device_drivers_ietf_actn.label(class="col-sm-3 col-form-label") }}
                 {{ form.device_drivers_qkd }} {{ form.device_drivers_qkd.label(class="col-sm-3 col-form-label") }}
+                {{ form.device_drivers_morpheus }} {{ form.device_drivers_morpheus.label(class="col-sm-3 col-form-label") }}
                 {% endif %}
             </div>
         </div>
@@ -159,4 +160,4 @@
         </div>
     </fieldset>
 </form>
-{% endblock %}
\ No newline at end of file
+{% endblock %}
-- 
GitLab


From fa23a381ec0ef4edc9a84e6b12b8978ebd2ea25f Mon Sep 17 00:00:00 2001
From: Javier Mateos Najari <javier.mateos@naudit.es>
Date: Mon, 5 May 2025 09:51:10 +0200
Subject: [PATCH 05/14] fix(morpheus): add copyright

---
 scripts/run_tests_locally-device-morpheus.sh       | 13 +++++++++++++
 .../service/drivers/morpheus/MorpheusApiDriver.py  | 14 ++++++++++++++
 src/device/tests/test_unitary_morpheus.py          | 14 ++++++++++++++
 3 files changed, 41 insertions(+)

diff --git a/scripts/run_tests_locally-device-morpheus.sh b/scripts/run_tests_locally-device-morpheus.sh
index 1af88264e..b96fb6f05 100755
--- a/scripts/run_tests_locally-device-morpheus.sh
+++ b/scripts/run_tests_locally-device-morpheus.sh
@@ -1,4 +1,17 @@
 #!/bin/bash
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 PROJECTDIR=`pwd`
 
diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
index fd4d2a72f..3bef23a50 100644
--- a/src/device/service/drivers/morpheus/MorpheusApiDriver.py
+++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
@@ -1,3 +1,17 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 import logging, requests, threading, json, time
 from typing import Any, Iterator, List, Optional, Tuple, Union, Dict
 from queue import Queue
diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py
index 3fa1f8688..b909dddb9 100644
--- a/src/device/tests/test_unitary_morpheus.py
+++ b/src/device/tests/test_unitary_morpheus.py
@@ -1,3 +1,17 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 import os
 os.environ['DEVICE_EMULATED_ONLY'] = 'YES'
 
-- 
GitLab


From da3611de12c56c844464bb37c72a62a9536f5eb6 Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Mon, 5 May 2025 09:45:54 +0000
Subject: [PATCH 06/14] Pre-merge code cleanup

---
 src/device/service/drivers/__init__.py        | 21 +++---
 .../drivers/morpheus/MorpheusApiDriver.py     | 71 +++++++++----------
 .../service/drivers/morpheus/__init__.py      | 13 ++++
 3 files changed, 56 insertions(+), 49 deletions(-)
 create mode 100644 src/device/service/drivers/morpheus/__init__.py

diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py
index cf163474c..fec407df9 100644
--- a/src/device/service/drivers/__init__.py
+++ b/src/device/service/drivers/__init__.py
@@ -173,6 +173,16 @@ if LOAD_ALL_DEVICE_DRIVERS:
             }
         ]))
 
+if LOAD_ALL_DEVICE_DRIVERS:
+    from .morpheus.MorpheusApiDriver import MorpheusApiDriver
+    DRIVERS.append(
+        (MorpheusApiDriver, [
+            {
+                # Close enough, it does optical switching
+                FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MORPHEUS,
+                FilterFieldEnum.DRIVER     : DeviceDriverEnum.DEVICEDRIVER_MORPHEUS,
+            }
+        ]))
 
 if LOAD_ALL_DEVICE_DRIVERS:
     from .microwave.IETFApiDriver import IETFApiDriver # pylint: disable=wrong-import-position
@@ -232,14 +242,3 @@ if LOAD_ALL_DEVICE_DRIVERS:
                 FilterFieldEnum.DRIVER     : DeviceDriverEnum.DEVICEDRIVER_QKD,
             }
         ]))
-
-if LOAD_ALL_DEVICE_DRIVERS:
-    from .morpheus.MorpheusApiDriver import MorpheusAPIDriver
-    DRIVERS.append(
-        (MorpheusAPIDriver, [
-            {
-                # Close enough, it does optical switching
-                FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MORPHEUS,
-                FilterFieldEnum.DRIVER     : DeviceDriverEnum.DEVICEDRIVER_MORPHEUS,
-            }
-        ]))
diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
index 3bef23a50..10181171f 100644
--- a/src/device/service/drivers/morpheus/MorpheusApiDriver.py
+++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
@@ -12,9 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging, requests, threading, json, time
+import json, logging, queue, requests, threading, time
 from typing import Any, Iterator, List, Optional, Tuple, Union, Dict
-from queue import Queue
 from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
 from device.service.driver_api._Driver import _Driver
 
@@ -38,8 +37,8 @@ class MorpheusApiDriver(_Driver):
         self.__pipeline_error_thread = None
 
         size = self.settings.get('queue_events_size', 10)
-        self.__pipeline_error_queue = Queue(maxsize=size)
-        self.__detection_queue = Queue(maxsize=size)
+        self.__pipeline_error_queue = queue.Queue(maxsize=size)
+        self.__detection_queue = queue.Queue(maxsize=size)
 
     def Connect(self) -> bool:
         url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus'
@@ -62,10 +61,10 @@ class MorpheusApiDriver(_Driver):
                 if self.__detection_thread and self.__detection_thread.is_alive():
                     self.__unsubscribe_detection_event()
 
-                if self.__pipeline_thread and self.__pipeline_thread.is_alive():
+                if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
                     self.__unsubscribe_pipeline_error()
-            except Exception as e:
-                LOGGER.exception(f'Error during disconnect: {str(e)}')
+            except Exception:  # pylint: disable=broad-except
+                LOGGER.exception('Error during disconnect')
 
             self.__terminate.set()
             return True
@@ -84,7 +83,7 @@ class MorpheusApiDriver(_Driver):
                         result.append((key, value))
 
                     return  result
-            except Exception as e:
+            except Exception:  # pylint: disable=broad-except
                 LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root)))
             return []
 
@@ -105,16 +104,12 @@ class MorpheusApiDriver(_Driver):
 
                     for key in resource_keys:
                         try:
-                            results.append(config[key])
-                        except KeyError:
-                            results.append(None)
-                except Exception as e:
-                            results.append(e)
-
+                            results.append((key, config[key]))
+                        except Exception as e:  # pylint: disable=broad-except
+                            results.append((key, e))
                     return results
-                return [(key, None) for key in resource_keys]
-            except Exception as e:
-                LOGGER.exception(f'Error getting config: {str(e)}')
+            except Exception as e:  # pylint: disable=broad-except
+                LOGGER.exception('Error getting config')
                 return [(key, e) for key in resource_keys]
 
     @metered_subclass_method(METRICS_POOL)
@@ -175,16 +170,16 @@ class MorpheusApiDriver(_Driver):
                 (self.__subscribe_pipeline_error, self.__unsubscribe_pipeline_error),
                 (self.__start_pipeline, self.__stop_pipeline),
         ]
-        for i, (sub_op, unsub_op) in enumerate(operations):
+        for _, (sub_op, unsub_op) in enumerate(operations):
             result = sub_op()
-            reuslts.append(result)
+            results.append(result)
             if isinstance(result, Exception):
                 while rollback_stack:
                     rollback_op = rollback_stack.pop()
                     try:
                         rollback_op()
-                    except Exception as e:
-                        LOGGER.exception(f'Error during subscription rollback operation: {e}')
+                    except Exception as e:  # pylint: disable=broad-except
+                        LOGGER.exception('Error during subscription rollback operation')
 
                 return results
 
@@ -206,8 +201,8 @@ class MorpheusApiDriver(_Driver):
                 response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                 response.raise_for_status()
                 return True
-            except Exception as e:
-                LOGGER.exception(f'Error starting pipeline: {e}')
+            except Exception as e:  # pylint: disable=broad-except
+                LOGGER.exception('Error starting pipeline')
                 return e
 
     def __stop_pipeline(self) -> Union[bool, Exception]:
@@ -217,8 +212,8 @@ class MorpheusApiDriver(_Driver):
                 response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
                 response.raise_for_status()
                 return True
-            except Exception as e:
-                LOGGER.exception(f'Error stopping pipeline: {e}')
+            except Exception as e:  # pylint: disable=broad-except
+                LOGGER.exception('Error stopping pipeline')
                 return e
 
     def __subscribe_detection_event(self) -> Union[bool, Exception]:
@@ -232,8 +227,8 @@ class MorpheusApiDriver(_Driver):
                         )
                 self.__detection_thread.start()
                 return True
-            except Exception as e:
-                LOGGER.exception(f'Error subscribing to detection events: {str(e)}')
+            except Exception as e:  # pylint: disable=broad-except
+                LOGGER.exception('Error subscribing to detection events')
                 return e
 
     def __unsubscribe_detection_event(self) -> Union[bool, Exception]:
@@ -241,8 +236,8 @@ class MorpheusApiDriver(_Driver):
             if self.__detection_thread and self.__detection_thread.is_alive():
                 self.__detection_thread.join(timeout=5)
             return True
-        except Exception as e:
-            LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}')
+        except Exception as e:  # pylint: disable=broad-except
+            LOGGER.exception('Error unsubscribing from detection events')
             return e
 
     def __subscribe_pipeline_error(self) -> Union[bool, Exception]:
@@ -256,8 +251,8 @@ class MorpheusApiDriver(_Driver):
                         )
                 self.__pipeline_error_thread.start()
                 return True
-            except Exception as e:
-                LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}')
+            except Exception as e:  # pylint: disable=broad-except
+                LOGGER.exception('Error subscribing to pipeline errors')
                 return e
 
     def __unsubscribe_pipeline_error(self) -> Union[bool, Exception]:
@@ -265,8 +260,8 @@ class MorpheusApiDriver(_Driver):
             if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
                 self.__pipeline_error_thread.join(timeout=5)
             return True
-        except Exception as e:
-            LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}')
+        except Exception as e:  # pylint: disable=broad-except
+            LOGGER.exception('Error unsubscribing from pipeline errors')
             return e
 
     def __get_state(self) -> Dict:
@@ -277,12 +272,12 @@ class MorpheusApiDriver(_Driver):
                 if response.ok:
                     state = response.json()
                     return state
-            except Exception as e:
-                LOGGER.exception(f'Error getting internal state: {e}')
+            except Exception:  # pylint: disable=broad-except
+                LOGGER.exception('Error getting internal state')
         return None
 
 
-    def __handle_notification_stream(self, url: str, queue: Queue[Any]) -> None:
+    def __handle_notification_stream(self, url: str, queue: queue.Queue[Any]) -> None:
         try:
             with requests.get(url,
                               headers=self.__headers,
@@ -298,5 +293,5 @@ class MorpheusApiDriver(_Driver):
                     queue.put(event['data']['ietf-restconf:notification'])
                 except json.JSONDecodeError as e:
                     LOGGER.error(f'Error parsing event: {e}')
-        except Exception as e:
-            LOGGER.exception(f'Error in notification stream handler: {str(e)}')
+        except Exception as e:  # pylint: disable=broad-except
+            LOGGER.exception('Error in notification stream handler')
diff --git a/src/device/service/drivers/morpheus/__init__.py b/src/device/service/drivers/morpheus/__init__.py
new file mode 100644
index 000000000..023830645
--- /dev/null
+++ b/src/device/service/drivers/morpheus/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
-- 
GitLab


From 26b8ec13e0a9e3fcef48cb61fb4d3cc3fe1d2d09 Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Mon, 5 May 2025 10:06:51 +0000
Subject: [PATCH 07/14] Pre-merge code cleanup

---
 src/device/.gitlab-ci.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/device/.gitlab-ci.yml b/src/device/.gitlab-ci.yml
index 0c093b570..c216b14bf 100644
--- a/src/device/.gitlab-ci.yml
+++ b/src/device/.gitlab-ci.yml
@@ -20,6 +20,8 @@ build device:
   stage: build
   before_script:
     - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
+    - docker ps -aq | xargs -r docker rm -f
+    - containerlab destroy --all --cleanup || true
   script:
     - docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
     - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
-- 
GitLab


From 58af26b9e5486d9f667dd697a913caea781e3f7f Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Mon, 5 May 2025 15:49:49 +0000
Subject: [PATCH 08/14] Pre-merge code cleanup

---
 src/device/tests/test_unitary_morpheus.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py
index b909dddb9..518cea9ed 100644
--- a/src/device/tests/test_unitary_morpheus.py
+++ b/src/device/tests/test_unitary_morpheus.py
@@ -71,7 +71,7 @@ def test_set_config(driver: MorpheusApiDriver):
     assert results[0] is True, "Expected a succesfull result"
 
 def test_retrieve_state(driver: MorpheusApiDriver):
-    results = driver.SubscribeState()
+    results = driver.SubscribeState([])
 
     assert all(isinstance(result, bool) and result for result in results), \
             f"Subscription error: {results}"
@@ -83,7 +83,7 @@ def test_retrieve_state(driver: MorpheusApiDriver):
 
     print_data("State", state)
 
-    results = driver.UnsubscribeState()
+    results = driver.UnsubscribeState([])
 
     assert all(isinstance(result, bool) and result for result in results), \
             f"Unsubscription error: {results}"
-- 
GitLab


From 4f227e51bd4fd51f588cee03f917807444abab1d Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Mon, 5 May 2025 15:58:58 +0000
Subject: [PATCH 09/14] Pre-merge code cleanup

---
 scripts/run_tests_locally-device-morpheus.sh | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/scripts/run_tests_locally-device-morpheus.sh b/scripts/run_tests_locally-device-morpheus.sh
index b96fb6f05..f9666e424 100755
--- a/scripts/run_tests_locally-device-morpheus.sh
+++ b/scripts/run_tests_locally-device-morpheus.sh
@@ -17,6 +17,11 @@ PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+COVERAGEFILE=$PROJECTDIR/coverage/.coverage
+
+# Destroy old coverage file and configure the correct folder on the .coveragerc file
+rm -f $COVERAGEFILE
+cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/tfs-ctrl+$PROJECTDIR+g > $RCFILE
 
 # Run unitary tests and analyze coverage of code at same time
 coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \
-- 
GitLab


From 7f2d5e26917c1f87344b88ea13bd2ed382add304 Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Mon, 5 May 2025 16:09:28 +0000
Subject: [PATCH 10/14] Pre-merge code cleanup

---
 src/device/tests/test_unitary_morpheus.py | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py
index 518cea9ed..c3c2788d1 100644
--- a/src/device/tests/test_unitary_morpheus.py
+++ b/src/device/tests/test_unitary_morpheus.py
@@ -16,10 +16,7 @@ import os
 os.environ['DEVICE_EMULATED_ONLY'] = 'YES'
 
 # pylint: disable=wrong-import-position
-import json
-import threading
-import logging, pytest, time
-from typing import Dict, List
+import json, logging, pytest, time, types
 from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver
 
 logging.basicConfig(level=logging.DEBUG)
@@ -78,10 +75,11 @@ def test_retrieve_state(driver: MorpheusApiDriver):
 
     state = driver.GetState()
 
-    assert isinstance(state, iter), "Expected an iterator for state"
-    assert len(state) > 0, " State should not be empty"
-
-    print_data("State", state)
+    assert isinstance(state, types.GeneratorType), "Expected an iterator for state"
+    for item in state:
+        #assert len(state) > 0, " State should not be empty"
+        print_data("State Item", item)
+        break
 
     results = driver.UnsubscribeState([])
 
-- 
GitLab


From 98b68808068ba818bed77ad9bf0a6119215f342b Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Mon, 5 May 2025 16:19:00 +0000
Subject: [PATCH 11/14] Pre-merge code cleanup

---
 src/device/tests/test_unitary_morpheus.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py
index c3c2788d1..bb96289b6 100644
--- a/src/device/tests/test_unitary_morpheus.py
+++ b/src/device/tests/test_unitary_morpheus.py
@@ -73,13 +73,13 @@ def test_retrieve_state(driver: MorpheusApiDriver):
     assert all(isinstance(result, bool) and result for result in results), \
             f"Subscription error: {results}"
 
-    state = driver.GetState()
+    state = driver.GetState(blocking=True)
 
     assert isinstance(state, types.GeneratorType), "Expected an iterator for state"
-    for item in state:
+    for i, item in enumerate(state):
         #assert len(state) > 0, " State should not be empty"
         print_data("State Item", item)
-        break
+        if i > 3: break
 
     results = driver.UnsubscribeState([])
 
-- 
GitLab


From 4295fcc41e282f1f050caa71d350f584fc10bbff Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Mon, 5 May 2025 16:23:43 +0000
Subject: [PATCH 12/14] Pre-merge code cleanup

---
 src/device/tests/test_unitary_morpheus.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py
index bb96289b6..6e4af77a2 100644
--- a/src/device/tests/test_unitary_morpheus.py
+++ b/src/device/tests/test_unitary_morpheus.py
@@ -76,9 +76,14 @@ def test_retrieve_state(driver: MorpheusApiDriver):
     state = driver.GetState(blocking=True)
 
     assert isinstance(state, types.GeneratorType), "Expected an iterator for state"
-    for i, item in enumerate(state):
-        #assert len(state) > 0, " State should not be empty"
-        print_data("State Item", item)
+    i = 0
+    for item in state:
+        if i == 0:
+            print_data("State Item", item)
+        else:
+            if item[1] == 'state': continue
+            print_data("Other Item", item)
+            i += 1
         if i > 3: break
 
     results = driver.UnsubscribeState([])
-- 
GitLab


From aa2859d14059d12e8b3964050a2de7e73954a877 Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Mon, 5 May 2025 16:26:42 +0000
Subject: [PATCH 13/14] Pre-merge code cleanup

---
 src/device/tests/test_unitary_morpheus.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py
index 6e4af77a2..27de889e5 100644
--- a/src/device/tests/test_unitary_morpheus.py
+++ b/src/device/tests/test_unitary_morpheus.py
@@ -80,6 +80,7 @@ def test_retrieve_state(driver: MorpheusApiDriver):
     for item in state:
         if i == 0:
             print_data("State Item", item)
+            i += 1
         else:
             if item[1] == 'state': continue
             print_data("Other Item", item)
-- 
GitLab


From 4ed9cff39c6f1f04022626ef581f6a5a52b5ae94 Mon Sep 17 00:00:00 2001
From: Javier Mateos Najari <javier.mateos@naudit.es>
Date: Tue, 6 May 2025 11:21:45 +0200
Subject: [PATCH 14/14] Pre-merge code cleanup

---
 .../drivers/morpheus/MorpheusApiDriver.py     | 23 +++++++++++++------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
index 10181171f..1864e810e 100644
--- a/src/device/service/drivers/morpheus/MorpheusApiDriver.py
+++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py
@@ -152,7 +152,7 @@ class MorpheusApiDriver(_Driver):
             try:
                 event = self.__detection_queue.get(block=False, timeout=0.1)
                 if event is not None:
-                    yield (event.get('eventTime', time.time()), 'detection_event', error.get('event'),)
+                    yield (event.get('eventTime', time.time()), 'detection_event', event.get('event'),)
             except queue.Empty:
                 detection_event_empty = True
 
@@ -282,16 +282,25 @@ class MorpheusApiDriver(_Driver):
             with requests.get(url,
                               headers=self.__headers,
                               stream=True,
-                              verify=False) as response:
+                              verify=False,
+                              timeout=(3.05, None)) as response:
 
                 if not response.ok:
                     LOGGER.error(f'Error connecting to event stream: {response.status_code}')
                     return
 
-                try:
-                    event = response.json()
-                    queue.put(event['data']['ietf-restconf:notification'])
-                except json.JSONDecodeError as e:
-                    LOGGER.error(f'Error parsing event: {e}')
+                for line in response.iter_lines(decode_unicode=True):
+                    if not line:
+                        continue
+                    if line.startswith('data:'):
+                        data = line[5:].strip()
+                        LOGGER.error(f'Data received: {data}')
+                        try:
+                            event = json.loads(data)
+                            queue.put(event['ietf-restconf:notification'])
+                        except json.JSONDecodeError as e:
+                            LOGGER.error(f'Error parsing event: {e}')
+                        except KeyError as e:
+                            LOGGER.error(f'Missing expected key in event: {e}')
         except Exception as e:  # pylint: disable=broad-except
             LOGGER.exception('Error in notification stream handler')
-- 
GitLab