# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging import re import threading from typing import Any, Iterator, List, Optional, Tuple, Union import anytree import requests from requests.auth import HTTPBasicAuth from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.type_checkers.Checkers import chk_length, chk_string, chk_type from device.service.driver_api._Driver import ( RESOURCE_ENDPOINTS, RESOURCE_SERVICES, _Driver, ) from device.service.driver_api.AnyTreeTools import ( TreeNode, dump_subtree, get_subnode, set_subnode_value, ) from device.service.driver_api.ImportTopologyEnum import ( ImportTopologyEnum, get_import_topology, ) from .Constants import SPECIAL_RESOURCE_MAPPINGS from .tfs_slice_nbi_client import TfsApiClient from .Tools import compose_resource_endpoint LOGGER = logging.getLogger(__name__) ALL_RESOURCE_KEYS = [ RESOURCE_ENDPOINTS, RESOURCE_SERVICES, ] RE_IETF_SLICE_DATA = re.compile(r"^\/service\[[^\]]+\]\/IETFSlice$") RE_IETF_SLICE_OPERATION = re.compile(r"^\/service\[[^\]]+\]\/IETFSlice\/operation$") DRIVER_NAME = "ietf_slice" METRICS_POOL = MetricsPool("Device", "Driver", labels={"driver": DRIVER_NAME}) class IetfSliceDriver(_Driver): def __init__(self, address: str, port: str, **settings) -> None: super().__init__(DRIVER_NAME, address, int(port), **settings) self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() self.__running = TreeNode(".") scheme = self.settings.get("scheme", "http") username = self.settings.get("username") password = self.settings.get("password") self.tac = TfsApiClient( self.address, self.port, scheme=scheme, username=username, password=password, ) self.__auth = None # ( # HTTPBasicAuth(username, password) # if username is not None and password is not None # else None # ) self.__tfs_nbi_root = "{:s}://{:s}:{:d}".format( scheme, self.address, int(self.port) ) self.__timeout = int(self.settings.get("timeout", 120)) self.__import_topology = get_import_topology( self.settings, default=ImportTopologyEnum.DEVICES ) endpoints = self.settings.get("endpoints", []) endpoint_resources = [] for endpoint in endpoints: endpoint_resource = compose_resource_endpoint(endpoint) if endpoint_resource is None: continue endpoint_resources.append(endpoint_resource) self._set_initial_config(endpoint_resources) def _set_initial_config( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: chk_type("resources", resources, list) if len(resources) == 0: return [] results = [] resolver = anytree.Resolver(pathattr="name") with self.__lock: for i, resource in enumerate(resources): str_resource_name = "resources[#{:d}]".format(i) try: chk_type(str_resource_name, resource, (list, tuple)) chk_length(str_resource_name, resource, min_length=2, max_length=2) resource_key, resource_value = resource chk_string(str_resource_name, resource_key, allow_empty=False) resource_path = resource_key.split("/") except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Exception validating {:s}: {:s}".format( str_resource_name, str(resource_key) ) ) results.append(e) # if validation fails, store the exception continue try: resource_value = json.loads(resource_value) except: # pylint: disable=bare-except pass set_subnode_value( resolver, self.__running, resource_path, resource_value ) results.append(True) return results def Connect(self) -> bool: url = self.__tfs_nbi_root + "/restconf/data/ietf-network-slice-service:ietf-nss" with self.__lock: if self.__started.is_set(): return True try: # requests.get(url, timeout=self.__timeout) ... except requests.exceptions.Timeout: LOGGER.exception("Timeout connecting {:s}".format(url)) return False except Exception: # pylint: disable=broad-except LOGGER.exception("Exception connecting {:s}".format(url)) return False else: self.__started.set() return True def Disconnect(self) -> bool: with self.__lock: self.__terminate.set() return True @metered_subclass_method(METRICS_POOL) def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return [] @metered_subclass_method(METRICS_POOL) def GetConfig( self, resource_keys: List[str] = [] ) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type("resources", resource_keys, list) with self.__lock: if len(resource_keys) == 0: return dump_subtree(self.__running) results = [] resolver = anytree.Resolver(pathattr="name") for i, resource_key in enumerate(resource_keys): str_resource_name = "resource_key[#{:d}]".format(i) try: chk_string(str_resource_name, resource_key, allow_empty=False) resource_key = SPECIAL_RESOURCE_MAPPINGS.get( resource_key, resource_key ) resource_path = resource_key.split("/") except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Exception validating {:s}: {:s}".format( str_resource_name, str(resource_key) ) ) results.append( (resource_key, e) ) # if validation fails, store the exception continue resource_node = get_subnode( resolver, self.__running, resource_path, default=None ) # if not found, resource_node is None if resource_node is None: continue results.extend(dump_subtree(resource_node)) return results return results @metered_subclass_method(METRICS_POOL) def SetConfig( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: results = [] if len(resources) == 0: return results with self.__lock: for resource in resources: resource_key, resource_value = resource if RE_IETF_SLICE_OPERATION.match(resource_key): operation_type = json.loads(resource_value)["type"] results.append((resource_key, True)) break else: raise Exception("operation type not found in resources") for resource in resources: LOGGER.info("resource = {:s}".format(str(resource))) resource_key, resource_value = resource if not RE_IETF_SLICE_DATA.match(resource_key): continue try: resource_value = json.loads(resource_value) slice_name = resource_value["network-slice-services"][ "slice-service" ][0]["id"] if operation_type == "create": self.tac.create_slice(resource_value) elif operation_type == "update": connection_groups = resource_value["network-slice-services"][ "slice-service" ][0]["connection-groups"]["connection-group"] if len(connection_groups) != 1: raise Exception("only one connection group is supported") connection_group = connection_groups[0] self.tac.update_slice( slice_name, connection_group["id"], connection_group ) elif operation_type == "delete": self.tac.delete_slice(slice_name) results.append((resource_key, True)) except Exception as e: # pylint: disable=broad-except raise e LOGGER.exception( "Unhandled error processing resource_key({:s})".format( str(resource_key) ) ) results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: results = [] if len(resources) == 0: return results with self.__lock: for resource in resources: LOGGER.info("resource = {:s}".format(str(resource))) resource_key, resource_value = resource try: # resource_value = json.loads(resource_value) # service_uuid = resource_value["uuid"] # if service_exists(self.__tfs_nbi_root, self.__auth, service_uuid): # self.tac.delete_slice(service_uuid) results.append((resource_key, True)) except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Unhandled error processing resource_key({:s})".format( str(resource_key) ) ) results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def SubscribeState( self, subscriptions: List[Tuple[str, float, float]] ) -> List[Union[bool, Exception]]: # TODO: IETF Slice does not support monitoring by now return [False for _ in subscriptions] @metered_subclass_method(METRICS_POOL) def UnsubscribeState( self, subscriptions: List[Tuple[str, float, float]] ) -> List[Union[bool, Exception]]: # TODO: IETF Slice does not support monitoring by now return [False for _ in subscriptions] def GetState( self, blocking=False, terminate: Optional[threading.Event] = None ) -> Iterator[Tuple[float, str, Any]]: # TODO: IETF Slice does not support monitoring by now return []