diff --git a/proto/context.proto b/proto/context.proto index 52599784736550dab339db88e68bd43c5fdcc8da..3d2789ea74f78b49b95ed8130b9fe406e762eed2 100644 --- a/proto/context.proto +++ b/proto/context.proto @@ -230,6 +230,7 @@ enum DeviceDriverEnum { DEVICEDRIVER_IETF_SLICE = 14; DEVICEDRIVER_NCE = 15; DEVICEDRIVER_SMARTNIC = 16; + DEVICEDRIVER_MORPHEUS = 17; } enum DeviceOperationalStatusEnum { diff --git a/scripts/run_tests_locally-device-morpheus.sh b/scripts/run_tests_locally-device-morpheus.sh new file mode 100755 index 0000000000000000000000000000000000000000..f9666e4249b9e57bc3750bb8fd5b2a98ba4f251f --- /dev/null +++ b/scripts/run_tests_locally-device-morpheus.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc +COVERAGEFILE=$PROJECTDIR/coverage/.coverage + +# Destroy old coverage file and configure the correct folder on the .coveragerc file +rm -f $COVERAGEFILE +cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/tfs-ctrl+$PROJECTDIR+g > $RCFILE + +# Run unitary tests and analyze coverage of code at same time +coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \ + device/tests/test_unitary_morpheus.py diff --git a/src/common/DeviceTypes.py b/src/common/DeviceTypes.py index 9b269999322f02b7c3f16fc1a7df00fa8272e2af..2b8f7e3dc8963b8f9137a7e811ea608dfc8ff0ab 100644 --- a/src/common/DeviceTypes.py +++ b/src/common/DeviceTypes.py @@ -51,6 +51,7 @@ class DeviceTypeEnum(Enum): SMARTNIC = 'smartnic' QKD_NODE = 'qkd-node' OPEN_ROADM = 'openroadm' + MORPHEUS = 'morpheus' # ETSI TeraFlowSDN controller TERAFLOWSDN_CONTROLLER = 'teraflowsdn' diff --git a/src/context/service/database/models/enums/DeviceDriver.py b/src/context/service/database/models/enums/DeviceDriver.py index f6e128355c557624d6a66d6b147beda5777f74fa..380495f7083e5366a9367d0ceeb427b54cf9e0af 100644 --- a/src/context/service/database/models/enums/DeviceDriver.py +++ b/src/context/service/database/models/enums/DeviceDriver.py @@ -39,6 +39,7 @@ class ORM_DeviceDriverEnum(enum.Enum): OC = DeviceDriverEnum.DEVICEDRIVER_OC QKD = DeviceDriverEnum.DEVICEDRIVER_QKD SMARTNIC = DeviceDriverEnum.DEVICEDRIVER_SMARTNIC + MORPHEUS = DeviceDriverEnum.DEVICEDRIVER_MORPHEUS grpc_to_enum__device_driver = functools.partial( grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum) diff --git a/src/device/.gitlab-ci.yml b/src/device/.gitlab-ci.yml index 0c093b57005efbb604e1c316322942b2cdcb1c68..c216b14bfb50dbcfa8c8b455d24878234019ac33 100644 --- a/src/device/.gitlab-ci.yml +++ b/src/device/.gitlab-ci.yml @@ -20,6 +20,8 @@ build device: stage: build before_script: - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - docker ps -aq | xargs -r docker rm -f + - containerlab destroy --all --cleanup || true script: - docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile . - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index 727f610cf1f5027cf630e3dcc6b5de0a82721b75..fec407df96c0ae8650baf8492a8795c732c5c104 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -173,6 +173,16 @@ if LOAD_ALL_DEVICE_DRIVERS: } ])) +if LOAD_ALL_DEVICE_DRIVERS: + from .morpheus.MorpheusApiDriver import MorpheusApiDriver + DRIVERS.append( + (MorpheusApiDriver, [ + { + # Close enough, it does optical switching + FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MORPHEUS, + FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_MORPHEUS, + } + ])) if LOAD_ALL_DEVICE_DRIVERS: from .microwave.IETFApiDriver import IETFApiDriver # pylint: disable=wrong-import-position diff --git a/src/device/service/drivers/morpheus/MorpheusApiDriver.py b/src/device/service/drivers/morpheus/MorpheusApiDriver.py new file mode 100644 index 0000000000000000000000000000000000000000..1864e810e1783b8a7232bceae4f2a08ac28d26a5 --- /dev/null +++ b/src/device/service/drivers/morpheus/MorpheusApiDriver.py @@ -0,0 +1,306 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging, queue, requests, threading, time +from typing import Any, Iterator, List, Optional, Tuple, Union, Dict +from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method +from device.service.driver_api._Driver import _Driver + +LOGGER = logging.getLogger(__name__) + +DRIVER_NAME = 'morpheus' +METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) + +class MorpheusApiDriver(_Driver): + def __init__(self, address: str, port: int, **settings) -> None: + super().__init__(DRIVER_NAME, address, port, **settings) + self.__lock = threading.Lock() + self.__started = threading.Event() + self.__terminate = threading.Event() + scheme = self.settings.get('scheme', 'http') + self.__morpheus_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port)) + self.__timeout = int(self.settings.get('timeout', 120)) + self.__headers = {'Accept': 'application/yang-data+json', 'Content-Type': 'application/yang-data+json'} + + self.__detection_thread = None + self.__pipeline_error_thread = None + + size = self.settings.get('queue_events_size', 10) + self.__pipeline_error_queue = queue.Queue(maxsize=size) + self.__detection_queue = queue.Queue(maxsize=size) + + def Connect(self) -> bool: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus' + with self.__lock: + if self.__started.is_set(): return True + try: + requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + self.__started.set() + return True + except requests.exceptions.Timeout: + LOGGER.exception('Timeout connecting {:s}'.format(str(self.__morpheus_root))) + return False + except Exception: # pylint: disable=broad-except + LOGGER.exception('Exception connecting {:s}'.format(str(self.__morpheus_root))) + return False + + def Disconnect(self) -> bool: + with self.__lock: + try: + if self.__detection_thread and self.__detection_thread.is_alive(): + self.__unsubscribe_detection_event() + + if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive(): + self.__unsubscribe_pipeline_error() + except Exception: # pylint: disable=broad-except + LOGGER.exception('Error during disconnect') + + self.__terminate.set() + return True + + @metered_subclass_method(METRICS_POOL) + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + config = response.json() + result = [] + + for key, value in config.items(): + result.append((key, value)) + + return result + except Exception: # pylint: disable=broad-except + LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root))) + return [] + + @metered_subclass_method(METRICS_POOL) + def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + config = response.json() + results = [] + + if not resource_keys: + for key, value in config.items(): + results.append((key, value)) + return results + + for key in resource_keys: + try: + results.append((key, config[key])) + except Exception as e: # pylint: disable=broad-except + results.append((key, e)) + return results + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error getting config') + return [(key, e) for key in resource_keys] + + @metered_subclass_method(METRICS_POOL) + def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config' + results = [] + with self.__lock: + config = dict(resources) + try: + response = requests.put(url, + headers=self.__headers, + json=config, + timeout=self.__timeout, + verify=False) + results.append(response.ok) + except Exception as e: + results.append(e) + return results + + def GetState(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Tuple[float, str, Any]]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break + + internal_state = self.__get_state() + if internal_state is not None: + timestamp = time.time() + yield(timestamp, 'state', internal_state) + + pipeline_error_empty = False + detection_event_empty = False + + try: + error = self.__pipeline_error_queue.get(block=False, timeout=0.1) + if error is not None: + yield (error.get('eventTime', time.time()), 'pipeline_error', error.get('event'),) + except queue.Empty: + pipeline_error_empty = True + + try: + event = self.__detection_queue.get(block=False, timeout=0.1) + if event is not None: + yield (event.get('eventTime', time.time()), 'detection_event', event.get('event'),) + except queue.Empty: + detection_event_empty = True + + if pipeline_error_empty and detection_event_empty: + if blocking: + continue + return + + @metered_subclass_method(METRICS_POOL) + def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool,Exception]]: + results = [] + rollback_stack = [] + operations = [ + (self.__subscribe_detection_event, self.__unsubscribe_detection_event), + (self.__subscribe_pipeline_error, self.__unsubscribe_pipeline_error), + (self.__start_pipeline, self.__stop_pipeline), + ] + for _, (sub_op, unsub_op) in enumerate(operations): + result = sub_op() + results.append(result) + if isinstance(result, Exception): + while rollback_stack: + rollback_op = rollback_stack.pop() + try: + rollback_op() + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error during subscription rollback operation') + + return results + + rollback_stack.append(unsub_op) + return results + + @metered_subclass_method(METRICS_POOL) + def UnsubscribeState(self, subscriptions: List[Tuple[str,float,float]]) -> List[Union[bool, Exception]]: + results = [] + results.append(self.__stop_pipeline()) + results.append(self.__unsubscribe_pipeline_error()) + results.append(self.__unsubscribe_detection_event()) + return results + + def __start_pipeline(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start' + with self.__lock: + try: + response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False) + response.raise_for_status() + return True + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error starting pipeline') + return e + + def __stop_pipeline(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop' + with self.__lock: + try: + response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False) + response.raise_for_status() + return True + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error stopping pipeline') + return e + + def __subscribe_detection_event(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event' + with self.__lock: + try: + self.__detection_thread = threading.Thread( + target=self.__handle_notification_stream, + args=(url, self.__detection_queue), + daemon=True + ) + self.__detection_thread.start() + return True + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error subscribing to detection events') + return e + + def __unsubscribe_detection_event(self) -> Union[bool, Exception]: + try: + if self.__detection_thread and self.__detection_thread.is_alive(): + self.__detection_thread.join(timeout=5) + return True + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error unsubscribing from detection events') + return e + + def __subscribe_pipeline_error(self) -> Union[bool, Exception]: + url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error' + with self.__lock: + try: + self.__pipeline_error_thread = threading.Thread( + target=self.__handle_notification_stream, + args=(url, self.__pipeline_error_queue), + daemon=True + ) + self.__pipeline_error_thread.start() + return True + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error subscribing to pipeline errors') + return e + + def __unsubscribe_pipeline_error(self) -> Union[bool, Exception]: + try: + if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive(): + self.__pipeline_error_thread.join(timeout=5) + return True + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error unsubscribing from pipeline errors') + return e + + def __get_state(self) -> Dict: + url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state' + with self.__lock: + try: + response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False) + if response.ok: + state = response.json() + return state + except Exception: # pylint: disable=broad-except + LOGGER.exception('Error getting internal state') + return None + + + def __handle_notification_stream(self, url: str, queue: queue.Queue[Any]) -> None: + try: + with requests.get(url, + headers=self.__headers, + stream=True, + verify=False, + timeout=(3.05, None)) as response: + + if not response.ok: + LOGGER.error(f'Error connecting to event stream: {response.status_code}') + return + + for line in response.iter_lines(decode_unicode=True): + if not line: + continue + if line.startswith('data:'): + data = line[5:].strip() + LOGGER.error(f'Data received: {data}') + try: + event = json.loads(data) + queue.put(event['ietf-restconf:notification']) + except json.JSONDecodeError as e: + LOGGER.error(f'Error parsing event: {e}') + except KeyError as e: + LOGGER.error(f'Missing expected key in event: {e}') + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Error in notification stream handler') diff --git a/src/device/service/drivers/morpheus/__init__.py b/src/device/service/drivers/morpheus/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..023830645e0fcb60e3f8583674a954810af222f2 --- /dev/null +++ b/src/device/service/drivers/morpheus/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py new file mode 100644 index 0000000000000000000000000000000000000000..27de889e509740097433c5af8b99af23b75118aa --- /dev/null +++ b/src/device/tests/test_unitary_morpheus.py @@ -0,0 +1,93 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +os.environ['DEVICE_EMULATED_ONLY'] = 'YES' + +# pylint: disable=wrong-import-position +import json, logging, pytest, time, types +from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver + +logging.basicConfig(level=logging.DEBUG) +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +##### DRIVER FIXTURE ################################################################################################### + +DRIVER_SETTING_ADDRESS = '127.0.0.1' +DRIVER_SETTING_PORT = 8090 + +@pytest.fixture(scope='session') +def driver() -> MorpheusApiDriver: + _driver = MorpheusApiDriver( + DRIVER_SETTING_ADDRESS, DRIVER_SETTING_PORT, + ) + _driver.Connect() + yield _driver + time.sleep(1) + _driver.Disconnect() + + +##### TEST METHODS ##################################################################################################### + +def print_data(label, data): + print(f"{label}: {json.dumps(data, indent=2)}") + +def test_initial_config_retrieval(driver: MorpheusApiDriver): + config = driver.GetInitialConfig() + + assert isinstance(config, list), "Expected a list for initial config" + assert len(config) > 0, "Initial config should not be empty" + + print_data("Initial Config", config) + +def test_retrieve_config(driver: MorpheusApiDriver): + config = driver.GetConfig(None) + + assert isinstance(config, list), "Expected a list for config" + assert len(config) > 0, "Config should not be empty" + + print_data("Config", config) + +def test_set_config(driver: MorpheusApiDriver): + results = driver.SetConfig([('traffic_type', 'UDP')]) + + assert len(results) == 1, "Expected only one result" + assert results[0] is True, "Expected a succesfull result" + +def test_retrieve_state(driver: MorpheusApiDriver): + results = driver.SubscribeState([]) + + assert all(isinstance(result, bool) and result for result in results), \ + f"Subscription error: {results}" + + state = driver.GetState(blocking=True) + + assert isinstance(state, types.GeneratorType), "Expected an iterator for state" + i = 0 + for item in state: + if i == 0: + print_data("State Item", item) + i += 1 + else: + if item[1] == 'state': continue + print_data("Other Item", item) + i += 1 + if i > 3: break + + results = driver.UnsubscribeState([]) + + assert all(isinstance(result, bool) and result for result in results), \ + f"Unsubscription error: {results}" diff --git a/src/webui/service/device/forms.py b/src/webui/service/device/forms.py index 90259295a1a1da429096982fb9f8c54509fc4c92..44af774beee68d28d37921b3024aa4a1e4d9e014 100644 --- a/src/webui/service/device/forms.py +++ b/src/webui/service/device/forms.py @@ -35,6 +35,7 @@ class AddDeviceForm(FlaskForm): device_drivers_ietf_actn = BooleanField('IETF ACTN') device_drivers_qkd = BooleanField('QKD') device_drivers_smartnic = BooleanField('SMARTNIC') + device_drivers_morpheus = BooleanField('MORPHEUS') device_config_address = StringField('connect/address',default='127.0.0.1',validators=[DataRequired(), Length(min=5)]) device_config_port = StringField('connect/port',default='0',validators=[DataRequired(), Length(min=1)]) diff --git a/src/webui/service/device/routes.py b/src/webui/service/device/routes.py index 177e36c2ef877728201447656c749b9fca29dabd..ef3b6466c2567e702e847565858c2e8e1c9467ae 100644 --- a/src/webui/service/device/routes.py +++ b/src/webui/service/device/routes.py @@ -137,6 +137,8 @@ def add(): device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_QKD) if form.device_drivers_smartnic.data: device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_SMARTNIC) + if form.device_drivers_morpheus.data: + device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_MORPHEUS) device_obj.device_drivers.extend(device_drivers) # pylint: disable=no-member try: diff --git a/src/webui/service/templates/device/add.html b/src/webui/service/templates/device/add.html index 15abc0894d923d7507e51d26e8b1fb6cdb7b757d..b5f48e1bb11167a235282387270b3e98df34e9ee 100644 --- a/src/webui/service/templates/device/add.html +++ b/src/webui/service/templates/device/add.html @@ -97,7 +97,8 @@ {{ form.device_drivers_ietf_actn }} {{ form.device_drivers_ietf_actn.label(class="col-sm-3 col-form-label") }} {{ form.device_drivers_qkd }} {{ form.device_drivers_qkd.label(class="col-sm-3 col-form-label") }} <br /> - {{ form.device_drivers_smartnic }} {{ form.device_drivers_smartnic_actn.label(class="col-sm-3 col-form-label") }} + {{ form.device_drivers_smartnic }} {{ form.device_drivers_smartnic.label(class="col-sm-3 col-form-label") }} + {{ form.device_drivers_morpheus }} {{ form.device_drivers_morpheus.label(class="col-sm-3 col-form-label") }} {% endif %} </div> </div> @@ -161,4 +162,4 @@ </div> </fieldset> </form> -{% endblock %} \ No newline at end of file +{% endblock %}