Commit 363195f4 authored by Shayan Hajipour's avatar Shayan Hajipour
Browse files

feat: initial version of NCE driver added.

parent c31a36bf
Loading
Loading
Loading
Loading
+25 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from device.service.driver_api._Driver import (
    RESOURCE_ENDPOINTS,
    RESOURCE_INTERFACES,
    RESOURCE_NETWORK_INSTANCES,
)

SPECIAL_RESOURCE_MAPPINGS = {
    RESOURCE_ENDPOINTS: "/endpoints",
    RESOURCE_INTERFACES: "/interfaces",
    RESOURCE_NETWORK_INSTANCES: "/net-instances",
}
+172 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Dict, List, Optional

import requests
from requests.auth import HTTPBasicAuth

from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum

GET_DEVICES_URL = "{:s}://{:s}:{:d}/tfs-api/devices"
GET_LINKS_URL = "{:s}://{:s}:{:d}/tfs-api/links"
L3VPN_URL = "{:s}://{:s}:{:d}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services"
TIMEOUT = 30

HTTP_OK_CODES = {
    200,  # OK
    201,  # Created
    202,  # Accepted
    204,  # No Content
}

MAPPING_STATUS = {
    "DEVICEOPERATIONALSTATUS_UNDEFINED": 0,
    "DEVICEOPERATIONALSTATUS_DISABLED": 1,
    "DEVICEOPERATIONALSTATUS_ENABLED": 2,
}

MAPPING_DRIVER = {
    "DEVICEDRIVER_UNDEFINED": 0,
    "DEVICEDRIVER_OPENCONFIG": 1,
    "DEVICEDRIVER_TRANSPORT_API": 2,
    "DEVICEDRIVER_P4": 3,
    "DEVICEDRIVER_IETF_NETWORK_TOPOLOGY": 4,
    "DEVICEDRIVER_ONF_TR_532": 5,
    "DEVICEDRIVER_XR": 6,
    "DEVICEDRIVER_IETF_L2VPN": 7,
    "DEVICEDRIVER_GNMI_OPENCONFIG": 8,
    "DEVICEDRIVER_OPTICAL_TFS": 9,
    "DEVICEDRIVER_IETF_ACTN": 10,
    "DEVICEDRIVER_OC": 11,
}

MSG_ERROR = "Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}"

LOGGER = logging.getLogger(__name__)


