Skip to content
Snippets Groups Projects
Commit 6cd56977 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Device component:

- Cosmetic changes in L2/L3 VPN drivers
- Upgraded Optical TFS Driver
parent 32e0c18c
No related branches found
No related tags found
3 merge requests!359Release TeraFlowSDN 5.0,!328Resolve "(CTTC) Update recommendations to use SocketIO on NBI and E2E Orch components",!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
......@@ -188,7 +188,10 @@ if LOAD_ALL_DEVICE_DRIVERS:
DRIVERS.append(
(OpticalTfsDriver, [
{
FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.OPEN_LINE_SYSTEM,
FilterFieldEnum.DEVICE_TYPE: [
DeviceTypeEnum.OPEN_LINE_SYSTEM,
DeviceTypeEnum.TERAFLOWSDN_CONTROLLER,
],
FilterFieldEnum.DRIVER: DeviceDriverEnum.DEVICEDRIVER_OPTICAL_TFS,
}
]))
......
......@@ -70,9 +70,10 @@ class IetfL2VpnDriver(_Driver):
def Connect(self) -> bool:
with self.__lock:
if self.__started.is_set(): return True
try:
self.wim.check_credentials()
except Exception: # pylint: disable=broad-except
except: # pylint: disable=bare-except
LOGGER.exception('Exception checking credentials')
return False
else:
......
......@@ -19,6 +19,7 @@ from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum
GET_DEVICES_URL = '{:s}://{:s}:{:d}/tfs-api/devices'
GET_LINKS_URL = '{:s}://{:s}:{:d}/tfs-api/links'
TIMEOUT = 30
HTTP_OK_CODES = {
......@@ -47,6 +48,10 @@ MAPPING_DRIVER = {
'DEVICEDRIVER_OPTICAL_TFS' : 9,
'DEVICEDRIVER_IETF_ACTN' : 10,
'DEVICEDRIVER_OC' : 11,
'DEVICEDRIVER_QKD' : 12,
'DEVICEDRIVER_IETF_L3VPN' : 13,
'DEVICEDRIVER_IETF_SLICE' : 14,
'DEVICEDRIVER_NCE' : 15,
}
MSG_ERROR = 'Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}'
......@@ -59,21 +64,31 @@ class TfsApiClient:
username : Optional[str] = None, password : Optional[str] = None
) -> None:
self._devices_url = GET_DEVICES_URL.format(scheme, address, port)
self._links_url = GET_LINKS_URL.format(scheme, address, port)
self._auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None
def get_devices_endpoints(self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES) -> List[Dict]:
self._links_url = GET_LINKS_URL.format(scheme, address, port)
self._auth = (
HTTPBasicAuth(username, password)
if username is not None and password is not None
else None
)
def get_devices_endpoints(
self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES
) -> List[Dict]:
LOGGER.debug('[get_devices_endpoints] begin')
LOGGER.debug('[get_devices_endpoints] import_topology={:s}'.format(str(import_topology)))
MSG = '[get_devices_endpoints] import_topology={:s}'
LOGGER.debug(MSG.format(str(import_topology)))
reply = requests.get(self._devices_url, timeout=TIMEOUT, verify=False, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(str(self._devices_url), str(reply.status_code), str(reply))
msg = MSG_ERROR.format(
str(self._devices_url), str(reply.status_code), str(reply)
)
LOGGER.error(msg)
raise Exception(msg)
if import_topology == ImportTopologyEnum.DISABLED:
raise Exception('Unsupported import_topology mode: {:s}'.format(str(import_topology)))
MSG = 'Unsupported import_topology mode: {:s}'
raise Exception(MSG.format(str(import_topology)))
result = list()
for json_device in reply.json()['devices']:
......@@ -87,7 +102,10 @@ class TfsApiClient:
'name': json_device['name'],
'type': device_type,
'status': MAPPING_STATUS[device_status],
'drivers': [MAPPING_DRIVER[driver] for driver in json_device['device_drivers']],
'drivers': [
MAPPING_DRIVER[driver]
for driver in json_device['device_drivers']
],
}
result.append((device_url, device_data))
......@@ -108,7 +126,9 @@ class TfsApiClient:
reply = requests.get(self._links_url, timeout=TIMEOUT, verify=False, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(str(self._links_url), str(reply.status_code), str(reply))
msg = MSG_ERROR.format(
str(self._links_url), str(reply.status_code), str(reply)
)
LOGGER.error(msg)
raise Exception(msg)
......@@ -116,7 +136,10 @@ class TfsApiClient:
link_uuid : str = json_link['link_id']['link_uuid']['uuid']
link_url = '/links/link[{:s}]'.format(link_uuid)
link_endpoint_ids = [
(json_endpoint_id['device_id']['device_uuid']['uuid'], json_endpoint_id['endpoint_uuid']['uuid'])
(
json_endpoint_id['device_id']['device_uuid']['uuid'],
json_endpoint_id['endpoint_uuid']['uuid'],
)
for json_endpoint_id in json_link['link_endpoint_ids']
]
link_data = {
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -12,80 +12,75 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, List, Optional
import requests
import logging, requests
from requests.auth import HTTPBasicAuth
from typing import Dict, List, Optional
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum
GET_DEVICES_URL = "{:s}://{:s}:{:d}/tfs-api/devices"
GET_LINKS_URL = "{:s}://{:s}:{:d}/tfs-api/links"
L3VPN_URL = "{:s}://{:s}:{:d}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services"
GET_DEVICES_URL = '{:s}://{:s}:{:d}/tfs-api/devices'
GET_LINKS_URL = '{:s}://{:s}:{:d}/tfs-api/links'
L3VPN_URL = '{:s}://{:s}:{:d}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services'
TIMEOUT = 30
HTTP_OK_CODES = {
200, # OK
201, # Created
202, # Accepted
204, # No Content
200, # OK
201, # Created
202, # Accepted
204, # No Content
}
MAPPING_STATUS = {
"DEVICEOPERATIONALSTATUS_UNDEFINED": 0,
"DEVICEOPERATIONALSTATUS_DISABLED": 1,
"DEVICEOPERATIONALSTATUS_ENABLED": 2,
'DEVICEOPERATIONALSTATUS_UNDEFINED': 0,
'DEVICEOPERATIONALSTATUS_DISABLED' : 1,
'DEVICEOPERATIONALSTATUS_ENABLED' : 2,
}
MAPPING_DRIVER = {
"DEVICEDRIVER_UNDEFINED": 0,
"DEVICEDRIVER_OPENCONFIG": 1,
"DEVICEDRIVER_TRANSPORT_API": 2,
"DEVICEDRIVER_P4": 3,
"DEVICEDRIVER_IETF_NETWORK_TOPOLOGY": 4,
"DEVICEDRIVER_ONF_TR_532": 5,
"DEVICEDRIVER_XR": 6,
"DEVICEDRIVER_IETF_L2VPN": 7,
"DEVICEDRIVER_GNMI_OPENCONFIG": 8,
"DEVICEDRIVER_OPTICAL_TFS": 9,
"DEVICEDRIVER_IETF_ACTN": 10,
"DEVICEDRIVER_OC": 11,
'DEVICEDRIVER_UNDEFINED' : 0,
'DEVICEDRIVER_OPENCONFIG' : 1,
'DEVICEDRIVER_TRANSPORT_API' : 2,
'DEVICEDRIVER_P4' : 3,
'DEVICEDRIVER_IETF_NETWORK_TOPOLOGY': 4,
'DEVICEDRIVER_ONF_TR_532' : 5,
'DEVICEDRIVER_XR' : 6,
'DEVICEDRIVER_IETF_L2VPN' : 7,
'DEVICEDRIVER_GNMI_OPENCONFIG' : 8,
'DEVICEDRIVER_OPTICAL_TFS' : 9,
'DEVICEDRIVER_IETF_ACTN' : 10,
'DEVICEDRIVER_OC' : 11,
'DEVICEDRIVER_QKD' : 12,
'DEVICEDRIVER_IETF_L3VPN' : 13,
'DEVICEDRIVER_IETF_SLICE' : 14,
'DEVICEDRIVER_NCE' : 15,
}
MSG_ERROR = "Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}"
MSG_ERROR = 'Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}'
LOGGER = logging.getLogger(__name__)
class TfsApiClient:
def __init__(
self,
address: str,
port: int,
scheme: str = "http",
username: Optional[str] = None,
password: Optional[str] = None,
self, address : str, port : int, scheme : str = 'http',
username : Optional[str] = None, password : Optional[str] = None
) -> None:
self._devices_url = GET_DEVICES_URL.format(scheme, address, port)
self._links_url = GET_LINKS_URL.format(scheme, address, port)
self._l3vpn_url = L3VPN_URL.format(scheme, address, port)
self._auth = None
# (
# HTTPBasicAuth(username, password)
# if username is not None and password is not None
# else None
# )
self._links_url = GET_LINKS_URL.format(scheme, address, port)
self._l3vpn_url = L3VPN_URL.format(scheme, address, port)
self._auth = (
HTTPBasicAuth(username, password)
if username is not None and password is not None
else None
)
def get_devices_endpoints(
self, import_topology: ImportTopologyEnum = ImportTopologyEnum.DEVICES
self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES
) -> List[Dict]:
LOGGER.debug("[get_devices_endpoints] begin")
LOGGER.debug(
"[get_devices_endpoints] import_topology={:s}".format(str(import_topology))
)
LOGGER.debug('[get_devices_endpoints] begin')
MSG = '[get_devices_endpoints] import_topology={:s}'
LOGGER.debug(MSG.format(str(import_topology)))
reply = requests.get(self._devices_url, timeout=TIMEOUT, auth=self._auth)
reply = requests.get(self._devices_url, timeout=TIMEOUT, verify=False, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(
str(self._devices_url), str(reply.status_code), str(reply)
......@@ -94,43 +89,44 @@ class TfsApiClient:
raise Exception(msg)
if import_topology == ImportTopologyEnum.DISABLED:
raise Exception(
"Unsupported import_topology mode: {:s}".format(str(import_topology))
)
MSG = 'Unsupported import_topology mode: {:s}'
raise Exception(MSG.format(str(import_topology)))
result = list()
for json_device in reply.json()["devices"]:
device_uuid: str = json_device["device_id"]["device_uuid"]["uuid"]
device_type: str = json_device["device_type"]
device_status = json_device["device_operational_status"]
device_url = "/devices/device[{:s}]".format(device_uuid)
for json_device in reply.json()['devices']:
device_uuid : str = json_device['device_id']['device_uuid']['uuid']
device_type : str = json_device['device_type']
#if not device_type.startswith('emu-'): device_type = 'emu-' + device_type
device_status = json_device['device_operational_status']
device_url = '/devices/device[{:s}]'.format(device_uuid)
device_data = {
"uuid": json_device["device_id"]["device_uuid"]["uuid"],
"name": json_device["name"],
"type": device_type,
"status": MAPPING_STATUS[device_status],
"drivers": [
MAPPING_DRIVER[driver] for driver in json_device["device_drivers"]
'uuid': json_device['device_id']['device_uuid']['uuid'],
'name': json_device['name'],
'type': device_type,
'status': MAPPING_STATUS[device_status],
'drivers': [
MAPPING_DRIVER[driver]
for driver in json_device['device_drivers']
],
}
result.append((device_url, device_data))
for json_endpoint in json_device["device_endpoints"]:
endpoint_uuid = json_endpoint["endpoint_id"]["endpoint_uuid"]["uuid"]
endpoint_url = "/endpoints/endpoint[{:s}]".format(endpoint_uuid)
for json_endpoint in json_device['device_endpoints']:
endpoint_uuid = json_endpoint['endpoint_id']['endpoint_uuid']['uuid']
endpoint_url = '/endpoints/endpoint[{:s}]'.format(endpoint_uuid)
endpoint_data = {
"device_uuid": device_uuid,
"uuid": endpoint_uuid,
"name": json_endpoint["name"],
"type": json_endpoint["endpoint_type"],
'device_uuid': device_uuid,
'uuid': endpoint_uuid,
'name': json_endpoint['name'],
'type': json_endpoint['endpoint_type'],
}
result.append((endpoint_url, endpoint_data))
if import_topology == ImportTopologyEnum.DEVICES:
LOGGER.debug("[get_devices_endpoints] devices only; returning")
LOGGER.debug('[get_devices_endpoints] devices only; returning')
return result
reply = requests.get(self._links_url, timeout=TIMEOUT, auth=self._auth)
reply = requests.get(self._links_url, timeout=TIMEOUT, verify=False, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(
str(self._links_url), str(reply.status_code), str(reply)
......@@ -138,50 +134,49 @@ class TfsApiClient:
LOGGER.error(msg)
raise Exception(msg)
for json_link in reply.json()["links"]:
link_uuid: str = json_link["link_id"]["link_uuid"]["uuid"]
link_url = "/links/link[{:s}]".format(link_uuid)
for json_link in reply.json()['links']:
link_uuid : str = json_link['link_id']['link_uuid']['uuid']
link_url = '/links/link[{:s}]'.format(link_uuid)
link_endpoint_ids = [
(
json_endpoint_id["device_id"]["device_uuid"]["uuid"],
json_endpoint_id["endpoint_uuid"]["uuid"],
json_endpoint_id['device_id']['device_uuid']['uuid'],
json_endpoint_id['endpoint_uuid']['uuid'],
)
for json_endpoint_id in json_link["link_endpoint_ids"]
for json_endpoint_id in json_link['link_endpoint_ids']
]
link_data = {
"uuid": json_link["link_id"]["link_uuid"]["uuid"],
"name": json_link["name"],
"endpoints": link_endpoint_ids,
'uuid': json_link['link_id']['link_uuid']['uuid'],
'name': json_link['name'],
'endpoints': link_endpoint_ids,
}
result.append((link_url, link_data))
LOGGER.debug("[get_devices_endpoints] topology; returning")
LOGGER.debug('[get_devices_endpoints] topology; returning')
return result
def create_connectivity_service(self, l3vpn_data: dict) -> None:
try:
requests.post(self._l3vpn_url, json=l3vpn_data)
LOGGER.debug(
"[create_connectivity_service] l3vpn_data={:s}".format(str(l3vpn_data))
)
MSG = '[create_connectivity_service] l3vpn_data={:s}'
LOGGER.debug(MSG.format(str(l3vpn_data)))
except requests.exceptions.ConnectionError:
raise Exception("faild to send post request to TFS L3VPN NBI")
raise Exception('Failed to send POST request to TFS L3VPN NBI')
def update_connectivity_service(self, l3vpn_data: dict) -> None:
vpn_id = l3vpn_data['ietf-l3vpn-svc:l3vpn-svc']["vpn-services"]["vpn-service"][0]["vpn-id"]
url = self._l3vpn_url + f"/vpn-service={vpn_id}"
vpn_id = l3vpn_data['ietf-l3vpn-svc:l3vpn-svc']['vpn-services']['vpn-service'][0]['vpn-id']
url = self._l3vpn_url + f'/vpn-service={vpn_id}'
try:
requests.put(url, json=l3vpn_data)
LOGGER.debug(
"[update_connectivity_service] l3vpn_data={:s}".format(str(l3vpn_data))
)
MSG = '[update_connectivity_service] l3vpn_data={:s}'
LOGGER.debug(MSG.format(str(l3vpn_data)))
except requests.exceptions.ConnectionError:
raise Exception("faild to send post request to TFS L3VPN NBI")
raise Exception('Failed to send PUT request to TFS L3VPN NBI')
def delete_connectivity_service(self, service_uuid: str) -> None:
url = self._l3vpn_url + f"/vpn-service={service_uuid}"
url = self._l3vpn_url + f'/vpn-service={service_uuid}'
try:
requests.delete(url, auth=self._auth)
LOGGER.debug("[delete_connectivity_service] url={:s}".format(str(url)))
requests.delete(url)
MSG = '[delete_connectivity_service] url={:s}'
LOGGER.debug(MSG.format(str(url)))
except requests.exceptions.ConnectionError:
raise Exception("faild to send delete request to TFS L3VPN NBI")
raise Exception('Failed to send DELETE request to TFS L3VPN NBI')
......@@ -12,37 +12,43 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, requests, threading
from requests.auth import HTTPBasicAuth
import json, logging, threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
from .Tools import find_key, add_lightpath, del_lightpath, get_lightpaths
from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS
from device.service.drivers.ietf_l2vpn.TfsApiClient import TfsApiClient
from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_SERVICES
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum, get_import_topology
from .TfsApiClient import TfsApiClient
from .TfsOpticalClient import TfsOpticalClient
LOGGER = logging.getLogger(__name__)
ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS,
RESOURCE_SERVICES,
]
DRIVER_NAME = 'optical_tfs'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class OpticalTfsDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None:
super().__init__(DRIVER_NAME, address, port, **settings)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
username = self.settings.get('username')
username = self.settings.get('username')
password = self.settings.get('password')
self.__auth = HTTPBasicAuth(username, password) if username is not None and password is not None else None
scheme = self.settings.get('scheme', 'http')
self.tac = TfsApiClient(self.address, int(self.port), scheme=scheme, username=username, password=password)
self.__base_url = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
self.__timeout = int(self.settings.get('timeout', 120))
scheme = self.settings.get('scheme', 'http')
timeout = int(self.settings.get('timeout', 60))
self.tac = TfsApiClient(
self.address, int(self.port), scheme=scheme, username=username,
password=password, timeout=timeout
)
self.toc = TfsOpticalClient(
self.address, int(self.port), scheme=scheme, username=username,
password=password, timeout=timeout
)
# Options are:
# disabled --> just import endpoints as usual
......@@ -51,19 +57,14 @@ class OpticalTfsDriver(_Driver):
# topology --> imports sub-devices and links connecting them.
# (not supported by XR driver)
self.__import_topology = get_import_topology(self.settings, default=ImportTopologyEnum.TOPOLOGY)
def Connect(self) -> bool:
url = self.__base_url + '/OpticalTFS/GetLightpaths'
with self.__lock:
if self.__started.is_set(): return True
try:
requests.get(url, timeout=self.__timeout, verify=False, auth=self.__auth)
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(str(self.__tapi_root)))
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception('Exception connecting {:s}'.format(str(self.__tapi_root)))
self.toc.check_credentials()
except: # pylint: disable=bare-except
LOGGER.exception('Exception checking credentials')
return False
else:
self.__started.set()
......@@ -84,68 +85,81 @@ class OpticalTfsDriver(_Driver):
chk_type('resources', resource_keys, list)
results = []
with self.__lock:
self.toc.check_credentials()
if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS
for i, resource_key in enumerate(resource_keys):
str_resource_name = 'resource_key[#{:d}]'.format(i)
chk_string(str_resource_name, resource_key, allow_empty=False)
if resource_key == RESOURCE_ENDPOINTS:
# return endpoints through TFS NBI API and list-devices method
results.extend(self.tac.get_devices_endpoints(self.__import_topology))
# results.extend(get_lightpaths(
# self.__base_url, resource_key, timeout=self.__timeout, auth=self.__auth))
try:
chk_string(str_resource_name, resource_key, allow_empty=False)
if resource_key == RESOURCE_ENDPOINTS:
# return endpoints through TFS NBI API and list-devices method
results.extend(self.tac.get_devices_endpoints(self.__import_topology))
elif resource_key == RESOURCE_SERVICES:
# return all services through
results.extend(self.toc.get_lightpaths())
else:
MSG = 'ResourceKey({:s}) not implemented'
LOGGER.warning(MSG.format(str(resource_key)))
except Exception as e:
LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key)))
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
if len(resources) == 0: return results
with self.__lock:
for _, resource in resources:
self.toc.check_credentials()
for resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
src_node = find_key(resource, 'src_node')
dst_node = find_key(resource, 'dst_node')
bitrate = find_key(resource, 'bitrate')
response = add_lightpath(self.__base_url, src_node, dst_node, bitrate,
auth=self.__auth, timeout=self.__timeout)
results.extend(response)
resource_key,resource_value = resource
try:
resource_value = json.loads(resource_value)
src_node = resource_value['src_node']
dst_node = resource_value['dst_node']
bitrate = resource_value['bitrate' ]
results.extend(self.toc.add_lightpath(src_node, dst_node, bitrate))
results.append((resource_key, True))
except Exception as e:
LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key)))
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
if len(resources) == 0: return results
with self.__lock:
for _, resource in resources:
self.toc.check_credentials()
for resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
flow_id = find_key(resource, 'flow_id')
src_node = find_key(resource, 'src_node')
dst_node = find_key(resource, 'dst_node')
bitrate = find_key(resource, 'bitrate')
response = del_lightpath(self.__base_url, flow_id, src_node, dst_node, bitrate)
results.extend(response)
resource_key,resource_value = resource
try:
resource_value = json.loads(resource_value)
flow_id = resource_value['flow_id' ]
src_node = resource_value['src_node']
dst_node = resource_value['dst_node']
bitrate = resource_value['bitrate' ]
self.toc.del_lightpath(flow_id, src_node, dst_node, bitrate)
results.append((resource_key, True))
except Exception as e:
LOGGER.exception('Unhandled error processing resource_key({:s})'.format(str(resource_key)))
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# Optical TFS does not support monitoring by now
# TODO: Optical TFS does not support monitoring by now
return [False for _ in subscriptions]
@metered_subclass_method(METRICS_POOL)
def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# Optical TFS does not support monitoring by now
# TODO: Optical TFS does not support monitoring by now
return [False for _ in subscriptions]
def GetState(
self, blocking=False, terminate : Optional[threading.Event] = None
) -> Iterator[Tuple[float, str, Any]]:
# Optical TFS does not support monitoring by now
# TODO: Optical TFS does not support monitoring by now
return []
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, requests
from typing import Dict, List, Optional
from common.tools.client.RestClient import RestClient
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum
GET_DEVICES_URL = '/tfs-api/devices'
GET_LINKS_URL = '/tfs-api/links'
MAPPING_STATUS = {
'DEVICEOPERATIONALSTATUS_UNDEFINED': 0,
'DEVICEOPERATIONALSTATUS_DISABLED' : 1,
'DEVICEOPERATIONALSTATUS_ENABLED' : 2,
}
MAPPING_DRIVER = {
'DEVICEDRIVER_UNDEFINED' : 0,
'DEVICEDRIVER_OPENCONFIG' : 1,
'DEVICEDRIVER_TRANSPORT_API' : 2,
'DEVICEDRIVER_P4' : 3,
'DEVICEDRIVER_IETF_NETWORK_TOPOLOGY': 4,
'DEVICEDRIVER_ONF_TR_532' : 5,
'DEVICEDRIVER_XR' : 6,
'DEVICEDRIVER_IETF_L2VPN' : 7,
'DEVICEDRIVER_GNMI_OPENCONFIG' : 8,
'DEVICEDRIVER_OPTICAL_TFS' : 9,
'DEVICEDRIVER_IETF_ACTN' : 10,
'DEVICEDRIVER_OC' : 11,
'DEVICEDRIVER_QKD' : 12,
'DEVICEDRIVER_IETF_L3VPN' : 13,
'DEVICEDRIVER_IETF_SLICE' : 14,
'DEVICEDRIVER_NCE' : 15,
}
LOGGER = logging.getLogger(__name__)
class TfsApiClient(RestClient):
def __init__(
self, address : str, port : int, scheme : str = 'http',
username : Optional[str] = None, password : Optional[str] = None,
timeout : Optional[int] = 30
) -> None:
super().__init__(
address, port, scheme=scheme, username=username, password=password,
timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER
)
def get_devices_endpoints(
self, import_topology : ImportTopologyEnum = ImportTopologyEnum.DEVICES
) -> List[Dict]:
LOGGER.debug('[get_devices_endpoints] begin')
MSG = '[get_devices_endpoints] import_topology={:s}'
LOGGER.debug(MSG.format(str(import_topology)))
if import_topology == ImportTopologyEnum.DISABLED:
MSG = 'Unsupported import_topology mode: {:s}'
raise Exception(MSG.format(str(import_topology)))
devices = self.get(GET_DEVICES_URL, expected_status_codes={requests.codes['OK']})
result = list()
for json_device in devices['devices']:
device_uuid : str = json_device['device_id']['device_uuid']['uuid']
device_type : str = json_device['device_type']
#if not device_type.startswith('emu-'): device_type = 'emu-' + device_type
device_status = json_device['device_operational_status']
device_url = '/devices/device[{:s}]'.format(device_uuid)
device_data = {
'uuid': json_device['device_id']['device_uuid']['uuid'],
'name': json_device['name'],
'type': device_type,
'status': MAPPING_STATUS[device_status],
'drivers': [
MAPPING_DRIVER[driver]
for driver in json_device['device_drivers']
],
}
result.append((device_url, device_data))
for json_endpoint in json_device['device_endpoints']:
endpoint_uuid = json_endpoint['endpoint_id']['endpoint_uuid']['uuid']
endpoint_url = '/endpoints/endpoint[{:s}]'.format(endpoint_uuid)
endpoint_data = {
'device_uuid': device_uuid,
'uuid': endpoint_uuid,
'name': json_endpoint['name'],
'type': json_endpoint['endpoint_type'],
}
result.append((endpoint_url, endpoint_data))
if import_topology == ImportTopologyEnum.DEVICES:
LOGGER.debug('[get_devices_endpoints] devices only; returning')
return result
links = self.get(GET_LINKS_URL, expected_status_codes={requests.codes['OK']})
for json_link in links['links']:
link_uuid : str = json_link['link_id']['link_uuid']['uuid']
link_url = '/links/link[{:s}]'.format(link_uuid)
link_endpoint_ids = [
(
json_endpoint_id['device_id']['device_uuid']['uuid'],
json_endpoint_id['endpoint_uuid']['uuid'],
)
for json_endpoint_id in json_link['link_endpoint_ids']
]
link_data = {
'uuid': json_link['link_id']['link_uuid']['uuid'],
'name': json_link['name'],
'endpoints': link_endpoint_ids,
}
result.append((link_url, link_data))
LOGGER.debug('[get_devices_endpoints] topology; returning')
return result
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, requests
from typing import Dict, List, Optional, Union
from common.tools.client.RestClient import RestClient
LOGGER = logging.getLogger(__name__)
GET_OPTICAL_LINKS_URL = '/OpticalTFS/GetLinks'
GET_LIGHTPATHS_URL = '/OpticalTFS/GetLightpaths'
ADD_LIGHTPATH_URL = '/OpticalTFS/AddLightpath/{src_node:s}/{dst_node:s}/{bitrate:s}'
DEL_LIGHTPATH_URL = '/OpticalTFS/DelLightpath/{flow_id:s}/{src_node:s}/{dst_node:s}/{bitrate:s}'
class TfsOpticalClient(RestClient):
def __init__(
self, address : str, port : int, scheme : str = 'http',
username : Optional[str] = None, password : Optional[str] = None,
timeout : Optional[int] = 30
) -> None:
super().__init__(
address, port, scheme=scheme, username=username, password=password,
timeout=timeout, verify_certs=False, allow_redirects=True, logger=LOGGER
)
def check_credentials(self) -> None:
self.get(GET_LIGHTPATHS_URL, expected_status_codes={requests.codes['OK']})
LOGGER.info('Credentials checked')
def get_optical_links(self) -> Union[List[Dict], Exception]:
try:
return self.get(GET_OPTICAL_LINKS_URL, expected_status_codes={requests.codes['OK']})
except Exception as e:
LOGGER.exception('Exception retrieving optical links')
return e
def get_lightpaths(self) -> Union[List[Dict], Exception]:
try:
lightpaths : List[Dict] = self.get(
GET_LIGHTPATHS_URL, expected_status_codes={requests.codes['OK']}
)
except Exception as e:
LOGGER.exception('Exception retrieving lightpaths')
return e
result = []
for lightpath in lightpaths:
assert 'flow_id' in lightpath
assert 'src' in lightpath
assert 'dst' in lightpath
assert 'bitrate' in lightpath
resource_key = '/lightpaths/lightpath[{:s}]'.format(lightpath['flow_id'])
result.append((resource_key, lightpath))
return result
def add_lightpath(
self, src_node : str, dst_node : str, bitrate : int
) -> Union[List[Dict], Exception]:
MSG = 'Add Lightpath: {:s} <-> {:s} with {:d} bitrate'
LOGGER.info(MSG.format(str(src_node), str(dst_node), int(bitrate)))
request_endpoint = ADD_LIGHTPATH_URL.format(
src_node=str(src_node), dst_node=str(dst_node), bitrate=int(bitrate)
)
expected_status_codes = {requests.codes['CREATED'], requests.codes['NO_CONTENT']}
try:
return self.put(request_endpoint, expected_status_codes=expected_status_codes)
except Exception as e:
MSG = 'Exception requesting Lightpath: {:s} <-> {:s} with {:s} bitrate'
LOGGER.exception(MSG.format(str(src_node), str(dst_node), str(bitrate)))
return e
def del_lightpath(
self, flow_id : str, src_node : str, dst_node : str, bitrate : int
) -> Union[List[Dict], Exception]:
MSG = 'Delete Lightpath {:s}: {:s} <-> {:s} with {:d} bitrate'
LOGGER.info(MSG.format(str(flow_id), str(src_node), str(dst_node), int(bitrate)))
request_endpoint = DEL_LIGHTPATH_URL.format(
src_node=str(src_node), dst_node=str(dst_node), bitrate=int(bitrate)
)
expected_status_codes = {requests.codes['NO_CONTENT']}
try:
return self.delete(request_endpoint, expected_status_codes=expected_status_codes)
except Exception as e:
MSG = 'Exception deleting Lightpath {:s}: {:s} <-> {:s} with {:s} bitrate'
LOGGER.exception(MSG.format(str(flow_id), str(src_node), str(dst_node), str(bitrate)))
return e
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, requests
from requests.auth import HTTPBasicAuth
from typing import Optional
LOGGER = logging.getLogger(__name__)
HTTP_OK_CODES = {
200, # OK
201, # Created
202, # Accepted
204, # No Content
}
def find_key(resource, key):
return json.loads(resource[1])[key]
def get_lightpaths(root_url : str, resource_key : str,auth : Optional[HTTPBasicAuth] = None,
timeout : Optional[int] = None):
headers = {'accept': 'application/json'}
url = '{:s}/OpticalTFS/GetLightpaths'.format(root_url)
result = []
try:
response = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth)
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(url))
return result
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception retrieving {:s}'.format(resource_key))
result.append((resource_key, e))
return result
try:
flows = json.loads(response.content)
except Exception as e: # pylint: disable=broad-except
LOGGER.warning('Unable to decode reply: {:s}'.format(str(response.content)))
result.append((resource_key, e))
return result
for flow in flows:
flow_id = flow.get('flow_id')
source = flow.get('src')
destination = flow.get('dst')
bitrate = flow.get('bitrate')
endpoint_url = '/flows/flow[{:s}]'.format(flow_id)
endpoint_data = {'flow_id': flow_id, 'src': source, 'dst': destination, 'bitrate': bitrate}
result.append((endpoint_url, endpoint_data))
return result
def add_lightpath(root_url, src_node, dst_node, bitrate,
auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None):
headers = {'accept': 'application/json'}
url = '{:s}/OpticalTFS/AddLightpath/{:s}/{:s}/{:s}'.format(
root_url, src_node, dst_node, bitrate)
results = []
try:
LOGGER.info('Lightpath request: {:s} <-> {:s} with {:s} bitrate'.format(
str(src_node), str(dst_node), str(bitrate)))
response = requests.put(url=url, timeout=timeout, headers=headers, verify=False, auth=auth)
results.append(response.json())
LOGGER.info('Response: {:s}'.format(str(response)))
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception requesting Lightpath: {:s} <-> {:s} with {:s} bitrate'.format(
str(src_node), str(dst_node), str(bitrate)))
results.append(e)
else:
if response.status_code not in HTTP_OK_CODES:
msg = 'Could not create Lightpath(status_code={:s} reply={:s}'
LOGGER.error(msg.format(str(response.status_code), str(response)))
results.append(response.status_code in HTTP_OK_CODES)
return results
def del_lightpath(root_url, flow_id, src_node, dst_node, bitrate,
auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None):
url = '{:s}/OpticalTFS/DelLightpath/{:s}/{:s}/{:s}/{:s}'.format(
root_url, flow_id, src_node, dst_node, bitrate)
headers = {'accept': 'application/json'}
results = []
try:
response = requests.delete(
url=url, timeout=timeout, headers=headers, verify=False, auth=auth)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception deleting Lightpath(uuid={:s})'.format(str(flow_id)))
results.append(e)
else:
if response.status_code not in HTTP_OK_CODES:
msg = 'Could not delete Lightpath(flow_id={:s}). status_code={:s} reply={:s}'
LOGGER.error(msg.format(str(flow_id), str(response.status_code), str(response)))
results.append(response.status_code in HTTP_OK_CODES)
return results
def get_topology(root_url : str, resource_key : str,auth : Optional[HTTPBasicAuth] = None,
timeout : Optional[int] = None):
headers = {'accept': 'application/json'}
url = '{:s}/OpticalTFS/GetLinks'.format(root_url)
result = []
try:
response = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth)
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(url))
return result
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception retrieving {:s}'.format(resource_key))
result.append((resource_key, e))
return result
try:
response = json.loads(response.content)
except Exception as e: # pylint: disable=broad-except
LOGGER.warning('Unable to decode reply: {:s}'.format(str(response.content)))
result.append((resource_key, e))
return result
result.append(response)
return result
......@@ -12,9 +12,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_SERVICES
ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS,
RESOURCE_SERVICES,
]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment