Loading src/device/service/drivers/ietf_slice/Constants.py 0 → 100644 +25 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from device.service.driver_api._Driver import ( RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, ) SPECIAL_RESOURCE_MAPPINGS = { RESOURCE_ENDPOINTS: "/endpoints", RESOURCE_INTERFACES: "/interfaces", RESOURCE_NETWORK_INSTANCES: "/net-instances", } src/device/service/drivers/ietf_slice/Tools.py 0 → 100644 +147 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from typing import Any, Dict, Optional, Tuple import requests from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.type_checkers.Checkers import chk_attribute, chk_string, chk_type from device.service.driver_api._Driver import RESOURCE_ENDPOINTS from .Constants import SPECIAL_RESOURCE_MAPPINGS LOGGER = logging.getLogger(__name__) def process_optional_string_field( endpoint_data: Dict[str, Any], field_name: str, endpoint_resource_value: Dict[str, Any], ) -> None: field_value = chk_attribute( field_name, endpoint_data, "endpoint_data", default=None ) if field_value is None: return chk_string("endpoint_data.{:s}".format(field_name), field_value) if len(field_value) > 0: endpoint_resource_value[field_name] = field_value def compose_resource_endpoint( endpoint_data: Dict[str, Any], ) -> Optional[Tuple[str, Dict]]: try: # Check type of endpoint_data chk_type("endpoint_data", endpoint_data, dict) # Check endpoint UUID (mandatory) endpoint_uuid = chk_attribute("uuid", endpoint_data, "endpoint_data") chk_string("endpoint_data.uuid", endpoint_uuid, min_length=1) endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS) endpoint_resource_key = "{:s}/endpoint[{:s}]".format( endpoint_resource_path, endpoint_uuid ) endpoint_resource_value = {"uuid": endpoint_uuid} # Check endpoint optional string fields process_optional_string_field(endpoint_data, "name", endpoint_resource_value) process_optional_string_field( endpoint_data, "site_location", endpoint_resource_value ) process_optional_string_field(endpoint_data, "ce-ip", endpoint_resource_value) process_optional_string_field( endpoint_data, "address_ip", endpoint_resource_value ) process_optional_string_field( endpoint_data, "address_prefix", endpoint_resource_value ) process_optional_string_field(endpoint_data, "mtu", endpoint_resource_value) process_optional_string_field( endpoint_data, "ipv4_lan_prefixes", endpoint_resource_value ) process_optional_string_field(endpoint_data, "type", endpoint_resource_value) process_optional_string_field( endpoint_data, "context_uuid", endpoint_resource_value ) process_optional_string_field( endpoint_data, "topology_uuid", endpoint_resource_value ) # Check endpoint sample types (optional) endpoint_sample_types = chk_attribute( "sample_types", endpoint_data, "endpoint_data", default=[] ) chk_type("endpoint_data.sample_types", endpoint_sample_types, list) sample_types = {} sample_type_errors = [] for i, endpoint_sample_type in enumerate(endpoint_sample_types): field_name = "endpoint_data.sample_types[{:d}]".format(i) try: chk_type(field_name, endpoint_sample_type, (int, str)) if isinstance(endpoint_sample_type, int): metric_name = KpiSampleType.Name(endpoint_sample_type) metric_id = endpoint_sample_type elif isinstance(endpoint_sample_type, str): metric_id = KpiSampleType.Value(endpoint_sample_type) metric_name = endpoint_sample_type else: str_type = str(type(endpoint_sample_type)) raise Exception("Bad format: {:s}".format(str_type)) # pylint: disable=broad-exception-raised except Exception as e: # pylint: disable=broad-exception-caught MSG = "Unsupported {:s}({:s}) : {:s}" sample_type_errors.append( MSG.format(field_name, str(endpoint_sample_type), str(e)) ) metric_name = metric_name.lower().replace("kpisampletype_", "") monitoring_resource_key = "{:s}/state/{:s}".format( endpoint_resource_key, metric_name ) sample_types[metric_id] = monitoring_resource_key if len(sample_type_errors) > 0: # pylint: disable=broad-exception-raised raise Exception( "Malformed Sample Types:\n{:s}".format("\n".join(sample_type_errors)) ) if len(sample_types) > 0: endpoint_resource_value["sample_types"] = sample_types if "site_location" in endpoint_data: endpoint_resource_value["site_location"] = endpoint_data["site_location"] if "ce-ip" in endpoint_data: endpoint_resource_value["ce-ip"] = endpoint_data["ce-ip"] if "address_ip" in endpoint_data: endpoint_resource_value["address_ip"] = endpoint_data["address_ip"] if "address_prefix" in endpoint_data: endpoint_resource_value["address_prefix"] = endpoint_data["address_prefix"] if "mtu" in endpoint_data: endpoint_resource_value["mtu"] = endpoint_data["mtu"] if "ipv4_lan_prefixes" in endpoint_data: endpoint_resource_value["ipv4_lan_prefixes"] = endpoint_data[ "ipv4_lan_prefixes" ] return endpoint_resource_key, endpoint_resource_value except: # pylint: disable=bare-except LOGGER.exception("Problem composing endpoint({:s})".format(str(endpoint_data))) return None src/device/service/drivers/ietf_slice/__init__.py 0 → 100644 +13 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. src/device/service/drivers/ietf_slice/driver.py 0 → 100644 +300 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging import re import threading from typing import Any, Iterator, List, Optional, Tuple, Union import anytree import requests from requests.auth import HTTPBasicAuth from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.type_checkers.Checkers import chk_length, chk_string, chk_type from device.service.driver_api._Driver import ( RESOURCE_ENDPOINTS, RESOURCE_SERVICES, _Driver, ) from device.service.driver_api.AnyTreeTools import ( TreeNode, dump_subtree, get_subnode, set_subnode_value, ) from device.service.driver_api.ImportTopologyEnum import ( ImportTopologyEnum, get_import_topology, ) from .Constants import SPECIAL_RESOURCE_MAPPINGS from .tfs_slice_nbi_client import TfsApiClient from .Tools import compose_resource_endpoint LOGGER = logging.getLogger(__name__) ALL_RESOURCE_KEYS = [ RESOURCE_ENDPOINTS, RESOURCE_SERVICES, ] RE_IETF_SLICE_DATA = re.compile(r"^\/service\[[^\]]+\]\/IETFSlice$") RE_IETF_SLICE_OPERATION = re.compile(r"^\/service\[[^\]]+\]\/IETFSlice\/operation$") DRIVER_NAME = "ietf_slice" METRICS_POOL = MetricsPool("Device", "Driver", labels={"driver": DRIVER_NAME}) class IetfSliceDriver(_Driver): def __init__(self, address: str, port: str, **settings) -> None: super().__init__(DRIVER_NAME, address, int(port), **settings) self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() self.__running = TreeNode(".") scheme = self.settings.get("scheme", "http") username = self.settings.get("username") password = self.settings.get("password") self.tac = TfsApiClient( self.address, self.port, scheme=scheme, username=username, password=password, ) self.__auth = None # ( # HTTPBasicAuth(username, password) # if username is not None and password is not None # else None # ) self.__tfs_nbi_root = "{:s}://{:s}:{:d}".format( scheme, self.address, int(self.port) ) self.__timeout = int(self.settings.get("timeout", 120)) self.__import_topology = get_import_topology( self.settings, default=ImportTopologyEnum.DEVICES ) endpoints = self.settings.get("endpoints", []) endpoint_resources = [] for endpoint in endpoints: endpoint_resource = compose_resource_endpoint(endpoint) if endpoint_resource is None: continue endpoint_resources.append(endpoint_resource) self._set_initial_config(endpoint_resources) def _set_initial_config( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: chk_type("resources", resources, list) if len(resources) == 0: return [] results = [] resolver = anytree.Resolver(pathattr="name") with self.__lock: for i, resource in enumerate(resources): str_resource_name = "resources[#{:d}]".format(i) try: chk_type(str_resource_name, resource, (list, tuple)) chk_length(str_resource_name, resource, min_length=2, max_length=2) resource_key, resource_value = resource chk_string(str_resource_name, resource_key, allow_empty=False) resource_path = resource_key.split("/") except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Exception validating {:s}: {:s}".format( str_resource_name, str(resource_key) ) ) results.append(e) # if validation fails, store the exception continue try: resource_value = json.loads(resource_value) except: # pylint: disable=bare-except pass set_subnode_value( resolver, self.__running, resource_path, resource_value ) results.append(True) return results def Connect(self) -> bool: url = self.__tfs_nbi_root + "/restconf/data/ietf-network-slice-service:ietf-nss" with self.__lock: if self.__started.is_set(): return True try: # requests.get(url, timeout=self.__timeout) ... except requests.exceptions.Timeout: LOGGER.exception("Timeout connecting {:s}".format(url)) return False except Exception: # pylint: disable=broad-except LOGGER.exception("Exception connecting {:s}".format(url)) return False else: self.__started.set() return True def Disconnect(self) -> bool: with self.__lock: self.__terminate.set() return True @metered_subclass_method(METRICS_POOL) def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return [] @metered_subclass_method(METRICS_POOL) def GetConfig( self, resource_keys: List[str] = [] ) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type("resources", resource_keys, list) with self.__lock: if len(resource_keys) == 0: return dump_subtree(self.__running) results = [] resolver = anytree.Resolver(pathattr="name") for i, resource_key in enumerate(resource_keys): str_resource_name = "resource_key[#{:d}]".format(i) try: chk_string(str_resource_name, resource_key, allow_empty=False) resource_key = SPECIAL_RESOURCE_MAPPINGS.get( resource_key, resource_key ) resource_path = resource_key.split("/") except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Exception validating {:s}: {:s}".format( str_resource_name, str(resource_key) ) ) results.append( (resource_key, e) ) # if validation fails, store the exception continue resource_node = get_subnode( resolver, self.__running, resource_path, default=None ) # if not found, resource_node is None if resource_node is None: continue results.extend(dump_subtree(resource_node)) return results return results @metered_subclass_method(METRICS_POOL) def SetConfig( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: results = [] if len(resources) == 0: return results with self.__lock: for resource in resources: resource_key, resource_value = resource if RE_IETF_SLICE_OPERATION.match(resource_key): operation_type = json.loads(resource_value)["type"] results.append((resource_key, True)) break else: raise Exception("operation type not found in resources") for resource in resources: LOGGER.info("resource = {:s}".format(str(resource))) resource_key, resource_value = resource if not RE_IETF_SLICE_DATA.match(resource_key): continue try: resource_value = json.loads(resource_value) slice_name = resource_value["network-slice-services"][ "slice-service" ][0]["id"] if operation_type == "create": self.tac.create_slice(resource_value) elif operation_type == "update": connection_groups = resource_value["network-slice-services"][ "slice-service" ][0]["connection-groups"]["connection-group"] if len(connection_groups) != 1: raise Exception("only one connection group is supported") connection_group = connection_groups[0] self.tac.update_slice( slice_name, connection_group["id"], connection_group ) elif operation_type == "delete": self.tac.delete_slice(slice_name) results.append((resource_key, True)) except Exception as e: # pylint: disable=broad-except raise e LOGGER.exception( "Unhandled error processing resource_key({:s})".format( str(resource_key) ) ) results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: results = [] if len(resources) == 0: return results with self.__lock: for resource in resources: LOGGER.info("resource = {:s}".format(str(resource))) resource_key, resource_value = resource try: # resource_value = json.loads(resource_value) # service_uuid = resource_value["uuid"] # if service_exists(self.__tfs_nbi_root, self.__auth, service_uuid): # self.tac.delete_slice(service_uuid) results.append((resource_key, True)) except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Unhandled error processing resource_key({:s})".format( str(resource_key) ) ) results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def SubscribeState( self, subscriptions: List[Tuple[str, float, float]] ) -> List[Union[bool, Exception]]: # TODO: IETF Slice does not support monitoring by now return [False for _ in subscriptions] @metered_subclass_method(METRICS_POOL) def UnsubscribeState( self, subscriptions: List[Tuple[str, float, float]] ) -> List[Union[bool, Exception]]: # TODO: IETF Slice does not support monitoring by now return [False for _ in subscriptions] def GetState( self, blocking=False, terminate: Optional[threading.Event] = None ) -> Iterator[Tuple[float, str, Any]]: # TODO: IETF Slice does not support monitoring by now return [] src/device/service/drivers/ietf_slice/tfs_slice_nbi_client.py 0 → 100644 +73 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from typing import Optional import requests from requests.auth import HTTPBasicAuth IETF_SLICE_URL = "{:s}://{:s}:{:d}/restconf/data/ietf-network-slice-service" TIMEOUT = 30 LOGGER = logging.getLogger(__name__) HEADERS = {'Content-Type': 'application/json'} class TfsApiClient: def __init__( self, address: str, port: int, scheme: str = "http", username: Optional[str] = None, password: Optional[str] = None, ) -> None: self._slice_url = IETF_SLICE_URL.format(scheme, address, port) self._auth = None # ( # HTTPBasicAuth(username, password) # if username is not None and password is not None # else None # ) def create_slice(self, slice_data: dict) -> None: url = self._slice_url + ":network-slice-services" try: requests.post(url, json=slice_data, headers=HEADERS) except requests.exceptions.ConnectionError: raise Exception("faild to send post request to TFS IETF Slice NBI") def update_slice( self, slice_name: str, connection_group_id: str, updated_connection_group_data: dict, ) -> None: url = ( self._slice_url + f":network-slice-services/slice-service={slice_name}/connection-groups/connection-group={connection_group_id}" ) try: requests.put(url, json=updated_connection_group_data, headers=HEADERS) except requests.exceptions.ConnectionError: raise Exception("faild to send update request to TFS IETF Slice NBI") def delete_slice(self, slice_name: str) -> None: url = self._slice_url + f":network-slice-services/slice-service={slice_name}" try: requests.delete(url) except requests.exceptions.ConnectionError: raise Exception("faild to send delete request to TFS IETF Slice NBI") Loading
src/device/service/drivers/ietf_slice/Constants.py 0 → 100644 +25 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from device.service.driver_api._Driver import ( RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, ) SPECIAL_RESOURCE_MAPPINGS = { RESOURCE_ENDPOINTS: "/endpoints", RESOURCE_INTERFACES: "/interfaces", RESOURCE_NETWORK_INSTANCES: "/net-instances", }
src/device/service/drivers/ietf_slice/Tools.py 0 → 100644 +147 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from typing import Any, Dict, Optional, Tuple import requests from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.type_checkers.Checkers import chk_attribute, chk_string, chk_type from device.service.driver_api._Driver import RESOURCE_ENDPOINTS from .Constants import SPECIAL_RESOURCE_MAPPINGS LOGGER = logging.getLogger(__name__) def process_optional_string_field( endpoint_data: Dict[str, Any], field_name: str, endpoint_resource_value: Dict[str, Any], ) -> None: field_value = chk_attribute( field_name, endpoint_data, "endpoint_data", default=None ) if field_value is None: return chk_string("endpoint_data.{:s}".format(field_name), field_value) if len(field_value) > 0: endpoint_resource_value[field_name] = field_value def compose_resource_endpoint( endpoint_data: Dict[str, Any], ) -> Optional[Tuple[str, Dict]]: try: # Check type of endpoint_data chk_type("endpoint_data", endpoint_data, dict) # Check endpoint UUID (mandatory) endpoint_uuid = chk_attribute("uuid", endpoint_data, "endpoint_data") chk_string("endpoint_data.uuid", endpoint_uuid, min_length=1) endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS) endpoint_resource_key = "{:s}/endpoint[{:s}]".format( endpoint_resource_path, endpoint_uuid ) endpoint_resource_value = {"uuid": endpoint_uuid} # Check endpoint optional string fields process_optional_string_field(endpoint_data, "name", endpoint_resource_value) process_optional_string_field( endpoint_data, "site_location", endpoint_resource_value ) process_optional_string_field(endpoint_data, "ce-ip", endpoint_resource_value) process_optional_string_field( endpoint_data, "address_ip", endpoint_resource_value ) process_optional_string_field( endpoint_data, "address_prefix", endpoint_resource_value ) process_optional_string_field(endpoint_data, "mtu", endpoint_resource_value) process_optional_string_field( endpoint_data, "ipv4_lan_prefixes", endpoint_resource_value ) process_optional_string_field(endpoint_data, "type", endpoint_resource_value) process_optional_string_field( endpoint_data, "context_uuid", endpoint_resource_value ) process_optional_string_field( endpoint_data, "topology_uuid", endpoint_resource_value ) # Check endpoint sample types (optional) endpoint_sample_types = chk_attribute( "sample_types", endpoint_data, "endpoint_data", default=[] ) chk_type("endpoint_data.sample_types", endpoint_sample_types, list) sample_types = {} sample_type_errors = [] for i, endpoint_sample_type in enumerate(endpoint_sample_types): field_name = "endpoint_data.sample_types[{:d}]".format(i) try: chk_type(field_name, endpoint_sample_type, (int, str)) if isinstance(endpoint_sample_type, int): metric_name = KpiSampleType.Name(endpoint_sample_type) metric_id = endpoint_sample_type elif isinstance(endpoint_sample_type, str): metric_id = KpiSampleType.Value(endpoint_sample_type) metric_name = endpoint_sample_type else: str_type = str(type(endpoint_sample_type)) raise Exception("Bad format: {:s}".format(str_type)) # pylint: disable=broad-exception-raised except Exception as e: # pylint: disable=broad-exception-caught MSG = "Unsupported {:s}({:s}) : {:s}" sample_type_errors.append( MSG.format(field_name, str(endpoint_sample_type), str(e)) ) metric_name = metric_name.lower().replace("kpisampletype_", "") monitoring_resource_key = "{:s}/state/{:s}".format( endpoint_resource_key, metric_name ) sample_types[metric_id] = monitoring_resource_key if len(sample_type_errors) > 0: # pylint: disable=broad-exception-raised raise Exception( "Malformed Sample Types:\n{:s}".format("\n".join(sample_type_errors)) ) if len(sample_types) > 0: endpoint_resource_value["sample_types"] = sample_types if "site_location" in endpoint_data: endpoint_resource_value["site_location"] = endpoint_data["site_location"] if "ce-ip" in endpoint_data: endpoint_resource_value["ce-ip"] = endpoint_data["ce-ip"] if "address_ip" in endpoint_data: endpoint_resource_value["address_ip"] = endpoint_data["address_ip"] if "address_prefix" in endpoint_data: endpoint_resource_value["address_prefix"] = endpoint_data["address_prefix"] if "mtu" in endpoint_data: endpoint_resource_value["mtu"] = endpoint_data["mtu"] if "ipv4_lan_prefixes" in endpoint_data: endpoint_resource_value["ipv4_lan_prefixes"] = endpoint_data[ "ipv4_lan_prefixes" ] return endpoint_resource_key, endpoint_resource_value except: # pylint: disable=bare-except LOGGER.exception("Problem composing endpoint({:s})".format(str(endpoint_data))) return None
src/device/service/drivers/ietf_slice/__init__.py 0 → 100644 +13 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
src/device/service/drivers/ietf_slice/driver.py 0 → 100644 +300 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging import re import threading from typing import Any, Iterator, List, Optional, Tuple, Union import anytree import requests from requests.auth import HTTPBasicAuth from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.type_checkers.Checkers import chk_length, chk_string, chk_type from device.service.driver_api._Driver import ( RESOURCE_ENDPOINTS, RESOURCE_SERVICES, _Driver, ) from device.service.driver_api.AnyTreeTools import ( TreeNode, dump_subtree, get_subnode, set_subnode_value, ) from device.service.driver_api.ImportTopologyEnum import ( ImportTopologyEnum, get_import_topology, ) from .Constants import SPECIAL_RESOURCE_MAPPINGS from .tfs_slice_nbi_client import TfsApiClient from .Tools import compose_resource_endpoint LOGGER = logging.getLogger(__name__) ALL_RESOURCE_KEYS = [ RESOURCE_ENDPOINTS, RESOURCE_SERVICES, ] RE_IETF_SLICE_DATA = re.compile(r"^\/service\[[^\]]+\]\/IETFSlice$") RE_IETF_SLICE_OPERATION = re.compile(r"^\/service\[[^\]]+\]\/IETFSlice\/operation$") DRIVER_NAME = "ietf_slice" METRICS_POOL = MetricsPool("Device", "Driver", labels={"driver": DRIVER_NAME}) class IetfSliceDriver(_Driver): def __init__(self, address: str, port: str, **settings) -> None: super().__init__(DRIVER_NAME, address, int(port), **settings) self.__lock = threading.Lock() self.__started = threading.Event() self.__terminate = threading.Event() self.__running = TreeNode(".") scheme = self.settings.get("scheme", "http") username = self.settings.get("username") password = self.settings.get("password") self.tac = TfsApiClient( self.address, self.port, scheme=scheme, username=username, password=password, ) self.__auth = None # ( # HTTPBasicAuth(username, password) # if username is not None and password is not None # else None # ) self.__tfs_nbi_root = "{:s}://{:s}:{:d}".format( scheme, self.address, int(self.port) ) self.__timeout = int(self.settings.get("timeout", 120)) self.__import_topology = get_import_topology( self.settings, default=ImportTopologyEnum.DEVICES ) endpoints = self.settings.get("endpoints", []) endpoint_resources = [] for endpoint in endpoints: endpoint_resource = compose_resource_endpoint(endpoint) if endpoint_resource is None: continue endpoint_resources.append(endpoint_resource) self._set_initial_config(endpoint_resources) def _set_initial_config( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: chk_type("resources", resources, list) if len(resources) == 0: return [] results = [] resolver = anytree.Resolver(pathattr="name") with self.__lock: for i, resource in enumerate(resources): str_resource_name = "resources[#{:d}]".format(i) try: chk_type(str_resource_name, resource, (list, tuple)) chk_length(str_resource_name, resource, min_length=2, max_length=2) resource_key, resource_value = resource chk_string(str_resource_name, resource_key, allow_empty=False) resource_path = resource_key.split("/") except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Exception validating {:s}: {:s}".format( str_resource_name, str(resource_key) ) ) results.append(e) # if validation fails, store the exception continue try: resource_value = json.loads(resource_value) except: # pylint: disable=bare-except pass set_subnode_value( resolver, self.__running, resource_path, resource_value ) results.append(True) return results def Connect(self) -> bool: url = self.__tfs_nbi_root + "/restconf/data/ietf-network-slice-service:ietf-nss" with self.__lock: if self.__started.is_set(): return True try: # requests.get(url, timeout=self.__timeout) ... except requests.exceptions.Timeout: LOGGER.exception("Timeout connecting {:s}".format(url)) return False except Exception: # pylint: disable=broad-except LOGGER.exception("Exception connecting {:s}".format(url)) return False else: self.__started.set() return True def Disconnect(self) -> bool: with self.__lock: self.__terminate.set() return True @metered_subclass_method(METRICS_POOL) def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return [] @metered_subclass_method(METRICS_POOL) def GetConfig( self, resource_keys: List[str] = [] ) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type("resources", resource_keys, list) with self.__lock: if len(resource_keys) == 0: return dump_subtree(self.__running) results = [] resolver = anytree.Resolver(pathattr="name") for i, resource_key in enumerate(resource_keys): str_resource_name = "resource_key[#{:d}]".format(i) try: chk_string(str_resource_name, resource_key, allow_empty=False) resource_key = SPECIAL_RESOURCE_MAPPINGS.get( resource_key, resource_key ) resource_path = resource_key.split("/") except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Exception validating {:s}: {:s}".format( str_resource_name, str(resource_key) ) ) results.append( (resource_key, e) ) # if validation fails, store the exception continue resource_node = get_subnode( resolver, self.__running, resource_path, default=None ) # if not found, resource_node is None if resource_node is None: continue results.extend(dump_subtree(resource_node)) return results return results @metered_subclass_method(METRICS_POOL) def SetConfig( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: results = [] if len(resources) == 0: return results with self.__lock: for resource in resources: resource_key, resource_value = resource if RE_IETF_SLICE_OPERATION.match(resource_key): operation_type = json.loads(resource_value)["type"] results.append((resource_key, True)) break else: raise Exception("operation type not found in resources") for resource in resources: LOGGER.info("resource = {:s}".format(str(resource))) resource_key, resource_value = resource if not RE_IETF_SLICE_DATA.match(resource_key): continue try: resource_value = json.loads(resource_value) slice_name = resource_value["network-slice-services"][ "slice-service" ][0]["id"] if operation_type == "create": self.tac.create_slice(resource_value) elif operation_type == "update": connection_groups = resource_value["network-slice-services"][ "slice-service" ][0]["connection-groups"]["connection-group"] if len(connection_groups) != 1: raise Exception("only one connection group is supported") connection_group = connection_groups[0] self.tac.update_slice( slice_name, connection_group["id"], connection_group ) elif operation_type == "delete": self.tac.delete_slice(slice_name) results.append((resource_key, True)) except Exception as e: # pylint: disable=broad-except raise e LOGGER.exception( "Unhandled error processing resource_key({:s})".format( str(resource_key) ) ) results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig( self, resources: List[Tuple[str, Any]] ) -> List[Union[bool, Exception]]: results = [] if len(resources) == 0: return results with self.__lock: for resource in resources: LOGGER.info("resource = {:s}".format(str(resource))) resource_key, resource_value = resource try: # resource_value = json.loads(resource_value) # service_uuid = resource_value["uuid"] # if service_exists(self.__tfs_nbi_root, self.__auth, service_uuid): # self.tac.delete_slice(service_uuid) results.append((resource_key, True)) except Exception as e: # pylint: disable=broad-except LOGGER.exception( "Unhandled error processing resource_key({:s})".format( str(resource_key) ) ) results.append((resource_key, e)) return results @metered_subclass_method(METRICS_POOL) def SubscribeState( self, subscriptions: List[Tuple[str, float, float]] ) -> List[Union[bool, Exception]]: # TODO: IETF Slice does not support monitoring by now return [False for _ in subscriptions] @metered_subclass_method(METRICS_POOL) def UnsubscribeState( self, subscriptions: List[Tuple[str, float, float]] ) -> List[Union[bool, Exception]]: # TODO: IETF Slice does not support monitoring by now return [False for _ in subscriptions] def GetState( self, blocking=False, terminate: Optional[threading.Event] = None ) -> Iterator[Tuple[float, str, Any]]: # TODO: IETF Slice does not support monitoring by now return []
src/device/service/drivers/ietf_slice/tfs_slice_nbi_client.py 0 → 100644 +73 −0 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from typing import Optional import requests from requests.auth import HTTPBasicAuth IETF_SLICE_URL = "{:s}://{:s}:{:d}/restconf/data/ietf-network-slice-service" TIMEOUT = 30 LOGGER = logging.getLogger(__name__) HEADERS = {'Content-Type': 'application/json'} class TfsApiClient: def __init__( self, address: str, port: int, scheme: str = "http", username: Optional[str] = None, password: Optional[str] = None, ) -> None: self._slice_url = IETF_SLICE_URL.format(scheme, address, port) self._auth = None # ( # HTTPBasicAuth(username, password) # if username is not None and password is not None # else None # ) def create_slice(self, slice_data: dict) -> None: url = self._slice_url + ":network-slice-services" try: requests.post(url, json=slice_data, headers=HEADERS) except requests.exceptions.ConnectionError: raise Exception("faild to send post request to TFS IETF Slice NBI") def update_slice( self, slice_name: str, connection_group_id: str, updated_connection_group_data: dict, ) -> None: url = ( self._slice_url + f":network-slice-services/slice-service={slice_name}/connection-groups/connection-group={connection_group_id}" ) try: requests.put(url, json=updated_connection_group_data, headers=HEADERS) except requests.exceptions.ConnectionError: raise Exception("faild to send update request to TFS IETF Slice NBI") def delete_slice(self, slice_name: str) -> None: url = self._slice_url + f":network-slice-services/slice-service={slice_name}" try: requests.delete(url) except requests.exceptions.ConnectionError: raise Exception("faild to send delete request to TFS IETF Slice NBI")