# driver.py
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import re
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union

import anytree
import requests
from requests.auth import HTTPBasicAuth

from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_length, chk_string, chk_type
from device.service.driver_api._Driver import (
    RESOURCE_ENDPOINTS,
    RESOURCE_SERVICES,
    _Driver,
)
from device.service.driver_api.AnyTreeTools import (
    TreeNode,
    dump_subtree,
    get_subnode,
    set_subnode_value,
)
from device.service.driver_api.ImportTopologyEnum import (
    ImportTopologyEnum,
    get_import_topology,
)

from .Constants import SPECIAL_RESOURCE_MAPPINGS
from .TfsApiClient import TfsApiClient
from .Tools import (
    compose_resource_endpoint,
    create_l3vpn_datamodel,
    service_exists,
)

LOGGER = logging.getLogger(__name__)


# Resource keys this driver knows about.
ALL_RESOURCE_KEYS = [
    RESOURCE_ENDPOINTS,
    RESOURCE_SERVICES,
]

# Matches keys starting with "/interface[<x>]" and captures the bracket content.
RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r"^\/interface\[([^\]]+)\].*")

# Matches exactly "/service[<x>]/IETFL3VPN" (the service data resource).
RE_IETF_L3VPN_DATA = re.compile(r"^\/service\[[^\]]+\]\/IETFL3VPN$")
# Matches exactly "/service[<x>]/IETFL3VPN/operation" (the operation selector).
RE_IETF_L3VPN_OPERATION = re.compile(r"^\/service\[[^\]]+\]\/IETFL3VPN\/operation$")

DRIVER_NAME = "ietf_l3vpn"
METRICS_POOL = MetricsPool("Device", "Driver", labels={"driver": DRIVER_NAME})