class TfsApiClient:
    def __init__(
        self,
        address: str,
        port: int,
        scheme: str = "http",
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> None:
        self._devices_url = GET_DEVICES_URL.format(scheme, address, port)
        self._links_url = GET_LINKS_URL.format(scheme, address, port)
        self._l3vpn_url = L3VPN_URL.format(scheme, address, port)
        self._auth = None
        # (
        #     HTTPBasicAuth(username, password)
        #     if username is not None and password is not None
        #     else None
        # )

    def get_devices_endpoints(
        self, import_topology: ImportTopologyEnum = ImportTopologyEnum.DEVICES
    ) -> List[Dict]:
        LOGGER.debug("[get_devices_endpoints] begin")
        LOGGER.debug(
            "[get_devices_endpoints] import_topology={:s}".format(str(import_topology))
        )

        reply = requests.get(self._devices_url, timeout=TIMEOUT, auth=self._auth)
        if reply.status_code not in HTTP_OK_CODES:
            msg = MSG_ERROR.format(
                str(self._devices_url), str(reply.status_code), str(reply)
            )
            LOGGER.error(msg)
            raise Exception(msg)

        if import_topology == ImportTopologyEnum.DISABLED:
            raise Exception(
                "Unsupported import_topology mode: {:s}".format(str(import_topology))
            )

        result = list()
        for json_device in reply.json()["devices"]:
            device_uuid: str = json_device["device_id"]["device_uuid"]["uuid"]
            device_type: str = json_device["device_type"]
            device_status = json_device["device_operational_status"]
            device_url = "/devices/device[{:s}]".format(device_uuid)
            device_data = {
                "uuid": json_device["device_id"]["device_uuid"]["uuid"],
                "name": json_device["name"],
                "type": device_type,
                "status": MAPPING_STATUS[device_status],
                "drivers": [
                    MAPPING_DRIVER[driver] for driver in json_device["device_drivers"]
                ],
            }
            result.append((device_url, device_data))

            for json_endpoint in json_device["device_endpoints"]:
                endpoint_uuid = json_endpoint["endpoint_id"]["endpoint_uuid"]["uuid"]
                endpoint_url = "/endpoints/endpoint[{:s}]".format(endpoint_uuid)
                endpoint_data = {
                    "device_uuid": device_uuid,
                    "uuid": endpoint_uuid,
                    "name": json_endpoint["name"],
                    "type": json_endpoint["endpoint_type"],
                }
                result.append((endpoint_url, endpoint_data))

        if import_topology == ImportTopologyEnum.DEVICES:
            LOGGER.debug("[get_devices_endpoints] devices only; returning")
            return result

        reply = requests.get(self._links_url, timeout=TIMEOUT, auth=self._auth)
        if reply.status_code not in HTTP_OK_CODES:
            msg = MSG_ERROR.format(
                str(self._links_url), str(reply.status_code), str(reply)
            )
            LOGGER.error(msg)
            raise Exception(msg)

        for json_link in reply.json()["links"]:
            link_uuid: str = json_link["link_id"]["link_uuid"]["uuid"]
            link_url = "/links/link[{:s}]".format(link_uuid)
            link_endpoint_ids = [
                (
                    json_endpoint_id["device_id"]["device_uuid"]["uuid"],
                    json_endpoint_id["endpoint_uuid"]["uuid"],
                )
                for json_endpoint_id in json_link["link_endpoint_ids"]
            ]
            link_data = {
                "uuid": json_link["link_id"]["link_uuid"]["uuid"],
                "name": json_link["name"],
                "endpoints": link_endpoint_ids,
            }
            result.append((link_url, link_data))

        LOGGER.debug("[get_devices_endpoints] topology; returning")
        return result

    def create_connectivity_service(self, l3vpn_data: dict) -> None:
        try:
            requests.post(self._l3vpn_url, json=l3vpn_data)
        except requests.exceptions.ConnectionError:
            raise Exception("faild to send post request to TFS L3VPN NBI")

    def delete_connectivity_service(self, service_uuid: str) -> None:
        url = self._l3vpn_url + f"/vpn-service={service_uuid}"
        try:
            requests.delete(url, auth=self._auth)
        except requests.exceptions.ConnectionError:
            raise Exception("faild to send delete request to TFS L3VPN NBI")
+221 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Dict, Optional, Tuple

from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.type_checkers.Checkers import chk_attribute, chk_string, chk_type
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS

from .Constants import SPECIAL_RESOURCE_MAPPINGS

LOGGER = logging.getLogger(__name__)


def create_app_flow(resource_value: dict) -> dict:
    app_flow_id: str = resource_value["app_flow_id"]
    app_flow_user_id: str = resource_value["app_flow_user_id"]
    max_latency: int = resource_value["max_latency"]
    max_jitter: int = resource_value["max_jitter"]
    max_loss: float = resource_value["max_loss"]
    upstream_assure_bw: str = resource_value["upstream_assure_bw"]
    upstream_max_bw: str = resource_value["upstream_max_bw"]
    downstream_assure_bw: str = resource_value["downstream_assure_bw"]
    downstream_max_bw: str = resource_value["downstream_max_bw"]
    src_ip: str = resource_value["src_ip"]
    src_port: str = resource_value["src_port"]
    dst_ip: str = resource_value["dst_ip"]
    dst_port: str = resource_value["dst_port"]

    app_flow_app_name: str = f"App_Flow_{app_flow_id}"
    app_flow_service_profile: str = f"service_{app_flow_id}"
    app_id: str = f"app_{app_flow_id}"
    app_feature_id: str = f"feature_{app_flow_id}"
    app_flow_name: str = resource_value.get("app_flow_name", "App_Flow_Example")
    app_flow_max_online_users: int = resource_value.get("app_flow_max_online_users", 1)
    app_flow_stas: str = resource_value.get("stas", "00:3D:E1:18:82:9E")
    qos_profile_name: str = resource_value.get("app_flow_qos_profile", "AR_VR_Gaming")
    app_flow_duration: int = resource_value.get("app_flow_duration", 9999)
    protocol: str = resource_value.get("protocol", "tcp")

    app_flow = {
        "name": app_flow_name,
        "user-id": app_flow_user_id,
        "app-name": app_flow_app_name,
        "max-online-users": app_flow_max_online_users,
        "stas": app_flow_stas,
        "qos-profile": qos_profile_name,
        "service-profile": app_flow_service_profile,
        "duration": app_flow_duration,
    }
    qos_profile = {
        "name": qos_profile_name,
        "max-latency": max_latency,
        "max-jitter": max_jitter,
        "max-loss": max_loss,
        "upstream": {
            "assure-bandwidth": upstream_assure_bw,
            "max-bandwidth": upstream_max_bw,
        },
        "downstream": {
            "assure-bandwidth": downstream_assure_bw,
            "max-bandwidth": downstream_max_bw,
        },
    }
    application = {
        "name": app_flow_name,
        "app-id": app_id,
        "app-features": {
            "app-feature": [
                {
                    "id": app_feature_id,
                    "dest-ip": dst_ip,
                    "dest-port": dst_port,
                    "src-ip": src_ip,
                    "src-port": src_port,
                    "protocol": protocol,
                }
            ]
        },
    }
    app_flow_datamodel = {
        "huawei-nce-app-flow:app-flows": {
            "app-flow": [app_flow],
            "qos-profiles": {"qos-profile": [qos_profile]},
            "applications": {"application": [application]},
        }
    }
    return app_flow_datamodel


def process_optional_string_field(
    endpoint_data: Dict[str, Any],
    field_name: str,
    endpoint_resource_value: Dict[str, Any],
) -> None:
    field_value = chk_attribute(
        field_name, endpoint_data, "endpoint_data", default=None
    )
    if field_value is None:
        return
    chk_string("endpoint_data.{:s}".format(field_name), field_value)
    if len(field_value) > 0:
        endpoint_resource_value[field_name] = field_value


def compose_resource_endpoint(
    endpoint_data: Dict[str, Any],
) -> Optional[Tuple[str, Dict]]:
    try:
        # Check type of endpoint_data
        chk_type("endpoint_data", endpoint_data, dict)

        # Check endpoint UUID (mandatory)
        endpoint_uuid = chk_attribute("uuid", endpoint_data, "endpoint_data")
        chk_string("endpoint_data.uuid", endpoint_uuid, min_length=1)
        endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS)
        endpoint_resource_key = "{:s}/endpoint[{:s}]".format(
            endpoint_resource_path, endpoint_uuid
        )
        endpoint_resource_value = {"uuid": endpoint_uuid}

        # Check endpoint optional string fields
        process_optional_string_field(endpoint_data, "name", endpoint_resource_value)
        process_optional_string_field(
            endpoint_data, "site_location", endpoint_resource_value
        )
        process_optional_string_field(endpoint_data, "ce-ip", endpoint_resource_value)
        process_optional_string_field(
            endpoint_data, "address_ip", endpoint_resource_value
        )
        process_optional_string_field(
            endpoint_data, "address_prefix", endpoint_resource_value
        )
        process_optional_string_field(endpoint_data, "mtu", endpoint_resource_value)
        process_optional_string_field(
            endpoint_data, "ipv4_lan_prefixes", endpoint_resource_value
        )
        process_optional_string_field(endpoint_data, "type", endpoint_resource_value)
        process_optional_string_field(
            endpoint_data, "context_uuid", endpoint_resource_value
        )
        process_optional_string_field(
            endpoint_data, "topology_uuid", endpoint_resource_value
        )

        # Check endpoint sample types (optional)
        endpoint_sample_types = chk_attribute(
            "sample_types", endpoint_data, "endpoint_data", default=[]
        )
        chk_type("endpoint_data.sample_types", endpoint_sample_types, list)
        sample_types = {}
        sample_type_errors = []
        for i, endpoint_sample_type in enumerate(endpoint_sample_types):
            field_name = "endpoint_data.sample_types[{:d}]".format(i)
            try:
                chk_type(field_name, endpoint_sample_type, (int, str))
                if isinstance(endpoint_sample_type, int):
                    metric_name = KpiSampleType.Name(endpoint_sample_type)
                    metric_id = endpoint_sample_type
                elif isinstance(endpoint_sample_type, str):
                    metric_id = KpiSampleType.Value(endpoint_sample_type)
                    metric_name = endpoint_sample_type
                else:
                    str_type = str(type(endpoint_sample_type))
                    raise Exception("Bad format: {:s}".format(str_type))  # pylint: disable=broad-exception-raised
            except Exception as e:  # pylint: disable=broad-exception-caught
                MSG = "Unsupported {:s}({:s}) : {:s}"
                sample_type_errors.append(
                    MSG.format(field_name, str(endpoint_sample_type), str(e))
                )

            metric_name = metric_name.lower().replace("kpisampletype_", "")
            monitoring_resource_key = "{:s}/state/{:s}".format(
                endpoint_resource_key, metric_name
            )
            sample_types[metric_id] = monitoring_resource_key

        if len(sample_type_errors) > 0:
            # pylint: disable=broad-exception-raised
            raise Exception(
                "Malformed Sample Types:\n{:s}".format("\n".join(sample_type_errors))
            )

        if len(sample_types) > 0:
            endpoint_resource_value["sample_types"] = sample_types

        if "site_location" in endpoint_data:
            endpoint_resource_value["site_location"] = endpoint_data["site_location"]

        if "ce-ip" in endpoint_data:
            endpoint_resource_value["ce-ip"] = endpoint_data["ce-ip"]

        if "address_ip" in endpoint_data:
            endpoint_resource_value["address_ip"] = endpoint_data["address_ip"]

        if "address_prefix" in endpoint_data:
            endpoint_resource_value["address_prefix"] = endpoint_data["address_prefix"]

        if "mtu" in endpoint_data:
            endpoint_resource_value["mtu"] = endpoint_data["mtu"]

        if "ipv4_lan_prefixes" in endpoint_data:
            endpoint_resource_value["ipv4_lan_prefixes"] = endpoint_data[
                "ipv4_lan_prefixes"
            ]

        return endpoint_resource_key, endpoint_resource_value
    except:  # pylint: disable=bare-except
        LOGGER.exception("Problem composing endpoint({:s})".format(str(endpoint_data)))
        return None
+13 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+285 −0

File added.

Preview size limit exceeded, changes collapsed.