Skip to content
Snippets Groups Projects
Commit 91b779d7 authored by Shayan Hajipour's avatar Shayan Hajipour
Browse files

feat:

- app-flow create/delete operations and urls added to driver
- TfsApiClient.py renamed to nce_fan_client.py
- create_app_flow function removed from drivers/nce/Tools.py
parent bbc35d3c
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!304Resolve "(CTTC) Driver required to interact with NCE controller"
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, List, Optional
import requests
from requests.auth import HTTPBasicAuth
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum
GET_DEVICES_URL = "{:s}://{:s}:{:d}/tfs-api/devices"
GET_LINKS_URL = "{:s}://{:s}:{:d}/tfs-api/links"
L3VPN_URL = "{:s}://{:s}:{:d}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services"
TIMEOUT = 30
HTTP_OK_CODES = {
200, # OK
201, # Created
202, # Accepted
204, # No Content
}
MAPPING_STATUS = {
"DEVICEOPERATIONALSTATUS_UNDEFINED": 0,
"DEVICEOPERATIONALSTATUS_DISABLED": 1,
"DEVICEOPERATIONALSTATUS_ENABLED": 2,
}
MAPPING_DRIVER = {
"DEVICEDRIVER_UNDEFINED": 0,
"DEVICEDRIVER_OPENCONFIG": 1,
"DEVICEDRIVER_TRANSPORT_API": 2,
"DEVICEDRIVER_P4": 3,
"DEVICEDRIVER_IETF_NETWORK_TOPOLOGY": 4,
"DEVICEDRIVER_ONF_TR_532": 5,
"DEVICEDRIVER_XR": 6,
"DEVICEDRIVER_IETF_L2VPN": 7,
"DEVICEDRIVER_GNMI_OPENCONFIG": 8,
"DEVICEDRIVER_OPTICAL_TFS": 9,
"DEVICEDRIVER_IETF_ACTN": 10,
"DEVICEDRIVER_OC": 11,
}
MSG_ERROR = "Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}"
LOGGER = logging.getLogger(__name__)
class TfsApiClient:
def __init__(
self,
address: str,
port: int,
scheme: str = "http",
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
self._devices_url = GET_DEVICES_URL.format(scheme, address, port)
self._links_url = GET_LINKS_URL.format(scheme, address, port)
self._l3vpn_url = L3VPN_URL.format(scheme, address, port)
self._auth = None
# (
# HTTPBasicAuth(username, password)
# if username is not None and password is not None
# else None
# )
def get_devices_endpoints(
self, import_topology: ImportTopologyEnum = ImportTopologyEnum.DEVICES
) -> List[Dict]:
LOGGER.debug("[get_devices_endpoints] begin")
LOGGER.debug(
"[get_devices_endpoints] import_topology={:s}".format(str(import_topology))
)
reply = requests.get(self._devices_url, timeout=TIMEOUT, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(
str(self._devices_url), str(reply.status_code), str(reply)
)
LOGGER.error(msg)
raise Exception(msg)
if import_topology == ImportTopologyEnum.DISABLED:
raise Exception(
"Unsupported import_topology mode: {:s}".format(str(import_topology))
)
result = list()
for json_device in reply.json()["devices"]:
device_uuid: str = json_device["device_id"]["device_uuid"]["uuid"]
device_type: str = json_device["device_type"]
device_status = json_device["device_operational_status"]
device_url = "/devices/device[{:s}]".format(device_uuid)
device_data = {
"uuid": json_device["device_id"]["device_uuid"]["uuid"],
"name": json_device["name"],
"type": device_type,
"status": MAPPING_STATUS[device_status],
"drivers": [
MAPPING_DRIVER[driver] for driver in json_device["device_drivers"]
],
}
result.append((device_url, device_data))
for json_endpoint in json_device["device_endpoints"]:
endpoint_uuid = json_endpoint["endpoint_id"]["endpoint_uuid"]["uuid"]
endpoint_url = "/endpoints/endpoint[{:s}]".format(endpoint_uuid)
endpoint_data = {
"device_uuid": device_uuid,
"uuid": endpoint_uuid,
"name": json_endpoint["name"],
"type": json_endpoint["endpoint_type"],
}
result.append((endpoint_url, endpoint_data))
if import_topology == ImportTopologyEnum.DEVICES:
LOGGER.debug("[get_devices_endpoints] devices only; returning")
return result
reply = requests.get(self._links_url, timeout=TIMEOUT, auth=self._auth)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(
str(self._links_url), str(reply.status_code), str(reply)
)
LOGGER.error(msg)
raise Exception(msg)
for json_link in reply.json()["links"]:
link_uuid: str = json_link["link_id"]["link_uuid"]["uuid"]
link_url = "/links/link[{:s}]".format(link_uuid)
link_endpoint_ids = [
(
json_endpoint_id["device_id"]["device_uuid"]["uuid"],
json_endpoint_id["endpoint_uuid"]["uuid"],
)
for json_endpoint_id in json_link["link_endpoint_ids"]
]
link_data = {
"uuid": json_link["link_id"]["link_uuid"]["uuid"],
"name": json_link["name"],
"endpoints": link_endpoint_ids,
}
result.append((link_url, link_data))
LOGGER.debug("[get_devices_endpoints] topology; returning")
return result
def create_connectivity_service(self, l3vpn_data: dict) -> None:
try:
requests.post(self._l3vpn_url, json=l3vpn_data)
except requests.exceptions.ConnectionError:
raise Exception("faild to send post request to TFS L3VPN NBI")
def delete_connectivity_service(self, service_uuid: str) -> None:
url = self._l3vpn_url + f"/vpn-service={service_uuid}"
try:
requests.delete(url, auth=self._auth)
except requests.exceptions.ConnectionError:
raise Exception("faild to send delete request to TFS L3VPN NBI")
...@@ -23,82 +23,6 @@ from .Constants import SPECIAL_RESOURCE_MAPPINGS ...@@ -23,82 +23,6 @@ from .Constants import SPECIAL_RESOURCE_MAPPINGS
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
def create_app_flow(resource_value: dict) -> dict:
app_flow_id: str = resource_value["app_flow_id"]
app_flow_user_id: str = resource_value["app_flow_user_id"]
max_latency: int = resource_value["max_latency"]
max_jitter: int = resource_value["max_jitter"]
max_loss: float = resource_value["max_loss"]
upstream_assure_bw: str = resource_value["upstream_assure_bw"]
upstream_max_bw: str = resource_value["upstream_max_bw"]
downstream_assure_bw: str = resource_value["downstream_assure_bw"]
downstream_max_bw: str = resource_value["downstream_max_bw"]
src_ip: str = resource_value["src_ip"]
src_port: str = resource_value["src_port"]
dst_ip: str = resource_value["dst_ip"]
dst_port: str = resource_value["dst_port"]
app_flow_app_name: str = f"App_Flow_{app_flow_id}"
app_flow_service_profile: str = f"service_{app_flow_id}"
app_id: str = f"app_{app_flow_id}"
app_feature_id: str = f"feature_{app_flow_id}"
app_flow_name: str = resource_value.get("app_flow_name", "App_Flow_Example")
app_flow_max_online_users: int = resource_value.get("app_flow_max_online_users", 1)
app_flow_stas: str = resource_value.get("stas", "00:3D:E1:18:82:9E")
qos_profile_name: str = resource_value.get("app_flow_qos_profile", "AR_VR_Gaming")
app_flow_duration: int = resource_value.get("app_flow_duration", 9999)
protocol: str = resource_value.get("protocol", "tcp")
app_flow = {
"name": app_flow_name,
"user-id": app_flow_user_id,
"app-name": app_flow_app_name,
"max-online-users": app_flow_max_online_users,
"stas": app_flow_stas,
"qos-profile": qos_profile_name,
"service-profile": app_flow_service_profile,
"duration": app_flow_duration,
}
qos_profile = {
"name": qos_profile_name,
"max-latency": max_latency,
"max-jitter": max_jitter,
"max-loss": max_loss,
"upstream": {
"assure-bandwidth": upstream_assure_bw,
"max-bandwidth": upstream_max_bw,
},
"downstream": {
"assure-bandwidth": downstream_assure_bw,
"max-bandwidth": downstream_max_bw,
},
}
application = {
"name": app_flow_name,
"app-id": app_id,
"app-features": {
"app-feature": [
{
"id": app_feature_id,
"dest-ip": dst_ip,
"dest-port": dst_port,
"src-ip": src_ip,
"src-port": src_port,
"protocol": protocol,
}
]
},
}
app_flow_datamodel = {
"huawei-nce-app-flow:app-flows": {
"app-flow": [app_flow],
"qos-profiles": {"qos-profile": [qos_profile]},
"applications": {"application": [application]},
}
}
return app_flow_datamodel
def process_optional_string_field( def process_optional_string_field(
endpoint_data: Dict[str, Any], endpoint_data: Dict[str, Any],
field_name: str, field_name: str,
......
...@@ -37,7 +37,7 @@ from device.service.driver_api.ImportTopologyEnum import ( ...@@ -37,7 +37,7 @@ from device.service.driver_api.ImportTopologyEnum import (
) )
from .Constants import SPECIAL_RESOURCE_MAPPINGS from .Constants import SPECIAL_RESOURCE_MAPPINGS
from .TfsApiClient import TfsApiClient from .nce_fan_client import NCEClient
from .Tools import compose_resource_endpoint from .Tools import compose_resource_endpoint
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -60,7 +60,7 @@ class NCEDriver(_Driver): ...@@ -60,7 +60,7 @@ class NCEDriver(_Driver):
scheme = self.settings.get("scheme", "http") scheme = self.settings.get("scheme", "http")
username = self.settings.get("username") username = self.settings.get("username")
password = self.settings.get("password") password = self.settings.get("password")
self.tac = TfsApiClient( self.nce = NCEClient(
self.address, self.address,
self.port, self.port,
scheme=scheme, scheme=scheme,
...@@ -135,7 +135,7 @@ class NCEDriver(_Driver): ...@@ -135,7 +135,7 @@ class NCEDriver(_Driver):
if self.__started.is_set(): if self.__started.is_set():
return True return True
try: try:
# requests.get(url, timeout=self.__timeout, auth=self.__auth) # requests.get(url, timeout=self.__timeout)
... ...
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
LOGGER.exception("Timeout connecting {:s}".format(url)) LOGGER.exception("Timeout connecting {:s}".format(url))
...@@ -207,7 +207,7 @@ class NCEDriver(_Driver): ...@@ -207,7 +207,7 @@ class NCEDriver(_Driver):
for resource in resources: for resource in resources:
resource_key, resource_value = resource resource_key, resource_value = resource
if RE_NCE_APP_FLOW_OPERATION.match(resource_key): if RE_NCE_APP_FLOW_OPERATION.match(resource_key):
operation_type = json.loads(resource_value)['type'] operation_type = json.loads(resource_value)["type"]
results.append((resource_key, True)) results.append((resource_key, True))
break break
else: else:
...@@ -220,13 +220,12 @@ class NCEDriver(_Driver): ...@@ -220,13 +220,12 @@ class NCEDriver(_Driver):
try: try:
resource_value = json.loads(resource_value) resource_value = json.loads(resource_value)
if operation_type == "create": if operation_type == "create":
# create the underlying app flow self.nce.create_app_flow(resource_value)
# self.tac.create_app_flow(resource_value) elif operation_type == "delete":
... app_flow_name = resource_value["huawei-nce-app-flow:app-flows"][
elif operation_type == 'delete': "app-flow"
# delete the underlying app flow ][0]["name"]
# self.tac.delete_app_flow(resource_value) self.nce.delete_app_flow(app_flow_name)
...
LOGGER.debug(f"app_flow_datamodel {resource_value}") LOGGER.debug(f"app_flow_datamodel {resource_value}")
results.append((resource_key, True)) results.append((resource_key, True))
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
import requests
from requests.auth import HTTPBasicAuth
NCE_FAN_URL = "{:s}://{:s}:{:d}/restconf/v1/data"
TIMEOUT = 30
HTTP_OK_CODES = {
200, # OK
201, # Created
202, # Accepted
204, # No Content
}
MAPPING_STATUS = {
"DEVICEOPERATIONALSTATUS_UNDEFINED": 0,
"DEVICEOPERATIONALSTATUS_DISABLED": 1,
"DEVICEOPERATIONALSTATUS_ENABLED": 2,
}
MAPPING_DRIVER = {
"DEVICEDRIVER_UNDEFINED": 0,
"DEVICEDRIVER_OPENCONFIG": 1,
"DEVICEDRIVER_TRANSPORT_API": 2,
"DEVICEDRIVER_P4": 3,
"DEVICEDRIVER_IETF_NETWORK_TOPOLOGY": 4,
"DEVICEDRIVER_ONF_TR_532": 5,
"DEVICEDRIVER_XR": 6,
"DEVICEDRIVER_IETF_L2VPN": 7,
"DEVICEDRIVER_GNMI_OPENCONFIG": 8,
"DEVICEDRIVER_OPTICAL_TFS": 9,
"DEVICEDRIVER_IETF_ACTN": 10,
"DEVICEDRIVER_OC": 11,
}
class NCEClient:
def __init__(
self,
address: str,
port: int,
scheme: str = "http",
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
self._nce_fan_url = NCE_FAN_URL.format(scheme, address, port)
self._auth = None
def create_app_flow(self, app_flow_data: dict) -> None:
try:
app_data = app_flow_data["huawei-nce-app-flow:app-flows"]["applications"]
app_url = self._nce_fan_url + "/app-flows/apps"
requests.post(app_url, json=app_data)
app_flow_data = {
"app-flow": app_flow_data["huawei-nce-app-flow:app-flows"]["app-flow"]
}
app_flow_url = self._nce_fan_url + "/app-flows"
requests.post(app_flow_url, json=app_flow_data)
except requests.exceptions.ConnectionError:
raise Exception("faild to send post requests to NCE FAN")
def delete_app_flow(self, app_flow_name: str) -> None:
try:
app_url = self._nce_fan_url + f"/app-flows/apps/application={app_flow_name}"
requests.delete(app_url)
app_flow_url = self._nce_fan_url + f"/app-flows/app-flow={app_flow_name}"
requests.delete(app_flow_url)
except requests.exceptions.ConnectionError:
raise Exception("faild to send delete request to NCE FAN")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment