diff --git a/src/device/service/drivers/nce/Constants.py b/src/device/service/drivers/nce/Constants.py new file mode 100644 index 0000000000000000000000000000000000000000..df66eb16b3d78c1b388a086011ed6f6b75b8099f --- /dev/null +++ b/src/device/service/drivers/nce/Constants.py @@ -0,0 +1,25 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from device.service.driver_api._Driver import ( + RESOURCE_ENDPOINTS, + RESOURCE_INTERFACES, + RESOURCE_NETWORK_INSTANCES, +) + +SPECIAL_RESOURCE_MAPPINGS = { + RESOURCE_ENDPOINTS: "/endpoints", + RESOURCE_INTERFACES: "/interfaces", + RESOURCE_NETWORK_INSTANCES: "/net-instances", +} diff --git a/src/device/service/drivers/nce/Tools.py b/src/device/service/drivers/nce/Tools.py new file mode 100644 index 0000000000000000000000000000000000000000..ba54beb360f25498497356b63015b7ed603e7a04 --- /dev/null +++ b/src/device/service/drivers/nce/Tools.py @@ -0,0 +1,145 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from typing import Any, Dict, Optional, Tuple + +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.type_checkers.Checkers import chk_attribute, chk_string, chk_type +from device.service.driver_api._Driver import RESOURCE_ENDPOINTS + +from .Constants import SPECIAL_RESOURCE_MAPPINGS + +LOGGER = logging.getLogger(__name__) + + +def process_optional_string_field( + endpoint_data: Dict[str, Any], + field_name: str, + endpoint_resource_value: Dict[str, Any], +) -> None: + field_value = chk_attribute( + field_name, endpoint_data, "endpoint_data", default=None + ) + if field_value is None: + return + chk_string("endpoint_data.{:s}".format(field_name), field_value) + if len(field_value) > 0: + endpoint_resource_value[field_name] = field_value + + +def compose_resource_endpoint( + endpoint_data: Dict[str, Any], +) -> Optional[Tuple[str, Dict]]: + try: + # Check type of endpoint_data + chk_type("endpoint_data", endpoint_data, dict) + + # Check endpoint UUID (mandatory) + endpoint_uuid = chk_attribute("uuid", endpoint_data, "endpoint_data") + chk_string("endpoint_data.uuid", endpoint_uuid, min_length=1) + endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS) + endpoint_resource_key = "{:s}/endpoint[{:s}]".format( + endpoint_resource_path, endpoint_uuid + ) + endpoint_resource_value = {"uuid": endpoint_uuid} + + # Check endpoint optional string fields + process_optional_string_field(endpoint_data, "name", endpoint_resource_value) + process_optional_string_field( + endpoint_data, "site_location", endpoint_resource_value + ) + process_optional_string_field(endpoint_data, "ce-ip", endpoint_resource_value) + process_optional_string_field( + endpoint_data, "address_ip", endpoint_resource_value + ) + process_optional_string_field( + endpoint_data, "address_prefix", endpoint_resource_value + ) + process_optional_string_field(endpoint_data, "mtu", endpoint_resource_value) + process_optional_string_field( + endpoint_data, "ipv4_lan_prefixes", endpoint_resource_value + ) + process_optional_string_field(endpoint_data, "type", endpoint_resource_value) + process_optional_string_field( + endpoint_data, "context_uuid", endpoint_resource_value + ) + process_optional_string_field( + endpoint_data, "topology_uuid", endpoint_resource_value + ) + + # Check endpoint sample types (optional) + endpoint_sample_types = chk_attribute( + "sample_types", endpoint_data, "endpoint_data", default=[] + ) + chk_type("endpoint_data.sample_types", endpoint_sample_types, list) + sample_types = {} + sample_type_errors = [] + for i, endpoint_sample_type in enumerate(endpoint_sample_types): + field_name = "endpoint_data.sample_types[{:d}]".format(i) + try: + chk_type(field_name, endpoint_sample_type, (int, str)) + if isinstance(endpoint_sample_type, int): + metric_name = KpiSampleType.Name(endpoint_sample_type) + metric_id = endpoint_sample_type + elif isinstance(endpoint_sample_type, str): + metric_id = KpiSampleType.Value(endpoint_sample_type) + metric_name = endpoint_sample_type + else: + str_type = str(type(endpoint_sample_type)) + raise Exception("Bad format: {:s}".format(str_type)) # pylint: disable=broad-exception-raised + except Exception as e: # pylint: disable=broad-exception-caught + MSG = "Unsupported {:s}({:s}) : {:s}" + sample_type_errors.append( + MSG.format(field_name, str(endpoint_sample_type), str(e)) + ) + + metric_name = metric_name.lower().replace("kpisampletype_", "") + monitoring_resource_key = "{:s}/state/{:s}".format( + endpoint_resource_key, metric_name + ) + sample_types[metric_id] = monitoring_resource_key + + if len(sample_type_errors) > 0: + # pylint: disable=broad-exception-raised + raise Exception( + "Malformed Sample Types:\n{:s}".format("\n".join(sample_type_errors)) + ) + + if len(sample_types) > 0: + endpoint_resource_value["sample_types"] = sample_types + + if "site_location" in endpoint_data: + endpoint_resource_value["site_location"] = endpoint_data["site_location"] + + if "ce-ip" in endpoint_data: + endpoint_resource_value["ce-ip"] = endpoint_data["ce-ip"] + + if "address_ip" in endpoint_data: + endpoint_resource_value["address_ip"] = endpoint_data["address_ip"] + + if "address_prefix" in endpoint_data: + endpoint_resource_value["address_prefix"] = endpoint_data["address_prefix"] + + if "mtu" in endpoint_data: + endpoint_resource_value["mtu"] = endpoint_data["mtu"] + + if "ipv4_lan_prefixes" in endpoint_data: + endpoint_resource_value["ipv4_lan_prefixes"] = endpoint_data[ + "ipv4_lan_prefixes" + ] + + return endpoint_resource_key, endpoint_resource_value + except: # pylint: disable=bare-except + LOGGER.exception("Problem composing endpoint({:s})".format(str(endpoint_data))) + return None diff --git a/src/device/service/drivers/nce/__init__.py b/src/device/service/drivers/nce/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02 --- /dev/null +++ b/src/device/service/drivers/nce/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/device/service/drivers/nce/driver.py b/src/device/service/drivers/nce/driver.py new file mode 100644 index 0000000000000000000000000000000000000000..003a0fabfdf384da61258159490b30929f46b3e4 --- /dev/null +++ b/src/device/service/drivers/nce/driver.py @@ -0,0 +1,279 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import re +import threading +from typing import Any, Iterator, List, Optional, Tuple, Union + +import anytree +import requests +from requests.auth import HTTPBasicAuth + +from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method +from common.type_checkers.Checkers import chk_length, chk_string, chk_type +from device.service.driver_api._Driver import _Driver +from device.service.driver_api.AnyTreeTools import ( + TreeNode, + dump_subtree, + get_subnode, + set_subnode_value, +) +from device.service.driver_api.ImportTopologyEnum import ( + ImportTopologyEnum, + get_import_topology, +) + +from .Constants import SPECIAL_RESOURCE_MAPPINGS +from .nce_fan_client import NCEClient +from .Tools import compose_resource_endpoint + +LOGGER = logging.getLogger(__name__) + + +RE_NCE_APP_FLOW_DATA = re.compile(r"^\/service\[[^\]]+\]\/AppFlow$") +RE_NCE_APP_FLOW_OPERATION = re.compile(r"^\/service\[[^\]]+\]\/AppFlow\/operation$") + +DRIVER_NAME = "nce" +METRICS_POOL = MetricsPool("Device", "Driver", labels={"driver": DRIVER_NAME}) + + +class NCEDriver(_Driver): + def __init__(self, address: str, port: str, **settings) -> None: + super().__init__(DRIVER_NAME, address, int(port), **settings) + self.__lock = threading.Lock() + self.__started = threading.Event() + self.__terminate = threading.Event() + self.__running = TreeNode(".") + scheme = self.settings.get("scheme", "http") + username = self.settings.get("username") + password = self.settings.get("password") + self.nce = NCEClient( + self.address, + self.port, + scheme=scheme, + username=username, + password=password, + ) + self.__auth = None + # ( + # HTTPBasicAuth(username, password) + # if username is not None and password is not None + # else None + # ) + self.__tfs_nbi_root = "{:s}://{:s}:{:d}".format( + scheme, self.address, int(self.port) + ) + self.__timeout = int(self.settings.get("timeout", 120)) + self.__import_topology = get_import_topology( + self.settings, default=ImportTopologyEnum.DEVICES + ) + endpoints = self.settings.get("endpoints", []) + endpoint_resources = [] + for endpoint in endpoints: + endpoint_resource = compose_resource_endpoint(endpoint) + if endpoint_resource is None: + continue + endpoint_resources.append(endpoint_resource) + self._set_initial_config(endpoint_resources) + + def _set_initial_config( + self, resources: List[Tuple[str, Any]] + ) -> List[Union[bool, Exception]]: + chk_type("resources", resources, list) + if len(resources) == 0: + return [] + results = [] + resolver = anytree.Resolver(pathattr="name") + with self.__lock: + for i, resource in enumerate(resources): + str_resource_name = "resources[#{:d}]".format(i) + try: + chk_type(str_resource_name, resource, (list, tuple)) + chk_length(str_resource_name, resource, min_length=2, max_length=2) + resource_key, resource_value = resource + chk_string(str_resource_name, resource_key, allow_empty=False) + resource_path = resource_key.split("/") + except Exception as e: # pylint: disable=broad-except + LOGGER.exception( + "Exception validating {:s}: {:s}".format( + str_resource_name, str(resource_key) + ) + ) + results.append(e) # if validation fails, store the exception + continue + + try: + resource_value = json.loads(resource_value) + except: # pylint: disable=bare-except + pass + + set_subnode_value( + resolver, self.__running, resource_path, resource_value + ) + + results.append(True) + return results + + def Connect(self) -> bool: + with self.__lock: + if self.__started.is_set(): + return True + try: + ... + except requests.exceptions.Timeout: + return False + except Exception: # pylint: disable=broad-except + return False + else: + self.__started.set() + return True + + def Disconnect(self) -> bool: + with self.__lock: + self.__terminate.set() + return True + + @metered_subclass_method(METRICS_POOL) + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + with self.__lock: + return [] + + @metered_subclass_method(METRICS_POOL) + def GetConfig( + self, resource_keys: List[str] = [] + ) -> List[Tuple[str, Union[Any, None, Exception]]]: + chk_type("resources", resource_keys, list) + with self.__lock: + if len(resource_keys) == 0: + return dump_subtree(self.__running) + results = [] + resolver = anytree.Resolver(pathattr="name") + for i, resource_key in enumerate(resource_keys): + str_resource_name = "resource_key[#{:d}]".format(i) + try: + chk_string(str_resource_name, resource_key, allow_empty=False) + resource_key = SPECIAL_RESOURCE_MAPPINGS.get( + resource_key, resource_key + ) + resource_path = resource_key.split("/") + except Exception as e: # pylint: disable=broad-except + LOGGER.exception( + "Exception validating {:s}: {:s}".format( + str_resource_name, str(resource_key) + ) + ) + results.append( + (resource_key, e) + ) # if validation fails, store the exception + continue + + resource_node = get_subnode( + resolver, self.__running, resource_path, default=None + ) + # if not found, resource_node is None + if resource_node is None: + continue + results.extend(dump_subtree(resource_node)) + return results + return results + + @metered_subclass_method(METRICS_POOL) + def SetConfig( + self, resources: List[Tuple[str, Any]] + ) -> List[Union[bool, Exception]]: + results = [] + if len(resources) == 0: + return results + with self.__lock: + for resource in resources: + resource_key, resource_value = resource + LOGGER.debug("resource = {:s}".format(str(resource))) + if RE_NCE_APP_FLOW_OPERATION.match(resource_key): + operation_type = json.loads(resource_value)["type"] + results.append((resource_key, True)) + break + else: + raise Exception("operation type not found in resources") + for resource in resources: + LOGGER.info("resource = {:s}".format(str(resource))) + resource_key, resource_value = resource + if not RE_NCE_APP_FLOW_DATA.match(resource_key): + continue + try: + resource_value = json.loads(resource_value) + if operation_type == "create": + self.nce.create_app_flow(resource_value) + elif operation_type == "delete": + app_flow_name = resource_value["huawei-nce-app-flow:app-flows"][ + "app-flow" + ][0]["app-name"] + self.nce.delete_app_flow(app_flow_name) + LOGGER.debug(f"app_flow_datamodel {resource_value}") + results.append((resource_key, True)) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception( + "Unhandled error processing resource_key({:s})".format( + str(resource_key) + ) + ) + results.append((resource_key, e)) + return results + + @metered_subclass_method(METRICS_POOL) + def DeleteConfig( + self, resources: List[Tuple[str, Any]] + ) -> List[Union[bool, Exception]]: + results = [] + if len(resources) == 0: + return results + with self.__lock: + for resource in resources: + LOGGER.info("resource = {:s}".format(str(resource))) + resource_key, resource_value = resource + try: + # resource_value = json.loads(resource_value) + # service_uuid = resource_value["uuid"] + # if service_exists(self.__tfs_nbi_root, self.__auth, service_uuid): + # self.tac.delete_connectivity_service(service_uuid) + results.append((resource_key, True)) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception( + "Unhandled error processing resource_key({:s})".format( + str(resource_key) + ) + ) + results.append((resource_key, e)) + return results + + @metered_subclass_method(METRICS_POOL) + def SubscribeState( + self, subscriptions: List[Tuple[str, float, float]] + ) -> List[Union[bool, Exception]]: + # TODO: IETF L3VPN does not support monitoring by now + return [False for _ in subscriptions] + + @metered_subclass_method(METRICS_POOL) + def UnsubscribeState( + self, subscriptions: List[Tuple[str, float, float]] + ) -> List[Union[bool, Exception]]: + # TODO: IETF L3VPN does not support monitoring by now + return [False for _ in subscriptions] + + def GetState( + self, blocking=False, terminate: Optional[threading.Event] = None + ) -> Iterator[Tuple[float, str, Any]]: + # TODO: IETF L3VPN does not support monitoring by now + return [] diff --git a/src/device/service/drivers/nce/nce_fan_client.py b/src/device/service/drivers/nce/nce_fan_client.py new file mode 100644 index 0000000000000000000000000000000000000000..022ae15a5d93e2b29e3a4c832527b8c242f90462 --- /dev/null +++ b/src/device/service/drivers/nce/nce_fan_client.py @@ -0,0 +1,92 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Optional + +import requests +from requests.auth import HTTPBasicAuth + +LOGGER = logging.getLogger(__name__) + +NCE_FAN_URL = "{:s}://{:s}:{:d}/restconf/v1/data" +TIMEOUT = 30 + +HTTP_OK_CODES = { + 200, # OK + 201, # Created + 202, # Accepted + 204, # No Content +} + +MAPPING_STATUS = { + "DEVICEOPERATIONALSTATUS_UNDEFINED": 0, + "DEVICEOPERATIONALSTATUS_DISABLED": 1, + "DEVICEOPERATIONALSTATUS_ENABLED": 2, +} + +MAPPING_DRIVER = { + "DEVICEDRIVER_UNDEFINED": 0, + "DEVICEDRIVER_OPENCONFIG": 1, + "DEVICEDRIVER_TRANSPORT_API": 2, + "DEVICEDRIVER_P4": 3, + "DEVICEDRIVER_IETF_NETWORK_TOPOLOGY": 4, + "DEVICEDRIVER_ONF_TR_532": 5, + "DEVICEDRIVER_XR": 6, + "DEVICEDRIVER_IETF_L2VPN": 7, + "DEVICEDRIVER_GNMI_OPENCONFIG": 8, + "DEVICEDRIVER_OPTICAL_TFS": 9, + "DEVICEDRIVER_IETF_ACTN": 10, + "DEVICEDRIVER_OC": 11, +} + +HEADERS = {'Content-Type': 'application/json'} + +class NCEClient: + def __init__( + self, + address: str, + port: int, + scheme: str = "http", + username: Optional[str] = None, + password: Optional[str] = None, + ) -> None: + self._nce_fan_url = NCE_FAN_URL.format(scheme, address, port) + self._auth = None + + def create_app_flow(self, app_flow_data: dict) -> None: + try: + app_data = app_flow_data["huawei-nce-app-flow:app-flows"]["applications"] + app_url = self._nce_fan_url + "/app-flows/apps" + LOGGER.info(f'Creating app: {app_data} URL: {app_url}') + # requests.post(app_url, json=app_data, headers=HEADERS) + app_flow_data = { + "app-flow": app_flow_data["huawei-nce-app-flow:app-flows"]["app-flow"] + } + app_flow_url = self._nce_fan_url + "/app-flows" + LOGGER.info(f'Creating app flow: {app_flow_data} URL: {app_flow_url}') + # requests.post(app_flow_url, json=app_flow_data, headers=HEADERS) + except requests.exceptions.ConnectionError: + raise Exception("faild to send post requests to NCE FAN") + + def delete_app_flow(self, app_flow_name: str) -> None: + try: + app_url = self._nce_fan_url + f"/app-flows/apps/application={app_flow_name}" + LOGGER.info(f'Deleting app: {app_flow_name} URL: {app_url}') + # requests.delete(app_url) + app_flow_url = self._nce_fan_url + f"/app-flows/app-flow={app_flow_name}" + LOGGER.info(f'Deleting app flow: {app_flow_name} URL: {app_flow_url}') + # requests.delete(app_flow_url) + except requests.exceptions.ConnectionError: + raise Exception("faild to send delete request to NCE FAN")