From 6cd569776615ef0715c1112b94a2be635c119c70 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Sat, 15 Mar 2025 15:06:00 +0000 Subject: [PATCH] Device component: - Cosmetic changes in L2/L3 VPN drivers - Upgraded Optical TFS Driver --- src/device/service/drivers/__init__.py | 5 +- .../drivers/ietf_l2vpn/IetfL2VpnDriver.py | 3 +- .../drivers/ietf_l2vpn/TfsApiClient.py | 43 +++- .../drivers/ietf_l3vpn/TfsApiClient.py | 189 +++++++++--------- .../drivers/optical_tfs/OpticalTfsDriver.py | 126 ++++++------ .../drivers/optical_tfs/TfsApiClient.py | 128 ++++++++++++ .../drivers/optical_tfs/TfsOpticalClient.py | 100 +++++++++ .../service/drivers/optical_tfs/Tools.py | 143 ------------- .../service/drivers/optical_tfs/__init__.py | 6 - 9 files changed, 429 insertions(+), 314 deletions(-) create mode 100644 src/device/service/drivers/optical_tfs/TfsApiClient.py create mode 100644 src/device/service/drivers/optical_tfs/TfsOpticalClient.py delete mode 100644 src/device/service/drivers/optical_tfs/Tools.py diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index e3102cdf5..70ca1764f 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -188,7 +188,10 @@ if LOAD_ALL_DEVICE_DRIVERS: DRIVERS.append( (OpticalTfsDriver, [ { - FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.OPEN_LINE_SYSTEM, + FilterFieldEnum.DEVICE_TYPE: [ + DeviceTypeEnum.OPEN_LINE_SYSTEM, + DeviceTypeEnum.TERAFLOWSDN_CONTROLLER, + ], FilterFieldEnum.DRIVER: DeviceDriverEnum.DEVICEDRIVER_OPTICAL_TFS, } ])) diff --git a/src/device/service/drivers/ietf_l2vpn/IetfL2VpnDriver.py b/src/device/service/drivers/ietf_l2vpn/IetfL2VpnDriver.py index d756ca89f..ba3cef3d8 100644 --- a/src/device/service/drivers/ietf_l2vpn/IetfL2VpnDriver.py +++ b/src/device/service/drivers/ietf_l2vpn/IetfL2VpnDriver.py @@ -70,9 +70,10 @@ class IetfL2VpnDriver(_Driver): def Connect(self) -> bool: with self.__lock: + if self.__started.is_set(): return True try: self.wim.check_credentials() - except Exception: # pylint: disable=broad-except + except: # pylint: disable=bare-except LOGGER.exception('Exception checking credentials') return False else: diff --git a/src/device/service/drivers/ietf_l2vpn/TfsApiClient.py b/src/device/service/drivers/ietf_l2vpn/TfsApiClient.py index c51e2d6bf..dd6924de0 100644 --- a/src/device/service/drivers/ietf_l2vpn/TfsApiClient.py +++ b/src/device/service/drivers/ietf_l2vpn/TfsApiClient.py @@ -19,6 +19,7 @@ from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum GET_DEVICES_URL = '{:s}://{:s}:{:d}/tfs-api/devices' GET_LINKS_URL = '{:s}://{:s}:{:d}/tfs-api/links' + TIMEOUT = 30 HTTP_OK_CODES = { @@ -47,6 +48,10 @@ MAPPING_DRIVER = { 'DEVICEDRIVER_OPTICAL_TFS' : 9, 'DEVICEDRIVER_IETF_ACTN' : 10, 'DEVICEDRIVER_OC' : 11, + 'DEVICEDRIVER_QKD' : 12, + 'DEVICEDRIVER_IETF_L3VPN' : 13, + 'DEVICEDRIVER_IETF_SLICE' : 14, + 'DEVICEDRIVER_NCE' : 15, } MSG_ERROR = 'Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}' @@ -59,21 +64,31 @@ class TfsApiClient: username : Optional[str] = None, password : Optional[str] = None ) -> None: self._devices_url = GET_DEVICES_URL.format(scheme, address, port) - self._links_url = GET_LINKS_URL.format(scheme, address, port) - self._auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None - - def get_devices_endpoints(self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES) -> List[Dict]: + self._links_url = GET_LINKS_URL.format(scheme, address, port) + self._auth = ( + HTTPBasicAuth(username, password) + if username is not None and password is not None + else None + ) + + def get_devices_endpoints( + self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES + ) -> List[Dict]: LOGGER.debug('[get_devices_endpoints] begin') - LOGGER.debug('[get_devices_endpoints] import_topology={:s}'.format(str(import_topology))) + MSG = '[get_devices_endpoints] import_topology={:s}' + LOGGER.debug(MSG.format(str(import_topology))) reply = requests.get(self._devices_url, timeout=TIMEOUT, verify=False, auth=self._auth) if reply.status_code not in HTTP_OK_CODES: - msg = MSG_ERROR.format(str(self._devices_url), str(reply.status_code), str(reply)) + msg = MSG_ERROR.format( + str(self._devices_url), str(reply.status_code), str(reply) + ) LOGGER.error(msg) raise Exception(msg) if import_topology == ImportTopologyEnum.DISABLED: - raise Exception('Unsupported import_topology mode: {:s}'.format(str(import_topology))) + MSG = 'Unsupported import_topology mode: {:s}' + raise Exception(MSG.format(str(import_topology))) result = list() for json_device in reply.json()['devices']: @@ -87,7 +102,10 @@ class TfsApiClient: 'name': json_device['name'], 'type': device_type, 'status': MAPPING_STATUS[device_status], - 'drivers': [MAPPING_DRIVER[driver] for driver in json_device['device_drivers']], + 'drivers': [ + MAPPING_DRIVER[driver] + for driver in json_device['device_drivers'] + ], } result.append((device_url, device_data)) @@ -108,7 +126,9 @@ class TfsApiClient: reply = requests.get(self._links_url, timeout=TIMEOUT, verify=False, auth=self._auth) if reply.status_code not in HTTP_OK_CODES: - msg = MSG_ERROR.format(str(self._links_url), str(reply.status_code), str(reply)) + msg = MSG_ERROR.format( + str(self._links_url), str(reply.status_code), str(reply) + ) LOGGER.error(msg) raise Exception(msg) @@ -116,7 +136,10 @@ class TfsApiClient: link_uuid : str = json_link['link_id']['link_uuid']['uuid'] link_url = '/links/link[{:s}]'.format(link_uuid) link_endpoint_ids = [ - (json_endpoint_id['device_id']['device_uuid']['uuid'], json_endpoint_id['endpoint_uuid']['uuid']) + ( + json_endpoint_id['device_id']['device_uuid']['uuid'], + json_endpoint_id['endpoint_uuid']['uuid'], + ) for json_endpoint_id in json_link['link_endpoint_ids'] ] link_data = { diff --git a/src/device/service/drivers/ietf_l3vpn/TfsApiClient.py b/src/device/service/drivers/ietf_l3vpn/TfsApiClient.py index 1ca965f87..2db898059 100644 --- a/src/device/service/drivers/ietf_l3vpn/TfsApiClient.py +++ b/src/device/service/drivers/ietf_l3vpn/TfsApiClient.py @@ -1,4 +1,4 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,80 +12,75 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -from typing import Dict, List, Optional - -import requests +import logging, requests from requests.auth import HTTPBasicAuth - +from typing import Dict, List, Optional from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum -GET_DEVICES_URL = "{:s}://{:s}:{:d}/tfs-api/devices" -GET_LINKS_URL = "{:s}://{:s}:{:d}/tfs-api/links" -L3VPN_URL = "{:s}://{:s}:{:d}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services" +GET_DEVICES_URL = '{:s}://{:s}:{:d}/tfs-api/devices' +GET_LINKS_URL = '{:s}://{:s}:{:d}/tfs-api/links' +L3VPN_URL = '{:s}://{:s}:{:d}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services' + TIMEOUT = 30 HTTP_OK_CODES = { - 200, # OK - 201, # Created - 202, # Accepted - 204, # No Content + 200, # OK + 201, # Created + 202, # Accepted + 204, # No Content } MAPPING_STATUS = { - "DEVICEOPERATIONALSTATUS_UNDEFINED": 0, - "DEVICEOPERATIONALSTATUS_DISABLED": 1, - "DEVICEOPERATIONALSTATUS_ENABLED": 2, + 'DEVICEOPERATIONALSTATUS_UNDEFINED': 0, + 'DEVICEOPERATIONALSTATUS_DISABLED' : 1, + 'DEVICEOPERATIONALSTATUS_ENABLED' : 2, } MAPPING_DRIVER = { - "DEVICEDRIVER_UNDEFINED": 0, - "DEVICEDRIVER_OPENCONFIG": 1, - "DEVICEDRIVER_TRANSPORT_API": 2, - "DEVICEDRIVER_P4": 3, - "DEVICEDRIVER_IETF_NETWORK_TOPOLOGY": 4, - "DEVICEDRIVER_ONF_TR_532": 5, - "DEVICEDRIVER_XR": 6, - "DEVICEDRIVER_IETF_L2VPN": 7, - "DEVICEDRIVER_GNMI_OPENCONFIG": 8, - "DEVICEDRIVER_OPTICAL_TFS": 9, - "DEVICEDRIVER_IETF_ACTN": 10, - "DEVICEDRIVER_OC": 11, + 'DEVICEDRIVER_UNDEFINED' : 0, + 'DEVICEDRIVER_OPENCONFIG' : 1, + 'DEVICEDRIVER_TRANSPORT_API' : 2, + 'DEVICEDRIVER_P4' : 3, + 'DEVICEDRIVER_IETF_NETWORK_TOPOLOGY': 4, + 'DEVICEDRIVER_ONF_TR_532' : 5, + 'DEVICEDRIVER_XR' : 6, + 'DEVICEDRIVER_IETF_L2VPN' : 7, + 'DEVICEDRIVER_GNMI_OPENCONFIG' : 8, + 'DEVICEDRIVER_OPTICAL_TFS' : 9, + 'DEVICEDRIVER_IETF_ACTN' : 10, + 'DEVICEDRIVER_OC' : 11, + 'DEVICEDRIVER_QKD' : 12, + 'DEVICEDRIVER_IETF_L3VPN' : 13, + 'DEVICEDRIVER_IETF_SLICE' : 14, + 'DEVICEDRIVER_NCE' : 15, } -MSG_ERROR = "Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}" +MSG_ERROR = 'Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}' LOGGER = logging.getLogger(__name__) - class TfsApiClient: def __init__( - self, - address: str, - port: int, - scheme: str = "http", - username: Optional[str] = None, - password: Optional[str] = None, + self, address : str, port : int, scheme : str = 'http', + username : Optional[str] = None, password : Optional[str] = None ) -> None: self._devices_url = GET_DEVICES_URL.format(scheme, address, port) - self._links_url = GET_LINKS_URL.format(scheme, address, port) - self._l3vpn_url = L3VPN_URL.format(scheme, address, port) - self._auth = None - # ( - # HTTPBasicAuth(username, password) - # if username is not None and password is not None - # else None - # ) + self._links_url = GET_LINKS_URL.format(scheme, address, port) + self._l3vpn_url = L3VPN_URL.format(scheme, address, port) + self._auth = ( + HTTPBasicAuth(username, password) + if username is not None and password is not None + else None + ) def get_devices_endpoints( - self, import_topology: ImportTopologyEnum = ImportTopologyEnum.DEVICES + self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES ) -> List[Dict]: - LOGGER.debug("[get_devices_endpoints] begin") - LOGGER.debug( - "[get_devices_endpoints] import_topology={:s}".format(str(import_topology)) - ) + LOGGER.debug('[get_devices_endpoints] begin') + MSG = '[get_devices_endpoints] import_topology={:s}' + LOGGER.debug(MSG.format(str(import_topology))) - reply = requests.get(self._devices_url, timeout=TIMEOUT, auth=self._auth) + reply = requests.get(self._devices_url, timeout=TIMEOUT, verify=False, auth=self._auth) if reply.status_code not in HTTP_OK_CODES: msg = MSG_ERROR.format( str(self._devices_url), str(reply.status_code), str(reply) @@ -94,43 +89,44 @@ class TfsApiClient: raise Exception(msg) if import_topology == ImportTopologyEnum.DISABLED: - raise Exception( - "Unsupported import_topology mode: {:s}".format(str(import_topology)) - ) + MSG = 'Unsupported import_topology mode: {:s}' + raise Exception(MSG.format(str(import_topology))) result = list() - for json_device in reply.json()["devices"]: - device_uuid: str = json_device["device_id"]["device_uuid"]["uuid"] - device_type: str = json_device["device_type"] - device_status = json_device["device_operational_status"] - device_url = "/devices/device[{:s}]".format(device_uuid) + for json_device in reply.json()['devices']: + device_uuid : str = json_device['device_id']['device_uuid']['uuid'] + device_type : str = json_device['device_type'] + #if not device_type.startswith('emu-'): device_type = 'emu-' + device_type + device_status = json_device['device_operational_status'] + device_url = '/devices/device[{:s}]'.format(device_uuid) device_data = { - "uuid": json_device["device_id"]["device_uuid"]["uuid"], - "name": json_device["name"], - "type": device_type, - "status": MAPPING_STATUS[device_status], - "drivers": [ - MAPPING_DRIVER[driver] for driver in json_device["device_drivers"] + 'uuid': json_device['device_id']['device_uuid']['uuid'], + 'name': json_device['name'], + 'type': device_type, + 'status': MAPPING_STATUS[device_status], + 'drivers': [ + MAPPING_DRIVER[driver] + for driver in json_device['device_drivers'] ], } result.append((device_url, device_data)) - for json_endpoint in json_device["device_endpoints"]: - endpoint_uuid = json_endpoint["endpoint_id"]["endpoint_uuid"]["uuid"] - endpoint_url = "/endpoints/endpoint[{:s}]".format(endpoint_uuid) + for json_endpoint in json_device['device_endpoints']: + endpoint_uuid = json_endpoint['endpoint_id']['endpoint_uuid']['uuid'] + endpoint_url = '/endpoints/endpoint[{:s}]'.format(endpoint_uuid) endpoint_data = { - "device_uuid": device_uuid, - "uuid": endpoint_uuid, - "name": json_endpoint["name"], - "type": json_endpoint["endpoint_type"], + 'device_uuid': device_uuid, + 'uuid': endpoint_uuid, + 'name': json_endpoint['name'], + 'type': json_endpoint['endpoint_type'], } result.append((endpoint_url, endpoint_data)) if import_topology == ImportTopologyEnum.DEVICES: - LOGGER.debug("[get_devices_endpoints] devices only; returning") + LOGGER.debug('[get_devices_endpoints] devices only; returning') return result - reply = requests.get(self._links_url, timeout=TIMEOUT, auth=self._auth) + reply = requests.get(self._links_url, timeout=TIMEOUT, verify=False, auth=self._auth) if reply.status_code not in HTTP_OK_CODES: msg = MSG_ERROR.format( str(self._links_url), str(reply.status_code), str(reply) @@ -138,50 +134,49 @@ class TfsApiClient: LOGGER.error(msg) raise Exception(msg) - for json_link in reply.json()["links"]: - link_uuid: str = json_link["link_id"]["link_uuid"]["uuid"] - link_url = "/links/link[{:s}]".format(link_uuid) + for json_link in reply.json()['links']: + link_uuid : str = json_link['link_id']['link_uuid']['uuid'] + link_url = '/links/link[{:s}]'.format(link_uuid) link_endpoint_ids = [ ( - json_endpoint_id["device_id"]["device_uuid"]["uuid"], - json_endpoint_id["endpoint_uuid"]["uuid"], + json_endpoint_id['device_id']['device_uuid']['uuid'], + json_endpoint_id['endpoint_uuid']['uuid'], ) - for json_endpoint_id in json_link["link_endpoint_ids"] + for json_endpoint_id in json_link['link_endpoint_ids'] ] link_data = { - "uuid": json_link["link_id"]["link_uuid"]["uuid"], - "name": json_link["name"], - "endpoints": link_endpoint_ids, + 'uuid': json_link['link_id']['link_uuid']['uuid'], + 'name': json_link['name'], + 'endpoints': link_endpoint_ids, } result.append((link_url, link_data)) - LOGGER.debug("[get_devices_endpoints] topology; returning") + LOGGER.debug('[get_devices_endpoints] topology; returning') return result def create_connectivity_service(self, l3vpn_data: dict) -> None: try: requests.post(self._l3vpn_url, json=l3vpn_data) - LOGGER.debug( - "[create_connectivity_service] l3vpn_data={:s}".format(str(l3vpn_data)) - ) + MSG = '[create_connectivity_service] l3vpn_data={:s}' + LOGGER.debug(MSG.format(str(l3vpn_data))) except requests.exceptions.ConnectionError: - raise Exception("faild to send post request to TFS L3VPN NBI") + raise Exception('Failed to send POST request to TFS L3VPN NBI') def update_connectivity_service(self, l3vpn_data: dict) -> None: - vpn_id = l3vpn_data['ietf-l3vpn-svc:l3vpn-svc']["vpn-services"]["vpn-service"][0]["vpn-id"] - url = self._l3vpn_url + f"/vpn-service={vpn_id}" + vpn_id = l3vpn_data['ietf-l3vpn-svc:l3vpn-svc']['vpn-services']['vpn-service'][0]['vpn-id'] + url = self._l3vpn_url + f'/vpn-service={vpn_id}' try: requests.put(url, json=l3vpn_data) - LOGGER.debug( - "[update_connectivity_service] l3vpn_data={:s}".format(str(l3vpn_data)) - ) + MSG = '[update_connectivity_service] l3vpn_data={:s}' + LOGGER.debug(MSG.format(str(l3vpn_data))) except requests.exceptions.ConnectionError: - raise Exception("faild to send post request to TFS L3VPN NBI") + raise Exception('Failed to send PUT request to TFS L3VPN NBI') def delete_connectivity_service(self, service_uuid: str) -> None: - url = self._l3vpn_url + f"/vpn-service={service_uuid}" + url = self._l3vpn_url + f'/vpn-service={service_uuid}' try: - requests.delete(url, auth=self._auth) - LOGGER.debug("[delete_connectivity_service] url={:s}".format(str(url))) + requests.delete(url) + MSG = '[delete_connectivity_service] url={:s}' + LOGGER.debug(MSG.format(str(url))) except requests.exceptions.ConnectionError: - raise Exception("faild to send delete request to TFS L3VPN NBI") + raise Exception('Failed to send DELETE request to TFS L3VPN NBI') diff --git a/src/device/service/drivers/optical_tfs/OpticalTfsDriver.py b/src/device/service/drivers/optical_tfs/OpticalTfsDriver.py index 05c44d2d3..8af800454 100644 --- a/src/device/service/drivers/optical_tfs/OpticalTfsDriver.py +++ b/src/device/service/drivers/optical_tfs/OpticalTfsDriver.py @@ -12,37 +12,43 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json, logging, requests, threading -from requests.auth import HTTPBasicAuth +import json, logging, threading from typing import Any, Iterator, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.type_checkers.Checkers import chk_string, chk_type -from device.service.driver_api._Driver import _Driver -from . import ALL_RESOURCE_KEYS -from .Tools import find_key, add_lightpath, del_lightpath, get_lightpaths -from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS -from device.service.drivers.ietf_l2vpn.TfsApiClient import TfsApiClient +from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_SERVICES from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum, get_import_topology +from .TfsApiClient import TfsApiClient +from .TfsOpticalClient import TfsOpticalClient LOGGER = logging.getLogger(__name__) +ALL_RESOURCE_KEYS = [ + RESOURCE_ENDPOINTS, + RESOURCE_SERVICES, +] + DRIVER_NAME = 'optical_tfs' METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) - class OpticalTfsDriver(_Driver): def __init__(self, address: str, port: int, **settings) -> None: super().__init__(DRIVER_NAME, address, port, **settings) self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() - username = self.settings.get('username') + username = self.settings.get('username') password = self.settings.get('password') - self.__auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None - scheme = self.settings.get('scheme', 'http') - self.tac = TfsApiClient(self.address, int(self.port), scheme=scheme, username=username, password=password) - self.__base_url = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port)) - self.__timeout = int(self.settings.get('timeout', 120)) + scheme = self.settings.get('scheme', 'http') + timeout = int(self.settings.get('timeout', 60)) + self.tac = TfsApiClient( + self.address, int(self.port), scheme=scheme, username=username, + password=password, timeout=timeout + ) + self.toc = TfsOpticalClient( + self.address, int(self.port), scheme=scheme, username=username, + password=password, timeout=timeout + ) # Options are: # disabled --> just import endpoints as usual @@ -51,19 +57,14 @@ class OpticalTfsDriver(_Driver): # topology --> imports sub-devices and links connecting them. # (not supported by XR driver) self.__import_topology = get_import_topology(self.settings, default=ImportTopologyEnum.TOPOLOGY) - def Connect(self) -> bool: - url = self.__base_url + '/OpticalTFS/GetLightpaths' with self.__lock: if self.__started.is_set(): return True try: - requests.get(url, timeout=self.__timeout, verify=False, auth=self.__auth) - except requests.exceptions.Timeout: - LOGGER.exception('Timeout connecting {:s}'.format(str(self.__tapi_root))) - return False - except Exception: # pylint: disable=broad-except - LOGGER.exception('Exception connecting {:s}'.format(str(self.__tapi_root))) + self.toc.check_credentials() + except: # pylint: disable=bare-except + LOGGER.exception('Exception checking credentials') return False else: self.__started.set() @@ -84,68 +85,81 @@ class OpticalTfsDriver(_Driver): chk_type('resources', resource_keys, list) results = [] with self.__lock: + self.toc.check_credentials() if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS for i, resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) - chk_string(str_resource_name, resource_key, allow_empty=False) - - if resource_key == RESOURCE_ENDPOINTS: - # return endpoints through TFS NBI API and list-devices method - results.extend(self.tac.get_devices_endpoints(self.__import_topology)) - - # results.extend(get_lightpaths( - # self.__base_url, resource_key, timeout=self.__timeout, auth=self.__auth)) + try: + chk_string(str_resource_name, resource_key, allow_empty=False) + if resource_key == RESOURCE_ENDPOINTS: + # return endpoints through TFS NBI API and list-devices method + results.extend(self.tac.get_devices_endpoints(self.__import_topology)) + elif resource_key == RESOURCE_SERVICES: + # return all services through + results.extend(self.toc.get_lightpaths()) + else: + MSG = 'ResourceKey({:s}) not implemented' + LOGGER.warning(MSG.format(str(resource_key))) + except Exception as e: + LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key))) + results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: results = [] - if len(resources) == 0: - return results + if len(resources) == 0: return results with self.__lock: - for _, resource in resources: + self.toc.check_credentials() + for resource in resources: LOGGER.info('resource = {:s}'.format(str(resource))) - - src_node = find_key(resource, 'src_node') - dst_node = find_key(resource, 'dst_node') - bitrate = find_key(resource, 'bitrate') - - response = add_lightpath(self.__base_url, src_node, dst_node, bitrate, - auth=self.__auth, timeout=self.__timeout) - - results.extend(response) + resource_key,resource_value = resource + try: + resource_value = json.loads(resource_value) + src_node = resource_value['src_node'] + dst_node = resource_value['dst_node'] + bitrate = resource_value['bitrate' ] + results.extend(self.toc.add_lightpath(src_node, dst_node, bitrate)) + results.append((resource_key, True)) + except Exception as e: + LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key))) + results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: results = [] - if len(resources) == 0: - return results + if len(resources) == 0: return results with self.__lock: - for _, resource in resources: + self.toc.check_credentials() + for resource in resources: LOGGER.info('resource = {:s}'.format(str(resource))) - flow_id = find_key(resource, 'flow_id') - src_node = find_key(resource, 'src_node') - dst_node = find_key(resource, 'dst_node') - bitrate = find_key(resource, 'bitrate') - - response = del_lightpath(self.__base_url, flow_id, src_node, dst_node, bitrate) - results.extend(response) - + resource_key,resource_value = resource + try: + resource_value = json.loads(resource_value) + flow_id = resource_value['flow_id' ] + src_node = resource_value['src_node'] + dst_node = resource_value['dst_node'] + bitrate = resource_value['bitrate' ] + self.toc.del_lightpath(flow_id, src_node, dst_node, bitrate) + results.append((resource_key, True)) + except Exception as e: + LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key))) + results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: - # Optical TFS does not support monitoring by now + # TODO: Optical TFS does not support monitoring by now return [False for _ in subscriptions] @metered_subclass_method(METRICS_POOL) def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: - # Optical TFS does not support monitoring by now + # TODO: Optical TFS does not support monitoring by now return [False for _ in subscriptions] def GetState( self, blocking=False, terminate : Optional[threading.Event] = None ) -> Iterator[Tuple[float, str, Any]]: - # Optical TFS does not support monitoring by now + # TODO: Optical TFS does not support monitoring by now return [] diff --git a/src/device/service/drivers/optical_tfs/TfsApiClient.py b/src/device/service/drivers/optical_tfs/TfsApiClient.py new file mode 100644 index 000000000..8df8e5261 --- /dev/null +++ b/src/device/service/drivers/optical_tfs/TfsApiClient.py @@ -0,0 +1,128 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, requests +from typing import Dict, List, Optional +from common.tools.client.RestClient import RestClient +from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum + +GET_DEVICES_URL = '/tfs-api/devices' +GET_LINKS_URL = '/tfs-api/links' + +MAPPING_STATUS = { + 'DEVICEOPERATIONALSTATUS_UNDEFINED': 0, + 'DEVICEOPERATIONALSTATUS_DISABLED' : 1, + 'DEVICEOPERATIONALSTATUS_ENABLED' : 2, +} + +MAPPING_DRIVER = { + 'DEVICEDRIVER_UNDEFINED' : 0, + 'DEVICEDRIVER_OPENCONFIG' : 1, + 'DEVICEDRIVER_TRANSPORT_API' : 2, + 'DEVICEDRIVER_P4' : 3, + 'DEVICEDRIVER_IETF_NETWORK_TOPOLOGY': 4, + 'DEVICEDRIVER_ONF_TR_532' : 5, + 'DEVICEDRIVER_XR' : 6, + 'DEVICEDRIVER_IETF_L2VPN' : 7, + 'DEVICEDRIVER_GNMI_OPENCONFIG' : 8, + 'DEVICEDRIVER_OPTICAL_TFS' : 9, + 'DEVICEDRIVER_IETF_ACTN' : 10, + 'DEVICEDRIVER_OC' : 11, + 'DEVICEDRIVER_QKD' : 12, + 'DEVICEDRIVER_IETF_L3VPN' : 13, + 'DEVICEDRIVER_IETF_SLICE' : 14, + 'DEVICEDRIVER_NCE' : 15, +} + +LOGGER = logging.getLogger(__name__) + +class TfsApiClient(RestClient): + def __init__( + self, address : str, port : int, scheme : str = 'http', + username : Optional[str] = None, password : Optional[str] = None, + timeout : Optional[int] = 30 + ) -> None: + super().__init__( + address, port, scheme=scheme, username=username, password=password, + timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER + ) + + def get_devices_endpoints( + self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES + ) -> List[Dict]: + LOGGER.debug('[get_devices_endpoints] begin') + MSG = '[get_devices_endpoints] import_topology={:s}' + LOGGER.debug(MSG.format(str(import_topology))) + + if import_topology == ImportTopologyEnum.DISABLED: + MSG = 'Unsupported import_topology mode: {:s}' + raise Exception(MSG.format(str(import_topology))) + + devices = self.get(GET_DEVICES_URL, expected_status_codes={requests.codes['OK']}) + + result = list() + for json_device in devices['devices']: + device_uuid : str = json_device['device_id']['device_uuid']['uuid'] + device_type : str = json_device['device_type'] + #if not device_type.startswith('emu-'): device_type = 'emu-' + device_type + device_status = json_device['device_operational_status'] + device_url = '/devices/device[{:s}]'.format(device_uuid) + device_data = { + 'uuid': json_device['device_id']['device_uuid']['uuid'], + 'name': json_device['name'], + 'type': device_type, + 'status': MAPPING_STATUS[device_status], + 'drivers': [ + MAPPING_DRIVER[driver] + for driver in json_device['device_drivers'] + ], + } + result.append((device_url, device_data)) + + for json_endpoint in json_device['device_endpoints']: + endpoint_uuid = json_endpoint['endpoint_id']['endpoint_uuid']['uuid'] + endpoint_url = '/endpoints/endpoint[{:s}]'.format(endpoint_uuid) + endpoint_data = { + 'device_uuid': device_uuid, + 'uuid': endpoint_uuid, + 'name': json_endpoint['name'], + 'type': json_endpoint['endpoint_type'], + } + result.append((endpoint_url, endpoint_data)) + + if import_topology == ImportTopologyEnum.DEVICES: + LOGGER.debug('[get_devices_endpoints] devices only; returning') + return result + + links = self.get(GET_LINKS_URL, expected_status_codes={requests.codes['OK']}) + + for json_link in links['links']: + link_uuid : str = json_link['link_id']['link_uuid']['uuid'] + link_url = '/links/link[{:s}]'.format(link_uuid) + link_endpoint_ids = [ + ( + json_endpoint_id['device_id']['device_uuid']['uuid'], + json_endpoint_id['endpoint_uuid']['uuid'], + ) + for json_endpoint_id in json_link['link_endpoint_ids'] + ] + link_data = { + 'uuid': json_link['link_id']['link_uuid']['uuid'], + 'name': json_link['name'], + 'endpoints': link_endpoint_ids, + } + result.append((link_url, link_data)) + + LOGGER.debug('[get_devices_endpoints] topology; returning') + return result diff --git a/src/device/service/drivers/optical_tfs/TfsOpticalClient.py b/src/device/service/drivers/optical_tfs/TfsOpticalClient.py new file mode 100644 index 000000000..f5749ae56 --- /dev/null +++ b/src/device/service/drivers/optical_tfs/TfsOpticalClient.py @@ -0,0 +1,100 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging, requests +from typing import Dict, List, Optional, Union +from common.tools.client.RestClient import RestClient + + +LOGGER = logging.getLogger(__name__) + +GET_OPTICAL_LINKS_URL = '/OpticalTFS/GetLinks' +GET_LIGHTPATHS_URL = '/OpticalTFS/GetLightpaths' +ADD_LIGHTPATH_URL = '/OpticalTFS/AddLightpath/{src_node:s}/{dst_node:s}/{bitrate:s}' +DEL_LIGHTPATH_URL = '/OpticalTFS/DelLightpath/{flow_id:s}/{src_node:s}/{dst_node:s}/{bitrate:s}' + + +class TfsOpticalClient(RestClient): + def __init__( + self, address : str, port : int, scheme : str = 'http', + username : Optional[str] = None, password : Optional[str] = None, + timeout : Optional[int] = 30 + ) -> None: + super().__init__( + address, port, scheme=scheme, username=username, password=password, + timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER + ) + + def check_credentials(self) -> None: + self.get(GET_LIGHTPATHS_URL, expected_status_codes={requests.codes['OK']}) + LOGGER.info('Credentials checked') + + def get_optical_links(self) -> Union[List[Dict], Exception]: + try: + return self.get(GET_OPTICAL_LINKS_URL, expected_status_codes={requests.codes['OK']}) + except Exception as e: + LOGGER.exception('Exception retrieving optical links') + return e + + def get_lightpaths(self) -> Union[List[Dict], Exception]: + try: + lightpaths : List[Dict] = self.get( + GET_LIGHTPATHS_URL, expected_status_codes={requests.codes['OK']} + ) + except Exception as e: + LOGGER.exception('Exception retrieving lightpaths') + return e + + result = [] + for lightpath in lightpaths: + assert 'flow_id' in lightpath + assert 'src' in lightpath + assert 'dst' in lightpath + assert 'bitrate' in lightpath + resource_key = '/lightpaths/lightpath[{:s}]'.format(lightpath['flow_id']) + result.append((resource_key, lightpath)) + return result + + def add_lightpath( + self, src_node : str, dst_node : str, bitrate : int + ) -> Union[List[Dict], Exception]: + MSG = 'Add Lightpath: {:s} <-> {:s} with {:d} bitrate' + LOGGER.info(MSG.format(str(src_node), str(dst_node), int(bitrate))) + request_endpoint = ADD_LIGHTPATH_URL.format( + src_node=str(src_node), dst_node=str(dst_node), bitrate=int(bitrate) + ) + expected_status_codes = {requests.codes['CREATED'], requests.codes['NO_CONTENT']} + try: + return self.put(request_endpoint, expected_status_codes=expected_status_codes) + except Exception as e: + MSG = 'Exception requesting Lightpath: {:s} <-> {:s} with {:s} bitrate' + LOGGER.exception(MSG.format(str(src_node), str(dst_node), str(bitrate))) + return e + + def del_lightpath( + self, flow_id : str, src_node : str, dst_node : str, bitrate : int + ) -> Union[List[Dict], Exception]: + MSG = 'Delete Lightpath {:s}: {:s} <-> {:s} with {:d} bitrate' + LOGGER.info(MSG.format(str(flow_id), str(src_node), str(dst_node), int(bitrate))) + request_endpoint = DEL_LIGHTPATH_URL.format( + src_node=str(src_node), dst_node=str(dst_node), bitrate=int(bitrate) + ) + expected_status_codes = {requests.codes['NO_CONTENT']} + try: + return self.delete(request_endpoint, expected_status_codes=expected_status_codes) + except Exception as e: + MSG = 'Exception deleting Lightpath {:s}: {:s} <-> {:s} with {:s} bitrate' + LOGGER.exception(MSG.format(str(flow_id), str(src_node), str(dst_node), str(bitrate))) + return e diff --git a/src/device/service/drivers/optical_tfs/Tools.py b/src/device/service/drivers/optical_tfs/Tools.py deleted file mode 100644 index 3714672f8..000000000 --- a/src/device/service/drivers/optical_tfs/Tools.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json, logging, requests -from requests.auth import HTTPBasicAuth -from typing import Optional - -LOGGER = logging.getLogger(__name__) - -HTTP_OK_CODES = { - 200, # OK - 201, # Created - 202, # Accepted - 204, # No Content -} - -def find_key(resource, key): - return json.loads(resource[1])[key] - -def get_lightpaths(root_url : str, resource_key : str,auth : Optional[HTTPBasicAuth] = None, - timeout : Optional[int] = None): - headers = {'accept': 'application/json'} - url = '{:s}/OpticalTFS/GetLightpaths'.format(root_url) - - result = [] - try: - response = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth) - except requests.exceptions.Timeout: - LOGGER.exception('Timeout connecting {:s}'.format(url)) - return result - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Exception retrieving {:s}'.format(resource_key)) - result.append((resource_key, e)) - return result - - try: - flows = json.loads(response.content) - except Exception as e: # pylint: disable=broad-except - LOGGER.warning('Unable to decode reply: {:s}'.format(str(response.content))) - result.append((resource_key, e)) - return result - - for flow in flows: - flow_id = flow.get('flow_id') - source = flow.get('src') - destination = flow.get('dst') - bitrate = flow.get('bitrate') - - endpoint_url = '/flows/flow[{:s}]'.format(flow_id) - endpoint_data = {'flow_id': flow_id, 'src': source, 'dst': destination, 'bitrate': bitrate} - result.append((endpoint_url, endpoint_data)) - - return result - - -def add_lightpath(root_url, src_node, dst_node, bitrate, - auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None): - - headers = {'accept': 'application/json'} - url = '{:s}/OpticalTFS/AddLightpath/{:s}/{:s}/{:s}'.format( - root_url, src_node, dst_node, bitrate) - - results = [] - try: - LOGGER.info('Lightpath request: {:s} <-> {:s} with {:s} bitrate'.format( - str(src_node), str(dst_node), str(bitrate))) - response = requests.put(url=url, timeout=timeout, headers=headers, verify=False, auth=auth) - results.append(response.json()) - LOGGER.info('Response: {:s}'.format(str(response))) - - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Exception requesting Lightpath: {:s} <-> {:s} with {:s} bitrate'.format( - str(src_node), str(dst_node), str(bitrate))) - results.append(e) - else: - if response.status_code not in HTTP_OK_CODES: - msg = 'Could not create Lightpath(status_code={:s} reply={:s}' - LOGGER.error(msg.format(str(response.status_code), str(response))) - results.append(response.status_code in HTTP_OK_CODES) - - return results - - - -def del_lightpath(root_url, flow_id, src_node, dst_node, bitrate, - auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None): - url = '{:s}/OpticalTFS/DelLightpath/{:s}/{:s}/{:s}/{:s}'.format( - root_url, flow_id, src_node, dst_node, bitrate) - headers = {'accept': 'application/json'} - - results = [] - - try: - response = requests.delete( - url=url, timeout=timeout, headers=headers, verify=False, auth=auth) - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Exception deleting Lightpath(uuid={:s})'.format(str(flow_id))) - results.append(e) - else: - if response.status_code not in HTTP_OK_CODES: - msg = 'Could not delete Lightpath(flow_id={:s}). status_code={:s} reply={:s}' - LOGGER.error(msg.format(str(flow_id), str(response.status_code), str(response))) - results.append(response.status_code in HTTP_OK_CODES) - - return results - - -def get_topology(root_url : str, resource_key : str,auth : Optional[HTTPBasicAuth] = None, - timeout : Optional[int] = None): - headers = {'accept': 'application/json'} - url = '{:s}/OpticalTFS/GetLinks'.format(root_url) - - result = [] - try: - response = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth) - except requests.exceptions.Timeout: - LOGGER.exception('Timeout connecting {:s}'.format(url)) - return result - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Exception retrieving {:s}'.format(resource_key)) - result.append((resource_key, e)) - return result - - try: - response = json.loads(response.content) - except Exception as e: # pylint: disable=broad-except - LOGGER.warning('Unable to decode reply: {:s}'.format(str(response.content))) - result.append((resource_key, e)) - return result - - result.append(response) - return result diff --git a/src/device/service/drivers/optical_tfs/__init__.py b/src/device/service/drivers/optical_tfs/__init__.py index 97ec0dd42..53d5157f7 100644 --- a/src/device/service/drivers/optical_tfs/__init__.py +++ b/src/device/service/drivers/optical_tfs/__init__.py @@ -12,9 +12,3 @@ # See the License for the specific language governing permissions and # limitations under the License. -from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_SERVICES - -ALL_RESOURCE_KEYS = [ - RESOURCE_ENDPOINTS, - RESOURCE_SERVICES, -] -- GitLab