# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging, threading from typing import Any, List, Tuple from ncclient.manager import Manager, connect_ssh from common.tools.client.RetryDecorator import delay_exponential from common.type_checkers.Checkers import chk_length, chk_string, chk_type from device.service.driver_api.Exceptions import UnsupportedResourceKeyException from .templates import EMPTY_CONFIG, compose_config from .RetryDecorator import retry MAX_RETRIES = 15 DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') LOGGER = logging.getLogger(__name__) class NetconfSessionHandler: def __init__(self, address : str, port : int, **settings) -> None: self.__lock = threading.RLock() self.__connected = threading.Event() self.__address = address self.__port = int(port) self.__username = settings.get('username') self.__password = settings.get('password') self.__vendor = settings.get('vendor') self.__key_filename = settings.get('key_filename') self.__hostkey_verify = settings.get('hostkey_verify', True) self.__look_for_keys = settings.get('look_for_keys', True) self.__allow_agent = settings.get('allow_agent', True) self.__force_running = settings.get('force_running', False) self.__commit_per_delete = settings.get('delete_rule', False) self.__device_params = settings.get('device_params', {}) self.__manager_params = settings.get('manager_params', {}) self.__nc_params = settings.get('nc_params', {}) self.__manager : Manager = None self.__candidate_supported = False def connect(self): with self.__lock: self.__manager = connect_ssh( host=self.__address, port=self.__port, username=self.__username, password=self.__password, device_params=self.__device_params, manager_params=self.__manager_params, nc_params=self.__nc_params, key_filename=self.__key_filename, hostkey_verify=self.__hostkey_verify, allow_agent=self.__allow_agent, look_for_keys=self.__look_for_keys) self.__candidate_supported = ':candidate' in self.__manager.server_capabilities self.__connected.set() def disconnect(self): if not self.__connected.is_set(): return with self.__lock: self.__manager.close_session() @property def use_candidate(self): return self.__candidate_supported and not self.__force_running @property def commit_per_rule(self): return self.__commit_per_delete @property def vendor(self): return self.__vendor @RETRY_DECORATOR def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin with self.__lock: return self.__manager.get(filter=filter, with_defaults=with_defaults) @RETRY_DECORATOR def edit_config( self, config, target='running', default_operation=None, test_option=None, error_option=None, format='xml' # pylint: disable=redefined-builtin ): if config == EMPTY_CONFIG: return with self.__lock: self.__manager.edit_config( config, target=target, default_operation=default_operation, test_option=test_option, error_option=error_option, format=format) def locked(self, target): return self.__manager.locked(target=target) def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id) def edit_config( netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False, target='running', default_operation='merge', test_option=None, error_option=None, format='xml' # pylint: disable=redefined-builtin ): str_method = 'DeleteConfig' if delete else 'SetConfig' LOGGER.info('[{:s}] resources = {:s}'.format(str_method, str(resources))) results = [None for _ in resources] for i,resource in enumerate(resources): str_resource_name = 'resources[#{:d}]'.format(i) try: LOGGER.info('[{:s}] resource = {:s}'.format(str_method, str(resource))) chk_type(str_resource_name, resource, (list, tuple)) chk_length(str_resource_name, resource, min_length=2, max_length=2) resource_key,resource_value = resource chk_string(str_resource_name + '.key', resource_key, allow_empty=False) str_config_message = compose_config( resource_key, resource_value, delete=delete, vendor=netconf_handler.vendor) if str_config_message is None: raise UnsupportedResourceKeyException(resource_key) LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format( str_method, len(str_config_message), str(str_config_message))) netconf_handler.edit_config( config=str_config_message, target=target, default_operation=default_operation, test_option=test_option, error_option=error_option, format=format) if commit_per_rule: netconf_handler.commit() results[i] = True except Exception as e: # pylint: disable=broad-except str_operation = 'preparing' if target == 'candidate' else ('deleting' if delete else 'setting') msg = '[{:s}] Exception {:s} {:s}: {:s}' LOGGER.exception(msg.format(str_method, str_operation, str_resource_name, str(resource))) results[i] = e # if validation fails, store the exception return results