Commit 9e77e349 authored by Shayan Hajipour's avatar Shayan Hajipour
Browse files

l3vpn debug & feat:

- l3vpn driver debuged and refactored
- emulted device driver features added to l3vpn driver onboarding
- test_unitary_ietf_l3vpn.py added
parent 6ba8fc54
Loading
Loading
Loading
Loading
+25 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from device.service.driver_api._Driver import (
    RESOURCE_ENDPOINTS,
    RESOURCE_INTERFACES,
    RESOURCE_NETWORK_INSTANCES,
)

SPECIAL_RESOURCE_MAPPINGS = {
    RESOURCE_ENDPOINTS: "/endpoints",
    RESOURCE_INTERFACES: "/interfaces",
    RESOURCE_NETWORK_INSTANCES: "/net-instances",
}
+3 −2
Original line number Diff line number Diff line
@@ -69,6 +69,7 @@ class TfsApiClient:
    ) -> None:
        self._devices_url = GET_DEVICES_URL.format(scheme, address, port)
        self._links_url = GET_LINKS_URL.format(scheme, address, port)
        self._l3vph_url = L3VPN_URL.format(scheme, address, port)
        self._auth = (
            HTTPBasicAuth(username, password)
            if username is not None and password is not None
@@ -162,12 +163,12 @@ class TfsApiClient:

    def create_connectivity_service(self, l3vpn_data: dict) -> None:
        try:
            requests.post(L3VPN_URL, json=l3vpn_data, auth=self._auth)
            requests.post(self._l3vph_url, json=l3vpn_data, auth=self._auth)
        except requests.exceptions.ConnectionError:
            raise Exception("faild to send post request to TFS L3VPN NBI")

    def delete_connectivity_service(self, service_uuid: str) -> None:
        url = L3VPN_URL + f"/vpn-service={service_uuid}"
        url = self._l3vph_url + f"/vpn-service={service_uuid}"
        try:
            requests.delete(url, auth=self._auth)
        except requests.exceptions.ConnectionError:
+155 −31
Original line number Diff line number Diff line
@@ -12,10 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TypedDict
from typing import Any, Dict, Optional, Tuple, TypedDict

import requests

from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.type_checkers.Checkers import chk_attribute, chk_string, chk_type
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS

from .Constants import SPECIAL_RESOURCE_MAPPINGS


class LANPrefixesDict(TypedDict):
    lan: str
@@ -79,54 +85,66 @@ def create_l3vpn_datamodel(service_uuid, resource_value: dict) -> dict:
    )
    src_site_id: str = resource_value.get("src_site_id", f"site_{src_site_location}")
    src_management_type: str = resource_value.get(
        "src_management_type", "ietf-l3vpn-svc:provider-managed"
        "src_management_type", "ietf-l3vpn-svc:customer-managed"
    )
    if src_management_type != "ietf-l3vpn-svc:provider-managed":
    if src_management_type != "ietf-l3vpn-svc:customer-managed":
        raise Exception("management type %s not supported", src_management_type)
    src_role: str = "ietf-l3vpn-svc:hub-role"
    src_ce_address: str = resource_value["src_ce_address"]
    src_pe_address: str = resource_value["src_pe_address"]
    src_ce_pe_network_prefix: int = resource_value["src_ce_pe_network_prefix"]
    src_mtu: int = resource_value["src_mtu"]
    src_input_bw: int = resource_value["src_input_bw"]
    src_output_bw: int = resource_value["src_output_bw"]
    src_qos_profile_id: str = resource_value["src_qos_profile_id"]
    src_qos_profile_direction: str = resource_value["src_qos_profile_direction"]
    src_qos_profile_latency: int = resource_value["src_qos_profile_latency"]
    src_qos_profile_bw_guarantee: int = resource_value["src_qos_profile_bw_guarantee"]
    src_input_bw: int = resource_value.get("src_input_bw", 1000000000)
    src_output_bw: int = resource_value.get("src_input_bw", 1000000000)
    src_qos_profile_id: str = resource_value.get(
        "src_qos_profile_id", "src_qos_profile"
    )
    src_qos_profile_direction: str = (
        resource_value.get("src_qos_profile_direction", "ietf-l3vpn-svc:both"),
    )
    src_qos_profile_latency: int = resource_value.get("src_qos_profile_latency", 10)
    src_qos_profile_bw_guarantee: int = resource_value.get(
        "src_qos_profile_bw_guarantee", 100
    )

    dst_device_uuid = resource_value["dst_device_name"]
    dst_endpoint_uuid = resource_value["dst_endpoint_name"]
    dst_site_location: str = resource_value["dst_site_location"]
    dst_ipv4_lan_prefixes: list[LANPrefixesDict] = resource_value.get(
    dst_ipv4_lan_prefixes: list[LANPrefixesDict] = resource_value[
        "dst_ipv4_lan_prefixes"
    )
    ]
    dst_site_id: str = resource_value.get("dst_site_id", f"site_{dst_site_location}")
    dst_management_type: str = resource_value.get(
        "dst_management_type", "ietf-l3vpn-svc:provider-managed"
        "dst_management_type", "ietf-l3vpn-svc:customer-managed"
    )
    if dst_management_type != "ietf-l3vpn-svc:provider-managed":
    if dst_management_type != "ietf-l3vpn-svc:customer-managed":
        raise Exception("management type %s not supported", dst_management_type)
    dst_role: str = "ietf-l3vpn-svc:spoke-role"
    dst_ce_address: str = resource_value["dst_ce_address"]
    dst_pe_address: str = resource_value["dst_pe_address"]
    dst_ce_pe_network_prefix: int = resource_value["dst_ce_pe_network_prefix"]
    dst_mtu: int = resource_value["dst_mtu"]
    dst_input_bw: int = resource_value["dst_input_bw"]
    dst_output_bw: int = resource_value["dst_output_bw"]
    dst_qos_profile_id: str = resource_value["dst_qos_profile_id"]
    dst_qos_profile_direction: str = resource_value["dst_qos_profile_direction"]
    dst_qos_profile_latency: int = resource_value["dst_qos_profile_latency"]
    dst_qos_profile_bw_guarantee: int = resource_value["dst_qos_profile_bw_guarantee"]
    dst_input_bw: int = resource_value.get("dst_input_bw", 1000000000)
    dst_output_bw: int = resource_value.get("dst_output_bw", 1000000000)
    dst_qos_profile_id: str = resource_value.get(
        "dst_qos_profile_id", "dst_qos_profile"
    )
    dst_qos_profile_direction: str = (
        resource_value.get("dst_qos_profile_direction", "ietf-l3vpn-svc:both"),
    )
    dst_qos_profile_latency: int = resource_value.get("dst_qos_profile_latency", 10)
    dst_qos_profile_bw_guarantee: int = resource_value.get(
        "dst_qos_profile_bw_guarantee", 100
    )

    # Create source site information
    src_management = {"type": "ietf-l3vpn-svc:provider-managed"}
    src_management = {"type": src_management_type}
    src_locations = {"location": [{"location-id": src_site_location}]}
    src_devices = {
        "device": [{"device-id": src_device_uuid, "location": src_site_location}]
    }
    src_site_lan_prefixes = [
        {"lan": lp["lan"], "lan-tag": lp["lan_tag"], "next-hop": src_ce_address}
        {"lan": lp["lan"], "lan-tag": lp["lan_tag"], "next-hop": src_pe_address}
        for lp in src_ipv4_lan_prefixes
    ]
    src_site_routing_protocols = {
@@ -186,13 +204,13 @@ def create_l3vpn_datamodel(service_uuid, resource_value: dict) -> dict:
    }

    # Create destination site information
    dst_management = {"type": "ietf-l3vpn-svc:provider-managed"}
    dst_management = {"type": src_management_type}
    dst_locations = {"location": [{"location-id": dst_site_location}]}
    dst_devices = {
        "device": [{"device-id": dst_device_uuid, "location": dst_site_location}]
    }
    dst_site_lan_prefixes = [
        {"lan": lp["lan"], "lan-tag": lp["lan_tag"], "next-hop": dst_ce_address}
        {"lan": lp["lan"], "lan-tag": lp["lan_tag"], "next-hop": dst_pe_address}
        for lp in dst_ipv4_lan_prefixes
    ]
    dst_site_routing_protocols = {
@@ -261,7 +279,6 @@ def create_l3vpn_datamodel(service_uuid, resource_value: dict) -> dict:
                "routing-protocols": src_site_routing_protocols,
                "site-network-accesses": src_site_network_accesses,
            },
            {
            {
                "site-id": dst_site_id,
                "management": dst_management,
@@ -270,7 +287,6 @@ def create_l3vpn_datamodel(service_uuid, resource_value: dict) -> dict:
                "routing-protocols": dst_site_routing_protocols,
                "site-network-accesses": dst_site_network_accesses,
            },
            },
        ]
    }

