Skip to content
Snippets Groups Projects
Commit 0e8dc476 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Device component - OpenConfigDriver:

- Reorganized classes and imports
parent 363e8264
No related branches found
No related tags found
4 merge requests!142Release TeraFlowSDN 2.1,!132NetSoft Hackfest extensions, gNMI Driver, gNMI L3NM Service Handler, multiple fixes,!113Draft: NetSoft Hackfest extensions,!75Device component - OpenConfig gNMI Driver
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, threading
from typing import Any, List, Tuple
from ncclient.manager import Manager, connect_ssh
from common.tools.client.RetryDecorator import delay_exponential
from common.type_checkers.Checkers import chk_length, chk_string, chk_type
from device.service.driver_api.Exceptions import UnsupportedResourceKeyException
from .templates import EMPTY_CONFIG, compose_config
from .RetryDecorator import retry
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
LOGGER = logging.getLogger(__name__)
class NetconfSessionHandler:
def __init__(self, address : str, port : int, **settings) -> None:
self.__lock = threading.RLock()
self.__connected = threading.Event()
self.__address = address
self.__port = int(port)
self.__username = settings.get('username')
self.__password = settings.get('password')
self.__vendor = settings.get('vendor')
self.__key_filename = settings.get('key_filename')
self.__hostkey_verify = settings.get('hostkey_verify', True)
self.__look_for_keys = settings.get('look_for_keys', True)
self.__allow_agent = settings.get('allow_agent', True)
self.__force_running = settings.get('force_running', False)
self.__commit_per_delete = settings.get('delete_rule', False)
self.__device_params = settings.get('device_params', {})
self.__manager_params = settings.get('manager_params', {})
self.__nc_params = settings.get('nc_params', {})
self.__manager : Manager = None
self.__candidate_supported = False
def connect(self):
with self.__lock:
self.__manager = connect_ssh(
host=self.__address, port=self.__port, username=self.__username, password=self.__password,
device_params=self.__device_params, manager_params=self.__manager_params, nc_params=self.__nc_params,
key_filename=self.__key_filename, hostkey_verify=self.__hostkey_verify, allow_agent=self.__allow_agent,
look_for_keys=self.__look_for_keys)
self.__candidate_supported = ':candidate' in self.__manager.server_capabilities
self.__connected.set()
def disconnect(self):
if not self.__connected.is_set(): return
with self.__lock:
self.__manager.close_session()
@property
def use_candidate(self): return self.__candidate_supported and not self.__force_running
@property
def commit_per_rule(self): return self.__commit_per_delete
@property
def vendor(self): return self.__vendor
@RETRY_DECORATOR
def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin
with self.__lock:
return self.__manager.get(filter=filter, with_defaults=with_defaults)
@RETRY_DECORATOR
def edit_config(
self, config, target='running', default_operation=None, test_option=None,
error_option=None, format='xml' # pylint: disable=redefined-builtin
):
if config == EMPTY_CONFIG: return
with self.__lock:
self.__manager.edit_config(
config, target=target, default_operation=default_operation, test_option=test_option,
error_option=error_option, format=format)
def locked(self, target):
return self.__manager.locked(target=target)
def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None):
return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id)
def edit_config(
netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False,
target='running', default_operation='merge', test_option=None, error_option=None,
format='xml' # pylint: disable=redefined-builtin
):
str_method = 'DeleteConfig' if delete else 'SetConfig'
LOGGER.info('[{:s}] resources = {:s}'.format(str_method, str(resources)))
results = [None for _ in resources]
for i,resource in enumerate(resources):
str_resource_name = 'resources[#{:d}]'.format(i)
try:
LOGGER.info('[{:s}] resource = {:s}'.format(str_method, str(resource)))
chk_type(str_resource_name, resource, (list, tuple))
chk_length(str_resource_name, resource, min_length=2, max_length=2)
resource_key,resource_value = resource
chk_string(str_resource_name + '.key', resource_key, allow_empty=False)
str_config_message = compose_config(
resource_key, resource_value, delete=delete, vendor=netconf_handler.vendor)
if str_config_message is None: raise UnsupportedResourceKeyException(resource_key)
LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format(
str_method, len(str_config_message), str(str_config_message)))
netconf_handler.edit_config(
config=str_config_message, target=target, default_operation=default_operation,
test_option=test_option, error_option=error_option, format=format)
if commit_per_rule:
netconf_handler.commit()
results[i] = True
except Exception as e: # pylint: disable=broad-except
str_operation = 'preparing' if target == 'candidate' else ('deleting' if delete else 'setting')
msg = '[{:s}] Exception {:s} {:s}: {:s}'
LOGGER.exception(msg.format(str_method, str_operation, str_resource_name, str(resource)))
results[i] = e # if validation fails, store the exception
return results
......@@ -12,25 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import anytree, copy, logging, pytz, queue, re, threading
import anytree, logging, pytz, queue, threading
#import lxml.etree as ET
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from typing import Any, Iterator, List, Optional, Tuple, Union
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from ncclient.manager import Manager, connect_ssh
from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, metered_subclass_method, INF
from common.tools.client.RetryDecorator import delay_exponential
from common.type_checkers.Checkers import chk_length, chk_string, chk_type, chk_float
from device.service.driver_api.Exceptions import UnsupportedResourceKeyException
from device.service.driver_api._Driver import _Driver
from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_subnode_value #dump_subtree
from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_subnode_value
from .templates import ALL_RESOURCE_KEYS, get_filter, parse
from .NetconfSessionHandler import NetconfSessionHandler, edit_config
from .SamplesCache import SamplesCache, do_sampling #dump_subtree
#from .TelemetryProtocolEnum import TelemetryProtocolEnum, parse_telemetry_protocol
#from .Tools import xml_pretty_print, xml_to_dict, xml_to_file
from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse
from .RetryDecorator import retry
from .TelemetryProtocolEnum import DEFAULT_TELEMETRY_PROTOCOL, TelemetryProtocolEnum, parse_telemetry_protocol
DEBUG_MODE = False
logging.getLogger('ncclient.manager').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING)
......@@ -41,136 +39,6 @@ logging.getLogger('monitoring-client').setLevel(logging.INFO if DEBUG_MODE else
LOGGER = logging.getLogger(__name__)
RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*')
RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*")
# Collection of samples through NetConf is very slow and each request collects all the data.
# Populate a cache periodically (when first interface is interrogated).
# Evict data after some seconds, when data is considered as outdated
SAMPLE_EVICTION_SECONDS = 30.0 # seconds
SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters'
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class NetconfSessionHandler:
def __init__(self, address : str, port : int, **settings) -> None:
self.__lock = threading.RLock()
self.__connected = threading.Event()
self.__address = address
self.__port = int(port)
self.__username = settings.get('username')
self.__password = settings.get('password')
self.__vendor = settings.get('vendor')
self.__key_filename = settings.get('key_filename')
self.__hostkey_verify = settings.get('hostkey_verify', True)
self.__look_for_keys = settings.get('look_for_keys', True)
self.__allow_agent = settings.get('allow_agent', True)
self.__force_running = settings.get('force_running', False)
self.__commit_per_delete = settings.get('delete_rule', False)
self.__device_params = settings.get('device_params', {})
self.__manager_params = settings.get('manager_params', {})
self.__nc_params = settings.get('nc_params', {})
self.__manager : Manager = None
self.__candidate_supported = False
def connect(self):
with self.__lock:
self.__manager = connect_ssh(
host=self.__address, port=self.__port, username=self.__username, password=self.__password,
device_params=self.__device_params, manager_params=self.__manager_params, nc_params=self.__nc_params,
key_filename=self.__key_filename, hostkey_verify=self.__hostkey_verify, allow_agent=self.__allow_agent,
look_for_keys=self.__look_for_keys)
self.__candidate_supported = ':candidate' in self.__manager.server_capabilities
self.__connected.set()
def disconnect(self):
if not self.__connected.is_set(): return
with self.__lock:
self.__manager.close_session()
@property
def use_candidate(self): return self.__candidate_supported and not self.__force_running
@property
def commit_per_rule(self): return self.__commit_per_delete
@property
def vendor(self): return self.__vendor
@RETRY_DECORATOR
def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin
with self.__lock:
return self.__manager.get(filter=filter, with_defaults=with_defaults)
@RETRY_DECORATOR
def edit_config(
self, config, target='running', default_operation=None, test_option=None,
error_option=None, format='xml' # pylint: disable=redefined-builtin
):
if config == EMPTY_CONFIG: return
with self.__lock:
self.__manager.edit_config(
config, target=target, default_operation=default_operation, test_option=test_option,
error_option=error_option, format=format)
def locked(self, target):
return self.__manager.locked(target=target)
def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None):
return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id)
def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples : queue.Queue) -> None:
try:
timestamp, samples = samples_cache.get(resource_key)
counter_name = resource_key.split('/')[-1].split(':')[-1]
value = samples.get(counter_name)
if value is None:
LOGGER.warning('[do_sampling] value not found for {:s}'.format(resource_key))
return
# resource_key template: //oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s}
sample = (timestamp, resource_key, value)
out_samples.put_nowait(sample)
except: # pylint: disable=bare-except
LOGGER.exception('Error retrieving samples')
def edit_config(
netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False,
target='running', default_operation='merge', test_option=None, error_option=None,
format='xml' # pylint: disable=redefined-builtin
):
str_method = 'DeleteConfig' if delete else 'SetConfig'
LOGGER.info('[{:s}] resources = {:s}'.format(str_method, str(resources)))
results = [None for _ in resources]
for i,resource in enumerate(resources):
str_resource_name = 'resources[#{:d}]'.format(i)
try:
LOGGER.info('[{:s}] resource = {:s}'.format(str_method, str(resource)))
chk_type(str_resource_name, resource, (list, tuple))
chk_length(str_resource_name, resource, min_length=2, max_length=2)
resource_key,resource_value = resource
chk_string(str_resource_name + '.key', resource_key, allow_empty=False)
str_config_message = compose_config(
resource_key, resource_value, delete=delete, vendor=netconf_handler.vendor)
if str_config_message is None: raise UnsupportedResourceKeyException(resource_key)
LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format(
str_method, len(str_config_message), str(str_config_message)))
netconf_handler.edit_config(
config=str_config_message, target=target, default_operation=default_operation,
test_option=test_option, error_option=error_option, format=format)
if commit_per_rule:
netconf_handler.commit()
results[i] = True
except Exception as e: # pylint: disable=broad-except
str_operation = 'preparing' if target == 'candidate' else ('deleting' if delete else 'setting')
msg = '[{:s}] Exception {:s} {:s}: {:s}'
LOGGER.exception(msg.format(str_method, str_operation, str_resource_name, str(resource)))
results[i] = e # if validation fails, store the exception
return results
HISTOGRAM_BUCKETS = (
# .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF
0.0001, 0.00025, 0.00050, 0.00075,
......@@ -198,11 +66,11 @@ class OpenConfigDriver(_Driver):
self.__started = threading.Event()
self.__terminate = threading.Event()
self.__telemetry_protocol = parse_telemetry_protocol(settings.get('telemetry_protocol'))
if self.__telemetry_protocol == TelemetryProtocolEnum.GNMI:
self.__telemetry = GnmiTelemetry()
else:
self.__telemetry = NetConfTelemetry()
#self.__telemetry_protocol = parse_telemetry_protocol(settings.get('telemetry_protocol'))
#if self.__telemetry_protocol == TelemetryProtocolEnum.GNMI:
# self.__telemetry = GnmiTelemetry()
#else:
# self.__telemetry = NetConfTelemetry()
self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
self.__scheduler.configure(
......@@ -268,7 +136,8 @@ class OpenConfigDriver(_Driver):
if self.__netconf_handler.use_candidate:
with self.__netconf_handler.locked(target='candidate'):
if self.__netconf_handler.commit_per_rule:
results = edit_config(self.__netconf_handler, resources, target='candidate', commit_per_rule= True)
results = edit_config(
self.__netconf_handler, resources, target='candidate', commit_per_rule=True)
else:
results = edit_config(self.__netconf_handler, resources, target='candidate')
try:
......@@ -288,13 +157,15 @@ class OpenConfigDriver(_Driver):
if self.__netconf_handler.use_candidate:
with self.__netconf_handler.locked(target='candidate'):
if self.__netconf_handler.commit_per_rule:
results = edit_config(self.__netconf_handler, resources, target='candidate', delete=True, commit_per_rule= True)
results = edit_config(
self.__netconf_handler, resources, target='candidate', delete=True, commit_per_rule=True)
else:
results = edit_config(self.__netconf_handler, resources, target='candidate', delete=True)
try:
self.__netconf_handler.commit()
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('[DeleteConfig] Exception commiting resources: {:s}'.format(str(resources)))
MSG = '[DeleteConfig] Exception commiting resources: {:s}'
LOGGER.exception(MSG.format(str(resources)))
results = [e for _ in resources] # if commit fails, set exception in each resource
else:
results = edit_config(self.__netconf_handler, resources, delete=True)
......
......@@ -12,6 +12,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Collection of samples through NetConf is very slow and each request collects all the data.
# Populate a cache periodically (when first interface is interrogated).
# Evict data after some seconds, when data is considered as outdated
import copy, queue, logging, re, threading
from datetime import datetime
from typing import Dict, Tuple
from .templates import get_filter, parse
from .NetconfSessionHandler import NetconfSessionHandler
SAMPLE_EVICTION_SECONDS = 30.0 # seconds
SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters'
RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*')
RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*")
LOGGER = logging.getLogger(__name__)
def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp):
if previous_sample is None: return None
if previous_timestamp is None: return None
......@@ -66,3 +84,17 @@ class SamplesCache:
if match is None: return self.__timestamp, {}
interface = match.group(1)
return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {}))
def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples : queue.Queue) -> None:
try:
timestamp, samples = samples_cache.get(resource_key)
counter_name = resource_key.split('/')[-1].split(':')[-1]
value = samples.get(counter_name)
if value is None:
LOGGER.warning('[do_sampling] value not found for {:s}'.format(resource_key))
return
# resource_key template: //oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s}
sample = (timestamp, resource_key, value)
out_samples.put_nowait(sample)
except: # pylint: disable=bare-except
LOGGER.exception('Error retrieving samples')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment