From 80f5f6cbb5648d9efe8521946f05ecfc39529d62 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Wed, 14 Jun 2023 16:04:41 +0000 Subject: [PATCH] Device component - OpenConfig Driver: - Extracted gNMI code (will be added to new driver) --- .../drivers/openconfig/NetConfTelemetry.py | 14 -- .../openconfig/NetconfSessionHandler.py | 129 --------------- .../drivers/openconfig/OpenConfigDriver.py | 147 +----------------- .../drivers/openconfig/SamplesCache.py | 100 ------------ 4 files changed, 5 insertions(+), 385 deletions(-) delete mode 100644 src/device/service/drivers/openconfig/NetConfTelemetry.py delete mode 100644 src/device/service/drivers/openconfig/NetconfSessionHandler.py delete mode 100644 src/device/service/drivers/openconfig/SamplesCache.py diff --git a/src/device/service/drivers/openconfig/NetConfTelemetry.py b/src/device/service/drivers/openconfig/NetConfTelemetry.py deleted file mode 100644 index 1549d9811..000000000 --- a/src/device/service/drivers/openconfig/NetConfTelemetry.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - diff --git a/src/device/service/drivers/openconfig/NetconfSessionHandler.py b/src/device/service/drivers/openconfig/NetconfSessionHandler.py deleted file mode 100644 index 746f11d12..000000000 --- a/src/device/service/drivers/openconfig/NetconfSessionHandler.py +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, threading -from typing import Any, List, Tuple -from ncclient.manager import Manager, connect_ssh -from common.tools.client.RetryDecorator import delay_exponential -from common.type_checkers.Checkers import chk_length, chk_string, chk_type -from device.service.driver_api.Exceptions import UnsupportedResourceKeyException -from .templates import EMPTY_CONFIG, compose_config -from .RetryDecorator import retry - -MAX_RETRIES = 15 -DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) -RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') - -LOGGER = logging.getLogger(__name__) - -class NetconfSessionHandler: - def __init__(self, address : str, port : int, **settings) -> None: - self.__lock = threading.RLock() - self.__connected = threading.Event() - self.__address = address - self.__port = int(port) - self.__username = settings.get('username') - self.__password = settings.get('password') - self.__vendor = settings.get('vendor') - self.__key_filename = settings.get('key_filename') - self.__hostkey_verify = settings.get('hostkey_verify', True) - self.__look_for_keys = settings.get('look_for_keys', True) - self.__allow_agent = settings.get('allow_agent', True) - self.__force_running = settings.get('force_running', False) - self.__commit_per_delete = settings.get('delete_rule', False) - self.__device_params = settings.get('device_params', {}) - self.__manager_params = settings.get('manager_params', {}) - self.__nc_params = settings.get('nc_params', {}) - self.__manager : Manager = None - self.__candidate_supported = False - - def connect(self): - with self.__lock: - self.__manager = connect_ssh( - host=self.__address, port=self.__port, username=self.__username, password=self.__password, - device_params=self.__device_params, manager_params=self.__manager_params, nc_params=self.__nc_params, - key_filename=self.__key_filename, hostkey_verify=self.__hostkey_verify, allow_agent=self.__allow_agent, - look_for_keys=self.__look_for_keys) - self.__candidate_supported = ':candidate' in self.__manager.server_capabilities - self.__connected.set() - - def disconnect(self): - if not self.__connected.is_set(): return - with self.__lock: - self.__manager.close_session() - - @property - def use_candidate(self): return self.__candidate_supported and not self.__force_running - - @property - def commit_per_rule(self): return self.__commit_per_delete - - @property - def vendor(self): return self.__vendor - - @RETRY_DECORATOR - def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin - with self.__lock: - return self.__manager.get(filter=filter, with_defaults=with_defaults) - - @RETRY_DECORATOR - def edit_config( - self, config, target='running', default_operation=None, test_option=None, - error_option=None, format='xml' # pylint: disable=redefined-builtin - ): - if config == EMPTY_CONFIG: return - with self.__lock: - self.__manager.edit_config( - config, target=target, default_operation=default_operation, test_option=test_option, - error_option=error_option, format=format) - - def locked(self, target): - return self.__manager.locked(target=target) - - def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): - return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id) - -def edit_config( - netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False, - target='running', default_operation='merge', test_option=None, error_option=None, - format='xml' # pylint: disable=redefined-builtin -): - str_method = 'DeleteConfig' if delete else 'SetConfig' - LOGGER.info('[{:s}] resources = {:s}'.format(str_method, str(resources))) - results = [None for _ in resources] - for i,resource in enumerate(resources): - str_resource_name = 'resources[#{:d}]'.format(i) - try: - LOGGER.info('[{:s}] resource = {:s}'.format(str_method, str(resource))) - chk_type(str_resource_name, resource, (list, tuple)) - chk_length(str_resource_name, resource, min_length=2, max_length=2) - resource_key,resource_value = resource - chk_string(str_resource_name + '.key', resource_key, allow_empty=False) - str_config_message = compose_config( - resource_key, resource_value, delete=delete, vendor=netconf_handler.vendor) - if str_config_message is None: raise UnsupportedResourceKeyException(resource_key) - LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format( - str_method, len(str_config_message), str(str_config_message))) - netconf_handler.edit_config( - config=str_config_message, target=target, default_operation=default_operation, - test_option=test_option, error_option=error_option, format=format) - if commit_per_rule: - netconf_handler.commit() - results[i] = True - except Exception as e: # pylint: disable=broad-except - str_operation = 'preparing' if target == 'candidate' else ('deleting' if delete else 'setting') - msg = '[{:s}] Exception {:s} {:s}: {:s}' - LOGGER.exception(msg.format(str_method, str_operation, str_resource_name, str(resource))) - results[i] = e # if validation fails, store the exception - return results diff --git a/src/device/service/drivers/openconfig/OpenConfigDriver.py b/src/device/service/drivers/openconfig/OpenConfigDriver.py index de41e0cee..c9a9e7e6f 100644 --- a/src/device/service/drivers/openconfig/OpenConfigDriver.py +++ b/src/device/service/drivers/openconfig/OpenConfigDriver.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import anytree, copy, logging, pytz, queue, re, threading, json, os, sys +import anytree, copy, logging, pytz, queue, re, threading #import lxml.etree as ET from datetime import datetime, timedelta from typing import Any, Dict, Iterator, List, Optional, Tuple, Union @@ -31,14 +31,6 @@ from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_su from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse from .RetryDecorator import retry -import grpc -from google.protobuf.json_format import MessageToJson - -gnmi_path__ = os.path.dirname(os.path.abspath(__file__)) -sys.path.append(gnmi_path__) -import gnmi_pb2_grpc -import gnmi_pb2 - DEBUG_MODE = False logging.getLogger('ncclient.manager').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING) @@ -139,106 +131,6 @@ class NetconfSessionHandler: def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id) -class gNMISessionHandler: - def __init__(self, address : str, **settings) -> None: - self.__lock = threading.RLock() - self.__connected = threading.Event() - self.__address = address - self.__port = settings.get('gnmi_port') - self.__username = settings.get('username') - self.__password = settings.get('password') - self.__vendor = settings.get('vendor') - self.__key_filename = settings.get('key_filename') - self.__hostkey_verify = settings.get('hostkey_verify', True) - self.__look_for_keys = settings.get('look_for_keys', True) - self.__allow_agent = settings.get('allow_agent', True) - self.__force_running = settings.get('force_running', False) - self.__commit_per_delete = settings.get('delete_rule', False) - self.__device_params = settings.get('device_params', {}) - self.__manager_params = settings.get('manager_params', {}) - self.__nc_params = settings.get('nc_params', {}) - self.__stub = None - self.__candidate_supported = False - self.__channel = None - self.__supportedEncodings = None - self.__options = Options() - - def connect(self): - with self.__lock: - self.__channel = grpc.insecure_channel(str(self.__address)+':'+self.__port) - self.__stub = gnmi_pb2_grpc.gNMIStub(self.__channel) - metadata = [('username',self.__username ), ('password', self.__password)] - req = gnmi_pb2.CapabilityRequest() - response = self.__stub.Capabilities(req, metadata=metadata) - data = json.loads(MessageToJson(response)) - self.__supportedEncodings = data['supportedEncodings'] - # TODO: self.__candidate_supported = - self.__connected.set() - - def disconnect(self): - if not self.__connected.is_set(): return - with self.__lock: - self.__channel.close() - - def subscribeStreaming(self, subscription : Tuple[str, float, float], out_samples : queue.Queue) -> None: - resource_key, sampling_duration, sampling_interval = subscription - options = copy.deepcopy(self.__options) - options.xpaths = [parse_xpath(resource_key)] - options.timeout = int(sampling_duration) - options.interval = int(sampling_interval) - req_iterator = gen_request(options) - metadata = [('username',self.__username), ('password', self.__password)] - responses = self.__stub.Subscribe(req_iterator, self.__options.timeout, metadata=metadata) - previous_sample = None - delta = 0.0 - previous_timestamp = datetime.timestamp(datetime.utcnow()) - for response in responses: - data = json.loads(MessageToJson(response)) - if data.get("update") is not None and data.get("update").get("update") != None: - now = datetime.timestamp(datetime.utcnow()) - for element in data['update']['update']: - counter_name = split_resource_key(dict_to_xpath(element['path'])) - if counter_name == split_resource_key(resource_key): - value = int(element['val']['uintVal']) - delay = now - previous_timestamp - if previous_sample is not None: delta = (value - previous_sample)/delay - previous_sample = int(value) - previous_timestamp = now - sample = (now, resource_key, delta) - out_samples.put_nowait(sample) - - @property - def use_candidate(self): return self.__candidate_supported and not self.__force_running - - @property - def commit_per_rule(self): return self.__commit_per_delete - - @property - def vendor(self): return self.__vendor - - @RETRY_DECORATOR - def get(self): # pylint: disable=redefined-builtin - return False - - @RETRY_DECORATOR - def edit_config( - self, config, target='running', default_operation=None, test_option=None, - error_option=None, format='xml' # pylint: disable=redefined-builtin - ): - if config == EMPTY_CONFIG: return - with self.__lock: - self.__manager.edit_config( - config, target=target, default_operation=default_operation, test_option=test_option, - error_option=error_option, format=format) - - @RETRY_DECORATOR - def locked(self, target): - return self.__manager.locked(target=target) - - @RETRY_DECORATOR - def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): - return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id) - def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp): if previous_sample is None: return None if previous_timestamp is None: return None @@ -259,14 +151,13 @@ def compute_delta_sample(previous_sample, previous_timestamp, current_sample, cu return delta_sample class SamplesCache: - def __init__(self, netconf_handler : NetconfSessionHandler, gNMI_handler : gNMISessionHandler, logger : logging.Logger) -> None: + def __init__(self, netconf_handler : NetconfSessionHandler, logger : logging.Logger) -> None: self.__netconf_handler = netconf_handler self.__logger = logger self.__lock = threading.Lock() self.__timestamp = None self.__absolute_samples = {} self.__delta_samples = {} - self.__gNMI_handler = gNMI_handler def _refresh_samples(self) -> None: with self.__lock: @@ -311,24 +202,6 @@ def do_sampling( except: # pylint: disable=bare-except logger.exception('Error retrieving samples') -class Options: - def __init__(self, xpaths=None, prefix=None, mode=0, submode=0, suppress=False, interval=0, - encoding='JSON', heartbeat=0, qos=None, aggregate=False, server=None, username='admin', password='admin', timeout=None): - self.xpaths = xpaths - self.prefix = prefix - self.mode = mode - self.submode = submode - self.suppress = suppress - self.interval = interval - self.encoding = encoding - self.heartbeat = heartbeat - self.qos = qos - self.aggregate = aggregate - self.server = server - self.username = username - self.password = password - self.timeout = timeout - def edit_config( # edit the configuration of openconfig devices netconf_handler : NetconfSessionHandler, logger : logging.Logger, resources : List[Tuple[str, Any]], delete=False, commit_per_rule=False, target='running', default_operation='merge', test_option=None, error_option=None, @@ -386,7 +259,6 @@ class OpenConfigDriver(_Driver): self.__subscriptions = TreeNode('.') self.__started = threading.Event() self.__terminate = threading.Event() - self.__gnmi_monitoring = settings.get('monitoring_protocol') == 'gnmi' self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events self.__scheduler.configure( jobstores = {'default': MemoryJobStore()}, @@ -395,16 +267,12 @@ class OpenConfigDriver(_Driver): timezone=pytz.utc) self.__out_samples = queue.Queue() self.__netconf_handler = NetconfSessionHandler(self.address, self.port, **(self.settings)) - self.__gNMI_handler : gNMISessionHandler = gNMISessionHandler(address, **settings) - self.__samples_cache = SamplesCache(self.__netconf_handler, self.__gNMI_handler, self.__logger) + self.__samples_cache = SamplesCache(self.__netconf_handler, self.__logger) def Connect(self) -> bool: with self.__lock: if self.__started.is_set(): return True self.__netconf_handler.connect() - if self.__gnmi_monitoring: - self.__gNMI_handler.connect() - LOGGER.debug('Using gNMI as monitoring protocol') # Connect triggers activation of sampling events that will be scheduled based on subscriptions self.__scheduler.start() self.__started.set() @@ -418,7 +286,7 @@ class OpenConfigDriver(_Driver): if not self.__started.is_set(): return True # Disconnect triggers deactivation of sampling events self.__scheduler.shutdown() - if self.__gnmi_monitoring: self.__netconf_handler.disconnect() + self.__netconf_handler.disconnect() return True @metered_subclass_method(METRICS_POOL) @@ -506,12 +374,7 @@ class OpenConfigDriver(_Driver): job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval) - if self.__gnmi_monitoring: - LOGGER.debug('Processing gNMI subscription: '+ str(subscription)) - job = threading.Thread(target=self.__gNMI_handler.subscribeStreaming, args=(subscription, self.__out_samples)) - job.start() - else: - job = self.__scheduler.add_job( + job = self.__scheduler.add_job( do_sampling, args=(self.__samples_cache, self.__logger, resource_key, self.__out_samples), kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval, start_date=start_date, end_date=end_date, timezone=pytz.utc) diff --git a/src/device/service/drivers/openconfig/SamplesCache.py b/src/device/service/drivers/openconfig/SamplesCache.py deleted file mode 100644 index 24dc33663..000000000 --- a/src/device/service/drivers/openconfig/SamplesCache.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Collection of samples through NetConf is very slow and each request collects all the data. -# Populate a cache periodically (when first interface is interrogated). -# Evict data after some seconds, when data is considered as outdated - -import copy, queue, logging, re, threading -from datetime import datetime -from typing import Dict, Tuple -from .templates import get_filter, parse -from .NetconfSessionHandler import NetconfSessionHandler - -SAMPLE_EVICTION_SECONDS = 30.0 # seconds -SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters' - -RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*') -RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*") - -LOGGER = logging.getLogger(__name__) - -def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp): - if previous_sample is None: return None - if previous_timestamp is None: return None - if current_sample is None: return None - if current_timestamp is None: return None - delay = current_timestamp - previous_timestamp - field_keys = set(previous_sample.keys()).union(current_sample.keys()) - field_keys.discard('name') - delta_sample = {'name': previous_sample['name']} - for field_key in field_keys: - previous_sample_value = previous_sample[field_key] - if not isinstance(previous_sample_value, (int, float)): continue - current_sample_value = current_sample[field_key] - if not isinstance(current_sample_value, (int, float)): continue - delta_value = current_sample_value - previous_sample_value - if delta_value < 0: continue - delta_sample[field_key] = delta_value / delay - return delta_sample - -class SamplesCache: - def __init__(self, netconf_handler : NetconfSessionHandler) -> None: - self.__netconf_handler = netconf_handler - self.__lock = threading.Lock() - self.__timestamp = None - self.__absolute_samples = {} - self.__delta_samples = {} - - def _refresh_samples(self) -> None: - with self.__lock: - try: - now = datetime.timestamp(datetime.utcnow()) - if self.__timestamp is not None and (now - self.__timestamp) < SAMPLE_EVICTION_SECONDS: return - str_filter = get_filter(SAMPLE_RESOURCE_KEY) - xml_data = self.__netconf_handler.get(filter=str_filter).data_ele - interface_samples = parse(SAMPLE_RESOURCE_KEY, xml_data) - for interface,samples in interface_samples: - match = RE_GET_ENDPOINT_FROM_INTERFACE_KEY.match(interface) - if match is None: continue - interface = match.group(1) - delta_sample = compute_delta_sample( - self.__absolute_samples.get(interface), self.__timestamp, samples, now) - if delta_sample is not None: self.__delta_samples[interface] = delta_sample - self.__absolute_samples[interface] = samples - self.__timestamp = now - except: # pylint: disable=bare-except - LOGGER.exception('Error collecting samples') - - def get(self, resource_key : str) -> Tuple[float, Dict]: - self._refresh_samples() - match = RE_GET_ENDPOINT_FROM_INTERFACE_XPATH.match(resource_key) - with self.__lock: - if match is None: return self.__timestamp, {} - interface = match.group(1) - return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {})) - -def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples : queue.Queue) -> None: - try: - timestamp, samples = samples_cache.get(resource_key) - counter_name = resource_key.split('/')[-1].split(':')[-1] - value = samples.get(counter_name) - if value is None: - LOGGER.warning('[do_sampling] value not found for {:s}'.format(resource_key)) - return - # resource_key template: //oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s} - sample = (timestamp, resource_key, value) - out_samples.put_nowait(sample) - except: # pylint: disable=bare-except - LOGGER.exception('Error retrieving samples') -- GitLab