@@ -282,3 +298,111 @@ def create_l3vpn_datamodel(service_uuid, resource_value: dict) -> dict:
    }

    return l3_vpn_data_model


def process_optional_string_field(
    endpoint_data: Dict[str, Any],
    field_name: str,
    endpoint_resource_value: Dict[str, Any],
) -> None:
    field_value = chk_attribute(
        field_name, endpoint_data, "endpoint_data", default=None
    )
    if field_value is None:
        return
    chk_string("endpoint_data.{:s}".format(field_name), field_value)
    if len(field_value) > 0:
        endpoint_resource_value[field_name] = field_value


def compose_resource_endpoint(
    endpoint_data: Dict[str, Any],
) -> Optional[Tuple[str, Dict]]:
    try:
        # Check type of endpoint_data
        chk_type("endpoint_data", endpoint_data, dict)

        # Check endpoint UUID (mandatory)
        endpoint_uuid = chk_attribute("uuid", endpoint_data, "endpoint_data")
        chk_string("endpoint_data.uuid", endpoint_uuid, min_length=1)
        endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS)
        endpoint_resource_key = "{:s}/endpoint[{:s}]".format(
            endpoint_resource_path, endpoint_uuid
        )
        endpoint_resource_value = {"uuid": endpoint_uuid}

        # Check endpoint optional string fields
        process_optional_string_field(endpoint_data, "name", endpoint_resource_value)
        process_optional_string_field(endpoint_data, "type", endpoint_resource_value)
        process_optional_string_field(
            endpoint_data, "context_uuid", endpoint_resource_value
        )
        process_optional_string_field(
            endpoint_data, "topology_uuid", endpoint_resource_value
        )

        # Check endpoint sample types (optional)
        endpoint_sample_types = chk_attribute(
            "sample_types", endpoint_data, "endpoint_data", default=[]
        )
        chk_type("endpoint_data.sample_types", endpoint_sample_types, list)
        sample_types = {}
        sample_type_errors = []
        for i, endpoint_sample_type in enumerate(endpoint_sample_types):
            field_name = "endpoint_data.sample_types[{:d}]".format(i)
            try:
                chk_type(field_name, endpoint_sample_type, (int, str))
                if isinstance(endpoint_sample_type, int):
                    metric_name = KpiSampleType.Name(endpoint_sample_type)
                    metric_id = endpoint_sample_type
                elif isinstance(endpoint_sample_type, str):
                    metric_id = KpiSampleType.Value(endpoint_sample_type)
                    metric_name = endpoint_sample_type
                else:
                    str_type = str(type(endpoint_sample_type))
                    raise Exception("Bad format: {:s}".format(str_type))  # pylint: disable=broad-exception-raised
            except Exception as e:  # pylint: disable=broad-exception-caught
                MSG = "Unsupported {:s}({:s}) : {:s}"
                sample_type_errors.append(
                    MSG.format(field_name, str(endpoint_sample_type), str(e))
                )

            metric_name = metric_name.lower().replace("kpisampletype_", "")
            monitoring_resource_key = "{:s}/state/{:s}".format(
                endpoint_resource_key, metric_name
            )
            sample_types[metric_id] = monitoring_resource_key

        if len(sample_type_errors) > 0:
            # pylint: disable=broad-exception-raised
            raise Exception(
                "Malformed Sample Types:\n{:s}".format("\n".join(sample_type_errors))
            )

        if len(sample_types) > 0:
            endpoint_resource_value["sample_types"] = sample_types

        if "location" in endpoint_data:
            endpoint_resource_value["location"] = endpoint_data["location"]

        if "ce-ip" in endpoint_data:
            endpoint_resource_value["ce-ip"] = endpoint_data["ce-ip"]

        if "address_ip" in endpoint_data:
            endpoint_resource_value["address_ip"] = endpoint_data["address_ip"]

        if "address_prefix" in endpoint_data:
            endpoint_resource_value["address_prefix"] = endpoint_data["address_prefix"]

        if "mtu" in endpoint_data:
            endpoint_resource_value["mtu"] = endpoint_data["mtu"]

        if "ipv4_lan_prefixes" in endpoint_data:
            endpoint_resource_value["ipv4_lan_prefixes"] = endpoint_data[
                "ipv4_lan_prefixes"
            ]

        return endpoint_resource_key, endpoint_resource_value
    except:  # pylint: disable=bare-except
        LOGGER.exception("Problem composing endpoint({:s})".format(str(endpoint_data)))
        return None
+84 −27
Original line number Diff line number Diff line
@@ -14,29 +14,37 @@

import json
import logging
import re
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union

import anytree
import requests
from requests.auth import HTTPBasicAuth

from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from common.type_checkers.Checkers import chk_length, chk_string, chk_type
from device.service.driver_api._Driver import (
    RESOURCE_ENDPOINTS,
    RESOURCE_SERVICES,
    _Driver,
)
from device.service.driver_api.AnyTreeTools import (
    TreeNode,
    dump_subtree,
    get_subnode,
    set_subnode_value,
)
from device.service.driver_api.ImportTopologyEnum import (
    ImportTopologyEnum,
    get_import_topology,
)

from .Constants import SPECIAL_RESOURCE_MAPPINGS
from .TfsApiClient import TfsApiClient
from .Tools import (
    compose_resource_endpoint,
    create_l3vpn_datamodel,
    get_all_active_connectivity_services,
    get_connectivity_service,
    service_exists,
)

@@ -48,6 +56,8 @@ ALL_RESOURCE_KEYS = [
    RESOURCE_SERVICES,
]

RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r"^\/interface\[([^\]]+)\].*")

DRIVER_NAME = "ietf_l3vpn"
METRICS_POOL = MetricsPool("Device", "Driver", labels={"driver": DRIVER_NAME})