class IetfL3VpnDriver(_Driver):
    """Device driver that forwards IETF L3VPN service operations to a TfsApiClient.

    Endpoints passed through the ``endpoints`` setting are stored in an
    in-memory tree that backs ``GetConfig``. ``SetConfig`` and ``DeleteConfig``
    call the client's create/update/delete connectivity-service methods.
    State subscription and monitoring are not supported.
    """

    def __init__(self, address: str, port: str, **settings) -> None:
        """Build the API client and load the configured endpoints.

        Args:
            address: Address handed to the base driver and the API client.
            port: Port; converted with ``int()`` before reaching the base driver.
            **settings: Driver settings. Keys read here: ``scheme``
                (default ``"http"``), ``username``, ``password``, ``timeout``
                (default 120), ``endpoints`` (default ``[]``), plus whatever
                ``get_import_topology`` reads.
        """
        super().__init__(DRIVER_NAME, address, int(port), **settings)
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__running = TreeNode(".")

        scheme = self.settings.get("scheme", "http")
        username = self.settings.get("username")
        password = self.settings.get("password")
        self.tac = TfsApiClient(
            self.address,
            self.port,
            scheme=scheme,
            username=username,
            password=password,
        )
        # Credentials are passed to TfsApiClient above; no separate auth object
        # is built for direct HTTP requests from this class.
        self.__auth = None
        self.__tfs_nbi_root = "{:s}://{:s}:{:d}".format(
            scheme, self.address, int(self.port)
        )
        self.__timeout = int(self.settings.get("timeout", 120))
        self.__import_topology = get_import_topology(
            self.settings, default=ImportTopologyEnum.DEVICES
        )

        # Endpoints that cannot be composed into a resource (None) are skipped.
        composed = (
            compose_resource_endpoint(endpoint)
            for endpoint in self.settings.get("endpoints", [])
        )
        endpoint_resources = [
            resource for resource in composed if resource is not None
        ]
        self._set_initial_config(endpoint_resources)

    def _set_initial_config(
        self, resources: List[Tuple[str, Any]]
    ) -> List[Union[bool, Exception]]:
        """Validate ``(key, value)`` resources and store them in the running tree.

        Each resource must be a 2-item list/tuple whose key is a non-empty
        string; the key is split on ``"/"`` to form the tree path. A value that
        parses as JSON is stored decoded, otherwise it is stored as given.

        Returns:
            One entry per resource: ``True`` when stored, or the exception
            raised while validating it. An empty input yields ``[]``.
        """
        chk_type("resources", resources, list)
        if len(resources) == 0:
            return []
        results = []
        resolver = anytree.Resolver(pathattr="name")
        with self.__lock:
            for i, resource in enumerate(resources):
                str_resource_name = "resources[#{:d}]".format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, min_length=2, max_length=2)
                    resource_key, resource_value = resource
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split("/")
                except Exception as e:  # pylint: disable=broad-except
                    # Log the whole resource: the key may not have been
                    # unpacked yet when validation fails early.
                    LOGGER.exception(
                        "Exception validating %s: %s", str_resource_name, str(resource)
                    )
                    results.append(e)  # if validation fails, store the exception
                    continue

                # Best effort: keep the raw value when it is not a JSON document.
                try:
                    resource_value = json.loads(resource_value)
                except (TypeError, ValueError):
                    pass

                set_subnode_value(
                    resolver, self.__running, resource_path, resource_value
                )
                results.append(True)
        return results

    def Connect(self) -> bool:
        """Mark the driver as started; always returns ``True``."""
        with self.__lock:
            # NOTE: the reachability probe (an HTTP GET on
            # <nbi_root>/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services)
            # is currently disabled, so connecting cannot fail.
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        """Set the terminate event; always returns ``True``."""
        with self.__lock:
            self.__terminate.set()
            return True

    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Return an empty list; this driver reports no initial configuration."""
        with self.__lock:
            return []

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(
        self, resource_keys: Optional[List[str]] = None
    ) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Dump entries from the running tree.

        Args:
            resource_keys: Keys to look up. ``None`` or an empty list dumps the
                whole tree. Each key is translated through
                ``SPECIAL_RESOURCE_MAPPINGS`` before being split on ``"/"``.

        Returns:
            The dumped subtree entries. A key that fails validation contributes
            ``(resource_key, exception)``; a key not present in the tree
            contributes nothing.
        """
        if resource_keys is None:
            resource_keys = []
        chk_type("resources", resource_keys, list)
        with self.__lock:
            if len(resource_keys) == 0:
                return dump_subtree(self.__running)
            results = []
            resolver = anytree.Resolver(pathattr="name")
            for i, resource_key in enumerate(resource_keys):
                str_resource_name = "resource_key[#{:d}]".format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_key = SPECIAL_RESOURCE_MAPPINGS.get(
                        resource_key, resource_key
                    )
                    resource_path = resource_key.split("/")
                except Exception as e:  # pylint: disable=broad-except
                    LOGGER.exception(
                        "Exception validating %s: %s",
                        str_resource_name,
                        str(resource_key),
                    )
                    # if validation fails, store the exception
                    results.append((resource_key, e))
                    continue

                resource_node = get_subnode(
                    resolver, self.__running, resource_path, default=None
                )
                # if not found, resource_node is None
                if resource_node is None:
                    continue
                results.extend(dump_subtree(resource_node))
            return results

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(
        self, resources: List[Tuple[str, Any]]
    ) -> List[Union[bool, Exception]]:
        """Create or update L3VPN services described by ``resources``.

        The first resource whose key matches ``RE_IETF_L3VPN_OPERATION`` selects
        the operation (the ``"type"`` field of its JSON value: ``"create"`` or
        ``"update"``). Every resource whose key matches ``RE_IETF_L3VPN_DATA``
        is then converted with ``create_l3vpn_datamodel`` and sent to the
        client.

        Returns:
            A list of ``(resource_key, True)`` or ``(resource_key, exception)``
            tuples: one for the operation resource, then one per data resource.
            An empty input yields ``[]``.

        Raises:
            Exception: If no operation resource is present. Errors decoding the
                operation resource also propagate to the caller.
        """
        results = []
        if len(resources) == 0:
            return results
        with self.__lock:
            # Pass 1: locate the operation selector.
            for resource_key, resource_value in resources:
                if RE_IETF_L3VPN_OPERATION.match(resource_key):
                    operation_type = json.loads(resource_value)["type"]
                    results.append((resource_key, True))
                    break
            else:
                raise Exception("operation type not found in resources")

            # Pass 2: apply the operation to every service data resource.
            for resource in resources:
                LOGGER.info("resource = %s", str(resource))
                resource_key, resource_value = resource
                if not RE_IETF_L3VPN_DATA.match(resource_key):
                    continue
                try:
                    resource_value = json.loads(resource_value)
                    if operation_type == "create":
                        apply_service = self.tac.create_connectivity_service
                    elif operation_type == "update":
                        apply_service = self.tac.update_connectivity_service
                    else:
                        raise Exception("operation type not supported")
                    slice_services = resource_value["network-slice-services"]
                    service_id = slice_services["slice-service"][0]["id"]
                    l3vpn_datamodel = create_l3vpn_datamodel(
                        service_id, resource_value, operation_type
                    )
                    apply_service(l3vpn_datamodel)
                    results.append((resource_key, True))
                except Exception as e:  # pylint: disable=broad-except
                    LOGGER.exception(
                        "Unhandled error processing resource_key(%s)",
                        str(resource_key),
                    )
                    results.append((resource_key, e))
        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(
        self, resources: List[Tuple[str, Any]]
    ) -> List[Union[bool, Exception]]:
        """Delete the L3VPN services described by ``resources``.

        Only resources whose key matches ``RE_IETF_L3VPN_DATA`` are processed;
        the ``"id"`` field of the JSON value is passed to the client's
        ``delete_connectivity_service``.

        Returns:
            A list of ``(resource_key, True)`` or ``(resource_key, exception)``
            tuples, one per processed resource. An empty input yields ``[]``.
        """
        results = []
        if len(resources) == 0:
            return results
        with self.__lock:
            for resource in resources:
                LOGGER.info("resource = %s", str(resource))
                resource_key, resource_value = resource
                if not RE_IETF_L3VPN_DATA.match(resource_key):
                    continue
                try:
                    resource_value = json.loads(resource_value)
                    service_id = resource_value["id"]
                    self.tac.delete_connectivity_service(service_id)
                    results.append((resource_key, True))
                except Exception as e:  # pylint: disable=broad-except
                    LOGGER.exception(
                        "Unhandled error processing resource_key(%s)",
                        str(resource_key),
                    )
                    results.append((resource_key, e))
        return results

    @metered_subclass_method(METRICS_POOL)
    def SubscribeState(
        self, subscriptions: List[Tuple[str, float, float]]
    ) -> List[Union[bool, Exception]]:
        """Return ``False`` for every requested subscription."""
        # TODO: IETF L3VPN does not support monitoring by now
        return [False for _ in subscriptions]

    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeState(
        self, subscriptions: List[Tuple[str, float, float]]
    ) -> List[Union[bool, Exception]]:
        """Return ``False`` for every requested unsubscription."""
        # TODO: IETF L3VPN does not support monitoring by now
        return [False for _ in subscriptions]

    def GetState(
        self, blocking=False, terminate: Optional[threading.Event] = None
    ) -> Iterator[Tuple[float, str, Any]]:
        """Return an empty list; both arguments are ignored."""
        # TODO: IETF L3VPN does not support monitoring by now
        return []