Skip to content
Snippets Groups Projects
Commit 9d825e80 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'develop' of ssh://gifrerenom_labs.etsi.org/tfs/controller into...

Merge branch 'develop' of ssh://gifrerenom_labs.etsi.org/tfs/controller into feat/287-acls-applied-on-multiple-endpoints-of-a-device-are-not-removed-by-deleting-the-acl
parents 42c4f12e db14c5d3
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!347Resolve "ACLs applied on multiple endpoints of a device are not removed by deleting the ACL"
...@@ -230,6 +230,7 @@ enum DeviceDriverEnum { ...@@ -230,6 +230,7 @@ enum DeviceDriverEnum {
DEVICEDRIVER_IETF_SLICE = 14; DEVICEDRIVER_IETF_SLICE = 14;
DEVICEDRIVER_NCE = 15; DEVICEDRIVER_NCE = 15;
DEVICEDRIVER_SMARTNIC = 16; DEVICEDRIVER_SMARTNIC = 16;
DEVICEDRIVER_MORPHEUS = 17;
} }
enum DeviceOperationalStatusEnum { enum DeviceOperationalStatusEnum {
......
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
COVERAGEFILE=$PROJECTDIR/coverage/.coverage
# Destroy old coverage file and configure the correct folder on the .coveragerc file
rm -f $COVERAGEFILE
cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/tfs-ctrl+$PROJECTDIR+g > $RCFILE
# Run unitary tests and analyze coverage of code at same time
coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \
device/tests/test_unitary_morpheus.py
...@@ -51,6 +51,7 @@ class DeviceTypeEnum(Enum): ...@@ -51,6 +51,7 @@ class DeviceTypeEnum(Enum):
SMARTNIC = 'smartnic' SMARTNIC = 'smartnic'
QKD_NODE = 'qkd-node' QKD_NODE = 'qkd-node'
OPEN_ROADM = 'openroadm' OPEN_ROADM = 'openroadm'
MORPHEUS = 'morpheus'
# ETSI TeraFlowSDN controller # ETSI TeraFlowSDN controller
TERAFLOWSDN_CONTROLLER = 'teraflowsdn' TERAFLOWSDN_CONTROLLER = 'teraflowsdn'
......
...@@ -39,6 +39,7 @@ class ORM_DeviceDriverEnum(enum.Enum): ...@@ -39,6 +39,7 @@ class ORM_DeviceDriverEnum(enum.Enum):
OC = DeviceDriverEnum.DEVICEDRIVER_OC OC = DeviceDriverEnum.DEVICEDRIVER_OC
QKD = DeviceDriverEnum.DEVICEDRIVER_QKD QKD = DeviceDriverEnum.DEVICEDRIVER_QKD
SMARTNIC = DeviceDriverEnum.DEVICEDRIVER_SMARTNIC SMARTNIC = DeviceDriverEnum.DEVICEDRIVER_SMARTNIC
MORPHEUS = DeviceDriverEnum.DEVICEDRIVER_MORPHEUS
grpc_to_enum__device_driver = functools.partial( grpc_to_enum__device_driver = functools.partial(
grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum) grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum)
...@@ -20,6 +20,8 @@ build device: ...@@ -20,6 +20,8 @@ build device:
stage: build stage: build
before_script: before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- docker ps -aq | xargs -r docker rm -f
- containerlab destroy --all --cleanup || true
script: script:
- docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile . - docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
......
...@@ -173,6 +173,16 @@ if LOAD_ALL_DEVICE_DRIVERS: ...@@ -173,6 +173,16 @@ if LOAD_ALL_DEVICE_DRIVERS:
} }
])) ]))
if LOAD_ALL_DEVICE_DRIVERS:
from .morpheus.MorpheusApiDriver import MorpheusApiDriver
DRIVERS.append(
(MorpheusApiDriver, [
{
# Close enough, it does optical switching
FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MORPHEUS,
FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_MORPHEUS,
}
]))
if LOAD_ALL_DEVICE_DRIVERS: if LOAD_ALL_DEVICE_DRIVERS:
from .microwave.IETFApiDriver import IETFApiDriver # pylint: disable=wrong-import-position from .microwave.IETFApiDriver import IETFApiDriver # pylint: disable=wrong-import-position
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, queue, requests, threading, time
from typing import Any, Iterator, List, Optional, Tuple, Union, Dict
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from device.service.driver_api._Driver import _Driver
LOGGER = logging.getLogger(__name__)
DRIVER_NAME = 'morpheus'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class MorpheusApiDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None:
super().__init__(DRIVER_NAME, address, port, **settings)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
scheme = self.settings.get('scheme', 'http')
self.__morpheus_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
self.__timeout = int(self.settings.get('timeout', 120))
self.__headers = {'Accept': 'application/yang-data+json', 'Content-Type': 'application/yang-data+json'}
self.__detection_thread = None
self.__pipeline_error_thread = None
size = self.settings.get('queue_events_size', 10)
self.__pipeline_error_queue = queue.Queue(maxsize=size)
self.__detection_queue = queue.Queue(maxsize=size)
def Connect(self) -> bool:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus'
with self.__lock:
if self.__started.is_set(): return True
try:
requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
self.__started.set()
return True
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(str(self.__morpheus_root)))
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception('Exception connecting {:s}'.format(str(self.__morpheus_root)))
return False
def Disconnect(self) -> bool:
with self.__lock:
try:
if self.__detection_thread and self.__detection_thread.is_alive():
self.__unsubscribe_detection_event()
if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
self.__unsubscribe_pipeline_error()
except Exception: # pylint: disable=broad-except
LOGGER.exception('Error during disconnect')
self.__terminate.set()
return True
@metered_subclass_method(METRICS_POOL)
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
with self.__lock:
try:
response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
if response.ok:
config = response.json()
result = []
for key, value in config.items():
result.append((key, value))
return result
except Exception: # pylint: disable=broad-except
LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root)))
return []
@metered_subclass_method(METRICS_POOL)
def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
with self.__lock:
try:
response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
if response.ok:
config = response.json()
results = []
if not resource_keys:
for key, value in config.items():
results.append((key, value))
return results
for key in resource_keys:
try:
results.append((key, config[key]))
except Exception as e: # pylint: disable=broad-except
results.append((key, e))
return results
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error getting config')
return [(key, e) for key in resource_keys]
@metered_subclass_method(METRICS_POOL)
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
results = []
with self.__lock:
config = dict(resources)
try:
response = requests.put(url,
headers=self.__headers,
json=config,
timeout=self.__timeout,
verify=False)
results.append(response.ok)
except Exception as e:
results.append(e)
return results
def GetState(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Tuple[float, str, Any]]:
while True:
if self.__terminate.is_set(): break
if terminate is not None and terminate.is_set(): break
internal_state = self.__get_state()
if internal_state is not None:
timestamp = time.time()
yield(timestamp, 'state', internal_state)
pipeline_error_empty = False
detection_event_empty = False
try:
error = self.__pipeline_error_queue.get(block=False, timeout=0.1)
if error is not None:
yield (error.get('eventTime', time.time()), 'pipeline_error', error.get('event'),)
except queue.Empty:
pipeline_error_empty = True
try:
event = self.__detection_queue.get(block=False, timeout=0.1)
if event is not None:
yield (event.get('eventTime', time.time()), 'detection_event', event.get('event'),)
except queue.Empty:
detection_event_empty = True
if pipeline_error_empty and detection_event_empty:
if blocking:
continue
return
@metered_subclass_method(METRICS_POOL)
def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool,Exception]]:
results = []
rollback_stack = []
operations = [
(self.__subscribe_detection_event, self.__unsubscribe_detection_event),
(self.__subscribe_pipeline_error, self.__unsubscribe_pipeline_error),
(self.__start_pipeline, self.__stop_pipeline),
]
for _, (sub_op, unsub_op) in enumerate(operations):
result = sub_op()
results.append(result)
if isinstance(result, Exception):
while rollback_stack:
rollback_op = rollback_stack.pop()
try:
rollback_op()
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error during subscription rollback operation')
return results
rollback_stack.append(unsub_op)
return results
@metered_subclass_method(METRICS_POOL)
def UnsubscribeState(self, subscriptions: List[Tuple[str,float,float]]) -> List[Union[bool, Exception]]:
results = []
results.append(self.__stop_pipeline())
results.append(self.__unsubscribe_pipeline_error())
results.append(self.__unsubscribe_detection_event())
return results
def __start_pipeline(self) -> Union[bool, Exception]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start'
with self.__lock:
try:
response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
response.raise_for_status()
return True
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error starting pipeline')
return e
def __stop_pipeline(self) -> Union[bool, Exception]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop'
with self.__lock:
try:
response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
response.raise_for_status()
return True
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error stopping pipeline')
return e
def __subscribe_detection_event(self) -> Union[bool, Exception]:
url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event'
with self.__lock:
try:
self.__detection_thread = threading.Thread(
target=self.__handle_notification_stream,
args=(url, self.__detection_queue),
daemon=True
)
self.__detection_thread.start()
return True
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error subscribing to detection events')
return e
def __unsubscribe_detection_event(self) -> Union[bool, Exception]:
try:
if self.__detection_thread and self.__detection_thread.is_alive():
self.__detection_thread.join(timeout=5)
return True
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error unsubscribing from detection events')
return e
def __subscribe_pipeline_error(self) -> Union[bool, Exception]:
url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error'
with self.__lock:
try:
self.__pipeline_error_thread = threading.Thread(
target=self.__handle_notification_stream,
args=(url, self.__pipeline_error_queue),
daemon=True
)
self.__pipeline_error_thread.start()
return True
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error subscribing to pipeline errors')
return e
def __unsubscribe_pipeline_error(self) -> Union[bool, Exception]:
try:
if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
self.__pipeline_error_thread.join(timeout=5)
return True
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error unsubscribing from pipeline errors')
return e
def __get_state(self) -> Dict:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state'
with self.__lock:
try:
response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
if response.ok:
state = response.json()
return state
except Exception: # pylint: disable=broad-except
LOGGER.exception('Error getting internal state')
return None
def __handle_notification_stream(self, url: str, queue: queue.Queue[Any]) -> None:
try:
with requests.get(url,
headers=self.__headers,
stream=True,
verify=False,
timeout=(3.05, None)) as response:
if not response.ok:
LOGGER.error(f'Error connecting to event stream: {response.status_code}')
return
for line in response.iter_lines(decode_unicode=True):
if not line:
continue
if line.startswith('data:'):
data = line[5:].strip()
LOGGER.error(f'Data received: {data}')
try:
event = json.loads(data)
queue.put(event['ietf-restconf:notification'])
except json.JSONDecodeError as e:
LOGGER.error(f'Error parsing event: {e}')
except KeyError as e:
LOGGER.error(f'Missing expected key in event: {e}')
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Error in notification stream handler')
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
os.environ['DEVICE_EMULATED_ONLY'] = 'YES'
# pylint: disable=wrong-import-position
import json, logging, pytest, time, types
from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
##### DRIVER FIXTURE ###################################################################################################
DRIVER_SETTING_ADDRESS = '127.0.0.1'
DRIVER_SETTING_PORT = 8090
@pytest.fixture(scope='session')
def driver() -> MorpheusApiDriver:
_driver = MorpheusApiDriver(
DRIVER_SETTING_ADDRESS, DRIVER_SETTING_PORT,
)
_driver.Connect()
yield _driver
time.sleep(1)
_driver.Disconnect()
##### TEST METHODS #####################################################################################################
def print_data(label, data):
print(f"{label}: {json.dumps(data, indent=2)}")
def test_initial_config_retrieval(driver: MorpheusApiDriver):
config = driver.GetInitialConfig()
assert isinstance(config, list), "Expected a list for initial config"
assert len(config) > 0, "Initial config should not be empty"
print_data("Initial Config", config)
def test_retrieve_config(driver: MorpheusApiDriver):
config = driver.GetConfig(None)
assert isinstance(config, list), "Expected a list for config"
assert len(config) > 0, "Config should not be empty"
print_data("Config", config)
def test_set_config(driver: MorpheusApiDriver):
results = driver.SetConfig([('traffic_type', 'UDP')])
assert len(results) == 1, "Expected only one result"
assert results[0] is True, "Expected a succesfull result"
def test_retrieve_state(driver: MorpheusApiDriver):
results = driver.SubscribeState([])
assert all(isinstance(result, bool) and result for result in results), \
f"Subscription error: {results}"
state = driver.GetState(blocking=True)
assert isinstance(state, types.GeneratorType), "Expected an iterator for state"
i = 0
for item in state:
if i == 0:
print_data("State Item", item)
i += 1
else:
if item[1] == 'state': continue
print_data("Other Item", item)
i += 1
if i > 3: break
results = driver.UnsubscribeState([])
assert all(isinstance(result, bool) and result for result in results), \
f"Unsubscription error: {results}"
...@@ -35,6 +35,7 @@ class AddDeviceForm(FlaskForm): ...@@ -35,6 +35,7 @@ class AddDeviceForm(FlaskForm):
device_drivers_ietf_actn = BooleanField('IETF ACTN') device_drivers_ietf_actn = BooleanField('IETF ACTN')
device_drivers_qkd = BooleanField('QKD') device_drivers_qkd = BooleanField('QKD')
device_drivers_smartnic = BooleanField('SMARTNIC') device_drivers_smartnic = BooleanField('SMARTNIC')
device_drivers_morpheus = BooleanField('MORPHEUS')
device_config_address = StringField('connect/address',default='127.0.0.1',validators=[DataRequired(), Length(min=5)]) device_config_address = StringField('connect/address',default='127.0.0.1',validators=[DataRequired(), Length(min=5)])
device_config_port = StringField('connect/port',default='0',validators=[DataRequired(), Length(min=1)]) device_config_port = StringField('connect/port',default='0',validators=[DataRequired(), Length(min=1)])
......
...@@ -137,6 +137,8 @@ def add(): ...@@ -137,6 +137,8 @@ def add():
device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_QKD) device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_QKD)
if form.device_drivers_smartnic.data: if form.device_drivers_smartnic.data:
device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_SMARTNIC) device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_SMARTNIC)
if form.device_drivers_morpheus.data:
device_drivers.append(DeviceDriverEnum.DEVICEDRIVER_MORPHEUS)
device_obj.device_drivers.extend(device_drivers) # pylint: disable=no-member device_obj.device_drivers.extend(device_drivers) # pylint: disable=no-member
try: try:
......
...@@ -97,7 +97,8 @@ ...@@ -97,7 +97,8 @@
{{ form.device_drivers_ietf_actn }} {{ form.device_drivers_ietf_actn.label(class="col-sm-3 col-form-label") }} {{ form.device_drivers_ietf_actn }} {{ form.device_drivers_ietf_actn.label(class="col-sm-3 col-form-label") }}
{{ form.device_drivers_qkd }} {{ form.device_drivers_qkd.label(class="col-sm-3 col-form-label") }} {{ form.device_drivers_qkd }} {{ form.device_drivers_qkd.label(class="col-sm-3 col-form-label") }}
<br /> <br />
{{ form.device_drivers_smartnic }} {{ form.device_drivers_smartnic_actn.label(class="col-sm-3 col-form-label") }} {{ form.device_drivers_smartnic }} {{ form.device_drivers_smartnic.label(class="col-sm-3 col-form-label") }}
{{ form.device_drivers_morpheus }} {{ form.device_drivers_morpheus.label(class="col-sm-3 col-form-label") }}
{% endif %} {% endif %}
</div> </div>
</div> </div>
...@@ -161,4 +162,4 @@ ...@@ -161,4 +162,4 @@
</div> </div>
</fieldset> </fieldset>
</form> </form>
{% endblock %} {% endblock %}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment