Skip to content
NetconfSessionHandler.py 6.32 KiB
Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, threading
from typing import Any, List, Tuple
from ncclient.manager import Manager, connect_ssh
from common.tools.client.RetryDecorator import delay_exponential
from common.type_checkers.Checkers import chk_length, chk_string, chk_type
from device.service.driver_api.Exceptions import UnsupportedResourceKeyException
from .templates import EMPTY_CONFIG, compose_config
from .RetryDecorator import retry

MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

LOGGER = logging.getLogger(__name__)

class NetconfSessionHandler:
    def __init__(self, address : str, port : int, **settings) -> None:
        self.__lock = threading.RLock()
        self.__connected = threading.Event()
        self.__address = address
        self.__port = int(port)
        self.__username       = settings.get('username')
        self.__password       = settings.get('password')
        self.__vendor         = settings.get('vendor')
        self.__key_filename   = settings.get('key_filename')
        self.__hostkey_verify = settings.get('hostkey_verify', True)
        self.__look_for_keys  = settings.get('look_for_keys', True)
        self.__allow_agent    = settings.get('allow_agent', True)
        self.__force_running  = settings.get('force_running', False)
        self.__commit_per_delete  = settings.get('delete_rule', False)
        self.__device_params  = settings.get('device_params', {})
        self.__manager_params = settings.get('manager_params', {})
        self.__nc_params      = settings.get('nc_params', {})
        self.__manager : Manager   = None
        self.__candidate_supported = False

    def connect(self):
        with self.__lock:
            self.__manager = connect_ssh(
                host=self.__address, port=self.__port, username=self.__username, password=self.__password,
                device_params=self.__device_params, manager_params=self.__manager_params, nc_params=self.__nc_params,
                key_filename=self.__key_filename, hostkey_verify=self.__hostkey_verify, allow_agent=self.__allow_agent,
                look_for_keys=self.__look_for_keys)
            self.__candidate_supported = ':candidate' in self.__manager.server_capabilities
            self.__connected.set()

    def disconnect(self):
        if not self.__connected.is_set(): return
        with self.__lock:
            self.__manager.close_session()

    @property
    def use_candidate(self): return self.__candidate_supported and not self.__force_running

    @property
    def commit_per_rule(self): return self.__commit_per_delete 

    @property
    def vendor(self): return self.__vendor

    @RETRY_DECORATOR
    def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin
        with self.__lock:
            return self.__manager.get(filter=filter, with_defaults=with_defaults)

    @RETRY_DECORATOR
    def edit_config(
        self, config, target='running', default_operation=None, test_option=None,
        error_option=None, format='xml'                                             # pylint: disable=redefined-builtin
    ):
        if config == EMPTY_CONFIG: return
        with self.__lock:
            self.__manager.edit_config(
                config, target=target, default_operation=default_operation, test_option=test_option,
                error_option=error_option, format=format)

    def locked(self, target):
        return self.__manager.locked(target=target)

    def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None):
        return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id)

def edit_config(
    netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False,
    target='running', default_operation='merge', test_option=None, error_option=None,
    format='xml' # pylint: disable=redefined-builtin
):
    str_method = 'DeleteConfig' if delete else 'SetConfig'
    LOGGER.info('[{:s}] resources = {:s}'.format(str_method, str(resources)))
    results = [None for _ in resources]
    for i,resource in enumerate(resources):
        str_resource_name = 'resources[#{:d}]'.format(i)
        try:
            LOGGER.info('[{:s}] resource = {:s}'.format(str_method, str(resource)))
            chk_type(str_resource_name, resource, (list, tuple))
            chk_length(str_resource_name, resource, min_length=2, max_length=2)
            resource_key,resource_value = resource
            chk_string(str_resource_name + '.key', resource_key, allow_empty=False)
            str_config_message = compose_config(
                resource_key, resource_value, delete=delete, vendor=netconf_handler.vendor)
            if str_config_message is None: raise UnsupportedResourceKeyException(resource_key)
            LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format(
                str_method, len(str_config_message), str(str_config_message)))
            netconf_handler.edit_config(
                config=str_config_message, target=target, default_operation=default_operation,
                test_option=test_option, error_option=error_option, format=format)
            if commit_per_rule:
                netconf_handler.commit()
            results[i] = True
        except Exception as e: # pylint: disable=broad-except
            str_operation = 'preparing' if target == 'candidate' else ('deleting' if delete else 'setting')
            msg = '[{:s}] Exception {:s} {:s}: {:s}'
            LOGGER.exception(msg.format(str_method, str_operation, str_resource_name, str(resource)))
            results[i] = e # if validation fails, store the exception
    return results