@@ -58,12 +68,13 @@ class IetfL3VpnDriver(_Driver):
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__running = TreeNode(".")
        scheme = self.settings.get("scheme", "http")
        username = self.settings.get("username")
        password = self.settings.get("password")
        self.tac = TfsApiClient(
            self.address,
            int(self.port),
            self.port,
            scheme=scheme,
            username=username,
            password=password,
@@ -80,6 +91,52 @@ class IetfL3VpnDriver(_Driver):
        self.__import_topology = get_import_topology(
            self.settings, default=ImportTopologyEnum.DEVICES
        )
        endpoints = self.settings.get("endpoints", [])
        endpoint_resources = []
        for endpoint in endpoints:
            endpoint_resource = compose_resource_endpoint(endpoint)
            if endpoint_resource is None:
                continue
            endpoint_resources.append(endpoint_resource)
        self._set_initial_config(endpoint_resources)

    def _set_initial_config(
        self, resources: List[Tuple[str, Any]]
    ) -> List[Union[bool, Exception]]:
        chk_type("resources", resources, list)
        if len(resources) == 0:
            return []
        results = []
        resolver = anytree.Resolver(pathattr="name")
        with self.__lock:
            for i, resource in enumerate(resources):
                str_resource_name = "resources[#{:d}]".format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, min_length=2, max_length=2)
                    resource_key, resource_value = resource
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split("/")
                except Exception as e:  # pylint: disable=broad-except
                    LOGGER.exception(
                        "Exception validating {:s}: {:s}".format(
                            str_resource_name, str(resource_key)
                        )
                    )
                    results.append(e)  # if validation fails, store the exception
                    continue

                try:
                    resource_value = json.loads(resource_value)
                except:  # pylint: disable=bare-except
                    pass

                set_subnode_value(
                    resolver, self.__running, resource_path, resource_value
                )

                results.append(True)
        return results

    def Connect(self) -> bool:
        url = (
@@ -121,38 +178,38 @@ class IetfL3VpnDriver(_Driver):
        self, resource_keys: List[str] = []
    ) -> List[Tuple[str, Union[Any, None, Exception]]]:
        chk_type("resources", resource_keys, list)
        results = []
        with self.__lock:
            if len(resource_keys) == 0:
                resource_keys = ALL_RESOURCE_KEYS
                return dump_subtree(self.__running)
            results = []
            resolver = anytree.Resolver(pathattr="name")
            for i, resource_key in enumerate(resource_keys):
                str_resource_name = "resource_key[#{:d}]".format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    if resource_key == RESOURCE_ENDPOINTS:
                        # return endpoints through TFS NBI API and list-devices method
                        results.extend(
                            self.tac.get_devices_endpoints(self.__import_topology)
                        )
                    elif resource_key == RESOURCE_SERVICES:
                        # return all services through
                        reply = get_all_active_connectivity_services(
                            wim_url=self.__tfs_nbi_root, auth=self.__auth
                        )
                        results.extend(reply.json())
                    else:
                        # assume single-service retrieval
                        reply = get_connectivity_service(
                            self.__tfs_nbi_root, resource_key
                    resource_key = SPECIAL_RESOURCE_MAPPINGS.get(
                        resource_key, resource_key
                    )
                        results.append(reply.json())
                    resource_path = resource_key.split("/")
                except Exception as e:  # pylint: disable=broad-except
                    LOGGER.exception(
                        "Unhandled error processing resource_key({:s})".format(
                            str(resource_key)
                        "Exception validating {:s}: {:s}".format(
                            str_resource_name, str(resource_key)
                        )
                    )
                    results.append((resource_key, e))
                    results.append(
                        (resource_key, e)
                    )  # if validation fails, store the exception
                    continue

                resource_node = get_subnode(
                    resolver, self.__running, resource_path, default=None
                )
                # if not found, resource_node is None
                if resource_node is None:
                    continue
                results.extend(dump_subtree(resource_node))
            return results
        return results

    @metered_subclass_method(METRICS_POOL)
@@ -169,7 +226,7 @@ class IetfL3VpnDriver(_Driver):
                try:
                    resource_value = json.loads(resource_value)

                    service_uuid = resource_value["uuid"]  #! fix based on resources
                    service_uuid = resource_value["uuid"]

                    if service_exists(self.__tfs_nbi_root, self.__auth, service_uuid):
                        exc = NotImplementedError(
+345 −0

File added.

Preview size limit exceeded, changes collapsed.