diff --git a/src/device/service/drivers/openconfig/NetconfSessionHandler.py b/src/device/service/drivers/openconfig/NetconfSessionHandler.py new file mode 100644 index 0000000000000000000000000000000000000000..746f11d120088429b5bf007839400e09842c0c5c --- /dev/null +++ b/src/device/service/drivers/openconfig/NetconfSessionHandler.py @@ -0,0 +1,129 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, threading +from typing import Any, List, Tuple +from ncclient.manager import Manager, connect_ssh +from common.tools.client.RetryDecorator import delay_exponential +from common.type_checkers.Checkers import chk_length, chk_string, chk_type +from device.service.driver_api.Exceptions import UnsupportedResourceKeyException +from .templates import EMPTY_CONFIG, compose_config +from .RetryDecorator import retry + +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +LOGGER = logging.getLogger(__name__) + +class NetconfSessionHandler: + def __init__(self, address : str, port : int, **settings) -> None: + self.__lock = threading.RLock() + self.__connected = threading.Event() + self.__address = address + self.__port = int(port) + self.__username = settings.get('username') + self.__password = settings.get('password') + self.__vendor = settings.get('vendor') + self.__key_filename = settings.get('key_filename') + self.__hostkey_verify = settings.get('hostkey_verify', True) + self.__look_for_keys = settings.get('look_for_keys', True) + self.__allow_agent = settings.get('allow_agent', True) + self.__force_running = settings.get('force_running', False) + self.__commit_per_delete = settings.get('delete_rule', False) + self.__device_params = settings.get('device_params', {}) + self.__manager_params = settings.get('manager_params', {}) + self.__nc_params = settings.get('nc_params', {}) + self.__manager : Manager = None + self.__candidate_supported = False + + def connect(self): + with self.__lock: + self.__manager = connect_ssh( + host=self.__address, port=self.__port, username=self.__username, password=self.__password, + device_params=self.__device_params, manager_params=self.__manager_params, nc_params=self.__nc_params, + key_filename=self.__key_filename, hostkey_verify=self.__hostkey_verify, allow_agent=self.__allow_agent, + look_for_keys=self.__look_for_keys) + self.__candidate_supported = ':candidate' in self.__manager.server_capabilities + self.__connected.set() + + def disconnect(self): + if not self.__connected.is_set(): return + with self.__lock: + self.__manager.close_session() + + @property + def use_candidate(self): return self.__candidate_supported and not self.__force_running + + @property + def commit_per_rule(self): return self.__commit_per_delete + + @property + def vendor(self): return self.__vendor + + @RETRY_DECORATOR + def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin + with self.__lock: + return self.__manager.get(filter=filter, with_defaults=with_defaults) + + @RETRY_DECORATOR + def edit_config( + self, config, target='running', default_operation=None, test_option=None, + error_option=None, format='xml' # pylint: disable=redefined-builtin + ): + if config == EMPTY_CONFIG: return + with self.__lock: + self.__manager.edit_config( + config, target=target, default_operation=default_operation, test_option=test_option, + error_option=error_option, format=format) + + def locked(self, target): + return self.__manager.locked(target=target) + + def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): + return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id) + +def edit_config( + netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False, + target='running', default_operation='merge', test_option=None, error_option=None, + format='xml' # pylint: disable=redefined-builtin +): + str_method = 'DeleteConfig' if delete else 'SetConfig' + LOGGER.info('[{:s}] resources = {:s}'.format(str_method, str(resources))) + results = [None for _ in resources] + for i,resource in enumerate(resources): + str_resource_name = 'resources[#{:d}]'.format(i) + try: + LOGGER.info('[{:s}] resource = {:s}'.format(str_method, str(resource))) + chk_type(str_resource_name, resource, (list, tuple)) + chk_length(str_resource_name, resource, min_length=2, max_length=2) + resource_key,resource_value = resource + chk_string(str_resource_name + '.key', resource_key, allow_empty=False) + str_config_message = compose_config( + resource_key, resource_value, delete=delete, vendor=netconf_handler.vendor) + if str_config_message is None: raise UnsupportedResourceKeyException(resource_key) + LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format( + str_method, len(str_config_message), str(str_config_message))) + netconf_handler.edit_config( + config=str_config_message, target=target, default_operation=default_operation, + test_option=test_option, error_option=error_option, format=format) + if commit_per_rule: + netconf_handler.commit() + results[i] = True + except Exception as e: # pylint: disable=broad-except + str_operation = 'preparing' if target == 'candidate' else ('deleting' if delete else 'setting') + msg = '[{:s}] Exception {:s} {:s}: {:s}' + LOGGER.exception(msg.format(str_method, str_operation, str_resource_name, str(resource))) + results[i] = e # if validation fails, store the exception + return results diff --git a/src/device/service/drivers/openconfig/OpenConfigDriver.py b/src/device/service/drivers/openconfig/OpenConfigDriver.py index 923cd887603735c5c011187522e178631e909aab..96affc318d869bd3776c7a9ba9fdb7acccefac87 100644 --- a/src/device/service/drivers/openconfig/OpenConfigDriver.py +++ b/src/device/service/drivers/openconfig/OpenConfigDriver.py @@ -12,25 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import anytree, copy, logging, pytz, queue, re, threading +import anytree, logging, pytz, queue, threading #import lxml.etree as ET from datetime import datetime, timedelta -from typing import Any, Dict, Iterator, List, Optional, Tuple, Union +from typing import Any, Iterator, List, Optional, Tuple, Union from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.job import Job from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.background import BackgroundScheduler -from ncclient.manager import Manager, connect_ssh from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, metered_subclass_method, INF -from common.tools.client.RetryDecorator import delay_exponential from common.type_checkers.Checkers import chk_length, chk_string, chk_type, chk_float -from device.service.driver_api.Exceptions import UnsupportedResourceKeyException from device.service.driver_api._Driver import _Driver -from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_subnode_value #dump_subtree +from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_subnode_value +from .templates import ALL_RESOURCE_KEYS, get_filter, parse +from .NetconfSessionHandler import NetconfSessionHandler, edit_config +from .SamplesCache import SamplesCache, do_sampling #dump_subtree +#from .TelemetryProtocolEnum import TelemetryProtocolEnum, parse_telemetry_protocol #from .Tools import xml_pretty_print, xml_to_dict, xml_to_file -from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse -from .RetryDecorator import retry -from .TelemetryProtocolEnum import DEFAULT_TELEMETRY_PROTOCOL, TelemetryProtocolEnum, parse_telemetry_protocol DEBUG_MODE = False logging.getLogger('ncclient.manager').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING) @@ -41,136 +39,6 @@ logging.getLogger('monitoring-client').setLevel(logging.INFO if DEBUG_MODE else LOGGER = logging.getLogger(__name__) -RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*') -RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*") - -# Collection of samples through NetConf is very slow and each request collects all the data. -# Populate a cache periodically (when first interface is interrogated). -# Evict data after some seconds, when data is considered as outdated - -SAMPLE_EVICTION_SECONDS = 30.0 # seconds -SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters' - -MAX_RETRIES = 15 -DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) -RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') - -class NetconfSessionHandler: - def __init__(self, address : str, port : int, **settings) -> None: - self.__lock = threading.RLock() - self.__connected = threading.Event() - self.__address = address - self.__port = int(port) - self.__username = settings.get('username') - self.__password = settings.get('password') - self.__vendor = settings.get('vendor') - self.__key_filename = settings.get('key_filename') - self.__hostkey_verify = settings.get('hostkey_verify', True) - self.__look_for_keys = settings.get('look_for_keys', True) - self.__allow_agent = settings.get('allow_agent', True) - self.__force_running = settings.get('force_running', False) - self.__commit_per_delete = settings.get('delete_rule', False) - self.__device_params = settings.get('device_params', {}) - self.__manager_params = settings.get('manager_params', {}) - self.__nc_params = settings.get('nc_params', {}) - self.__manager : Manager = None - self.__candidate_supported = False - - def connect(self): - with self.__lock: - self.__manager = connect_ssh( - host=self.__address, port=self.__port, username=self.__username, password=self.__password, - device_params=self.__device_params, manager_params=self.__manager_params, nc_params=self.__nc_params, - key_filename=self.__key_filename, hostkey_verify=self.__hostkey_verify, allow_agent=self.__allow_agent, - look_for_keys=self.__look_for_keys) - self.__candidate_supported = ':candidate' in self.__manager.server_capabilities - self.__connected.set() - - def disconnect(self): - if not self.__connected.is_set(): return - with self.__lock: - self.__manager.close_session() - - @property - def use_candidate(self): return self.__candidate_supported and not self.__force_running - - @property - def commit_per_rule(self): return self.__commit_per_delete - - @property - def vendor(self): return self.__vendor - - @RETRY_DECORATOR - def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin - with self.__lock: - return self.__manager.get(filter=filter, with_defaults=with_defaults) - - @RETRY_DECORATOR - def edit_config( - self, config, target='running', default_operation=None, test_option=None, - error_option=None, format='xml' # pylint: disable=redefined-builtin - ): - if config == EMPTY_CONFIG: return - with self.__lock: - self.__manager.edit_config( - config, target=target, default_operation=default_operation, test_option=test_option, - error_option=error_option, format=format) - - def locked(self, target): - return self.__manager.locked(target=target) - - def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): - return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id) - - -def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples : queue.Queue) -> None: - try: - timestamp, samples = samples_cache.get(resource_key) - counter_name = resource_key.split('/')[-1].split(':')[-1] - value = samples.get(counter_name) - if value is None: - LOGGER.warning('[do_sampling] value not found for {:s}'.format(resource_key)) - return - # resource_key template: //oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s} - sample = (timestamp, resource_key, value) - out_samples.put_nowait(sample) - except: # pylint: disable=bare-except - LOGGER.exception('Error retrieving samples') - -def edit_config( - netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False, - target='running', default_operation='merge', test_option=None, error_option=None, - format='xml' # pylint: disable=redefined-builtin -): - str_method = 'DeleteConfig' if delete else 'SetConfig' - LOGGER.info('[{:s}] resources = {:s}'.format(str_method, str(resources))) - results = [None for _ in resources] - for i,resource in enumerate(resources): - str_resource_name = 'resources[#{:d}]'.format(i) - try: - LOGGER.info('[{:s}] resource = {:s}'.format(str_method, str(resource))) - chk_type(str_resource_name, resource, (list, tuple)) - chk_length(str_resource_name, resource, min_length=2, max_length=2) - resource_key,resource_value = resource - chk_string(str_resource_name + '.key', resource_key, allow_empty=False) - str_config_message = compose_config( - resource_key, resource_value, delete=delete, vendor=netconf_handler.vendor) - if str_config_message is None: raise UnsupportedResourceKeyException(resource_key) - LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format( - str_method, len(str_config_message), str(str_config_message))) - netconf_handler.edit_config( - config=str_config_message, target=target, default_operation=default_operation, - test_option=test_option, error_option=error_option, format=format) - if commit_per_rule: - netconf_handler.commit() - results[i] = True - except Exception as e: # pylint: disable=broad-except - str_operation = 'preparing' if target == 'candidate' else ('deleting' if delete else 'setting') - msg = '[{:s}] Exception {:s} {:s}: {:s}' - LOGGER.exception(msg.format(str_method, str_operation, str_resource_name, str(resource))) - results[i] = e # if validation fails, store the exception - return results - HISTOGRAM_BUCKETS = ( # .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF 0.0001, 0.00025, 0.00050, 0.00075, @@ -198,11 +66,11 @@ class OpenConfigDriver(_Driver): self.__started = threading.Event() self.__terminate = threading.Event() - self.__telemetry_protocol = parse_telemetry_protocol(settings.get('telemetry_protocol')) - if self.__telemetry_protocol == TelemetryProtocolEnum.GNMI: - self.__telemetry = GnmiTelemetry() - else: - self.__telemetry = NetConfTelemetry() + #self.__telemetry_protocol = parse_telemetry_protocol(settings.get('telemetry_protocol')) + #if self.__telemetry_protocol == TelemetryProtocolEnum.GNMI: + # self.__telemetry = GnmiTelemetry() + #else: + # self.__telemetry = NetConfTelemetry() self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events self.__scheduler.configure( @@ -268,7 +136,8 @@ class OpenConfigDriver(_Driver): if self.__netconf_handler.use_candidate: with self.__netconf_handler.locked(target='candidate'): if self.__netconf_handler.commit_per_rule: - results = edit_config(self.__netconf_handler, resources, target='candidate', commit_per_rule= True) + results = edit_config( + self.__netconf_handler, resources, target='candidate', commit_per_rule=True) else: results = edit_config(self.__netconf_handler, resources, target='candidate') try: @@ -288,13 +157,15 @@ class OpenConfigDriver(_Driver): if self.__netconf_handler.use_candidate: with self.__netconf_handler.locked(target='candidate'): if self.__netconf_handler.commit_per_rule: - results = edit_config(self.__netconf_handler, resources, target='candidate', delete=True, commit_per_rule= True) + results = edit_config( + self.__netconf_handler, resources, target='candidate', delete=True, commit_per_rule=True) else: results = edit_config(self.__netconf_handler, resources, target='candidate', delete=True) try: self.__netconf_handler.commit() except Exception as e: # pylint: disable=broad-except - LOGGER.exception('[DeleteConfig] Exception commiting resources: {:s}'.format(str(resources))) + MSG = '[DeleteConfig] Exception commiting resources: {:s}' + LOGGER.exception(MSG.format(str(resources))) results = [e for _ in resources] # if commit fails, set exception in each resource else: results = edit_config(self.__netconf_handler, resources, delete=True) diff --git a/src/device/service/drivers/openconfig/SamplesCache.py b/src/device/service/drivers/openconfig/SamplesCache.py index c2fa6bcecdff39ad74cff26676e718cec66a22f7..24dc33663f1676e0ccf2096ec71b7f6448322740 100644 --- a/src/device/service/drivers/openconfig/SamplesCache.py +++ b/src/device/service/drivers/openconfig/SamplesCache.py @@ -12,6 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Collection of samples through NetConf is very slow and each request collects all the data. +# Populate a cache periodically (when first interface is interrogated). +# Evict data after some seconds, when data is considered as outdated + +import copy, queue, logging, re, threading +from datetime import datetime +from typing import Dict, Tuple +from .templates import get_filter, parse +from .NetconfSessionHandler import NetconfSessionHandler + +SAMPLE_EVICTION_SECONDS = 30.0 # seconds +SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters' + +RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*') +RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*") + +LOGGER = logging.getLogger(__name__) + def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp): if previous_sample is None: return None if previous_timestamp is None: return None @@ -66,3 +84,17 @@ class SamplesCache: if match is None: return self.__timestamp, {} interface = match.group(1) return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {})) + +def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples : queue.Queue) -> None: + try: + timestamp, samples = samples_cache.get(resource_key) + counter_name = resource_key.split('/')[-1].split(':')[-1] + value = samples.get(counter_name) + if value is None: + LOGGER.warning('[do_sampling] value not found for {:s}'.format(resource_key)) + return + # resource_key template: //oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s} + sample = (timestamp, resource_key, value) + out_samples.put_nowait(sample) + except: # pylint: disable=bare-except + LOGGER.exception('Error retrieving samples')