Skip to content
Snippets Groups Projects
Select Git revision
  • dcb44d803eb0260d16e96938c2096c23079b0f3e
  • master default protected
  • release/6.0.0 protected
  • develop protected
  • feat/345-cttc-fix-ci-cd-and-unit-tests-for-dscm-pluggables
  • feat/349-new-monitoring-updates-for-optical-controller-integration
  • cnit_ofc26
  • ofc_polimi
  • CTTC-IMPLEMENT-NBI-CONNECTOR-NOS-ZTP
  • CTTC-TEST-SMARTNICS-6GMICROSDN-ZTP
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit_tapi
  • feat/330-tid-pcep-component
  • feat/tid-newer-pcep-component
  • feat/116-ubi-updates-in-telemetry-backend-to-support-p4-in-band-network-telemetry
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • feat/321-add-support-for-gnmi-configuration-via-proto
  • v6.0.0 protected
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

OpenConfigDriver.py

Blame
  • user avatar
    PabloArmingolRobles authored
    - Implemented support for ACLs
    - Added support for Candidate datastore
    dcb44d80
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    OpenConfigDriver.py 20.63 KiB
    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import anytree, copy, logging, pytz, queue, re, threading
    #import lxml.etree as ET
    from datetime import datetime, timedelta
    from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
    from apscheduler.executors.pool import ThreadPoolExecutor
    from apscheduler.job import Job
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.schedulers.background import BackgroundScheduler
    from netconf_client.connect import connect_ssh, Session
    from netconf_client.ncclient import Manager
    from common.tools.client.RetryDecorator import delay_exponential
    from common.type_checkers.Checkers import chk_length, chk_string, chk_type, chk_float
    from device.service.driver_api.Exceptions import UnsupportedResourceKeyException
    from device.service.driver_api._Driver import _Driver
    from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_subnode_value #dump_subtree
    #from .Tools import xml_pretty_print, xml_to_dict, xml_to_file
    from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse
    from .RetryDecorator import retry
    
    
    from ncclient import manager
    
    
    # When True, third-party loggers below are set to INFO (ERROR otherwise) and the
    # NETCONF manager logger is set to DEBUG (WARNING otherwise).
    DEBUG_MODE = True
    #logging.getLogger('ncclient.transport.ssh').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING)
    for _noisy_logger_name in ('apscheduler.executors.default', 'apscheduler.scheduler', 'monitoring-client'):
        logging.getLogger(_noisy_logger_name).setLevel(logging.INFO if DEBUG_MODE else logging.ERROR)
    del _noisy_logger_name # do not leave the loop variable behind as a module attribute

    LOGGER = logging.getLogger(__name__)

    # Extract the interface name from a parsed resource key ("...interface[<name>]...")
    RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*')
    # Extract the interface name from a subscription XPath ("...interface[oci:name='<name>']...")
    RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*")

    # Collecting samples through NETCONF is very slow, and every request retrieves all the data.
    # Therefore, a cache is populated when an interface is interrogated and reused by subsequent
    # requests; cached data is considered outdated (and refreshed) after some seconds.

    SAMPLE_EVICTION_SECONDS = 30.0 # seconds
    SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters'

    # Retry policy applied to NETCONF operations; 'connect' is invoked to prepare each retry.
    MAX_RETRIES = 15
    DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
    RETRY_DECORATOR = retry(
        max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
    
    class NetconfSessionHandler:
        """Wrapper around a NETCONF-over-SSH session whose operations are serialized by a lock.

        The `get` and `edit_config` operations are wrapped with RETRY_DECORATOR.
        """

        def __init__(self, address : str, port : int, **settings) -> None:
            """Store the connection parameters; no connection is opened until `connect()`.

            Recognized settings: 'username', 'password' and 'timeout' (defaults to 120).
            """
            # Synchronization primitives
            self.__lock = threading.RLock()
            self.__connected = threading.Event()
            # Connection parameters
            self.__address = address
            self.__port = int(port)
            self.__username = settings.get('username')
            self.__password = settings.get('password')
            self.__timeout = int(settings.get('timeout', 120))
            # Session objects, populated by connect()
            self.__netconf_session : Session = None
            self.__netconf_manager : Manager = None

        def connect(self):
            """Open the SSH session, build the NETCONF manager on top of it, and flag as connected."""
            with self.__lock:
                session = connect_ssh(
                    host=self.__address,
                    port=self.__port,
                    username=self.__username,
                    password=self.__password,
                )
                self.__netconf_session = session
                self.__netconf_manager = Manager(session, timeout=self.__timeout)
                manager_log_level = logging.DEBUG if DEBUG_MODE else logging.WARNING
                self.__netconf_manager.set_logger_level(manager_log_level)
                self.__connected.set()

        def disconnect(self):
            """Close the NETCONF session; does nothing when `connect()` never succeeded."""
            if self.__connected.is_set():
                with self.__lock:
                    self.__netconf_manager.close_session()

        @RETRY_DECORATOR
        def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin
            """Run a NETCONF <get> with the given filter and return the manager's reply."""
            with self.__lock:
                reply = self.__netconf_manager.get(filter=filter, with_defaults=with_defaults)
                return reply

        @RETRY_DECORATOR
        def edit_config(
            self, config, target='running', default_operation=None, test_option=None,
            error_option=None, format='xml'                                             # pylint: disable=redefined-builtin
        ):
            """Run a NETCONF <edit-config>; an EMPTY_CONFIG payload is skipped without contacting the device."""
            nothing_to_apply = (config == EMPTY_CONFIG)
            if nothing_to_apply:
                return
            with self.__lock:
                self.__netconf_manager.edit_config(
                    config,
                    target=target,
                    default_operation=default_operation,
                    test_option=test_option,
                    error_option=error_option,
                    format=format,
                )
    
    def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp):
        """Compute per-second rates between two samples of the same interface.

        Parameters:
            previous_sample: dict with the older sample; must contain the key 'name'.
            previous_timestamp: time (in seconds) at which `previous_sample` was taken.
            current_sample: dict with the newer sample.
            current_timestamp: time (in seconds) at which `current_sample` was taken.

        Returns:
            A dict containing 'name' (taken from `previous_sample`) and, for every field that is
            numeric in both samples and did not decrease, its increment divided by the elapsed time.
            None when any argument is None or when the elapsed time is not strictly positive.
        """
        if previous_sample is None or previous_timestamp is None: return None
        if current_sample is None or current_timestamp is None: return None

        delay = current_timestamp - previous_timestamp
        # A zero/negative interval cannot produce a meaningful rate (and would divide by zero)
        if delay <= 0: return None

        delta_sample = {'name': previous_sample['name']}
        for field_key, current_sample_value in current_sample.items():
            if field_key == 'name': continue
            # A rate can only be computed for fields reported in both samples
            if field_key not in previous_sample: continue
            previous_sample_value = previous_sample[field_key]
            if not isinstance(previous_sample_value, (int, float)): continue
            if not isinstance(current_sample_value, (int, float)): continue
            delta_value = current_sample_value - previous_sample_value
            # A decreasing value is skipped rather than reported as a negative rate
            if delta_value < 0: continue
            delta_sample[field_key] = delta_value / delay
        return delta_sample
    
    class SamplesCache:
        """Cache of interface counters collected through NETCONF.

        A single request retrieves the counters of all interfaces; the cache is refreshed at most
        once every SAMPLE_EVICTION_SECONDS and exposes per-second rates computed between the two
        most recent collections.
        """

        def __init__(self, netconf_handler : NetconfSessionHandler) -> None:
            self.__netconf_handler = netconf_handler
            self.__lock = threading.Lock()
            self.__timestamp = None         # epoch seconds of the last successful collection
            self.__absolute_samples = {}    # interface name => last raw counters
            self.__delta_samples = {}       # interface name => last computed rates

        def _refresh_samples(self) -> None:
            """Collect fresh counters when the cached ones are outdated.

            Errors are logged and swallowed; in that case the cache is left untouched.
            """
            with self.__lock:
                try:
                    # Use a timezone-aware datetime: calling timestamp() on the naive value returned
                    # by utcnow() interprets it as local time and shifts the epoch on non-UTC hosts.
                    now = datetime.now(pytz.utc).timestamp()
                    if self.__timestamp is not None and (now - self.__timestamp) < SAMPLE_EVICTION_SECONDS: return
                    str_filter = get_filter(SAMPLE_RESOURCE_KEY)
                    xml_data = self.__netconf_handler.get(filter=str_filter).data_ele
                    interface_samples = parse(SAMPLE_RESOURCE_KEY, xml_data)

                    # Stage the new values and commit them together with the timestamp, so that a
                    # failure half-way does not leave counters paired with an outdated timestamp.
                    new_absolute_samples = {}
                    new_delta_samples = {}
                    for interface_key,samples in interface_samples:
                        match = RE_GET_ENDPOINT_FROM_INTERFACE_KEY.match(interface_key)
                        if match is None: continue
                        interface = match.group(1)
                        delta_sample = compute_delta_sample(
                            self.__absolute_samples.get(interface), self.__timestamp, samples, now)
                        if delta_sample is not None: new_delta_samples[interface] = delta_sample
                        new_absolute_samples[interface] = samples

                    self.__absolute_samples.update(new_absolute_samples)
                    self.__delta_samples.update(new_delta_samples)
                    self.__timestamp = now
                except Exception: # pylint: disable=broad-except
                    # Best effort: keep serving the previous data; let SystemExit/KeyboardInterrupt through
                    LOGGER.exception('Error collecting samples')

        def get(self, resource_key : str) -> Tuple[Optional[float], Dict]:
            """Return the last collection timestamp and the rates of the interface named in `resource_key`.

            The timestamp is None until a collection succeeds. The dict is empty when the resource key
            does not identify an interface or when no rates are available for it yet.
            """
            self._refresh_samples()
            match = RE_GET_ENDPOINT_FROM_INTERFACE_XPATH.match(resource_key)
            with self.__lock:
                if match is None: return self.__timestamp, {}
                interface = match.group(1)
                # Return a copy so that callers cannot alter the cached rates
                return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {}))
    
    def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples : queue.Queue) -> None:
        """Scheduler job: read one counter from the cache and enqueue it as a sample.

        The counter name is the last path segment of `resource_key` with any 'prefix:' removed.
        When found, the tuple (timestamp, resource_key, value) is put into `out_samples`; otherwise
        a warning is logged. Any error is logged and never propagated.
        """
        try:
            timestamp, samples = samples_cache.get(resource_key)
            last_segment = resource_key.rsplit('/', 1)[-1]
            counter_name = last_segment.rsplit(':', 1)[-1]
            value = samples.get(counter_name)
            if value is not None:
                out_samples.put_nowait((timestamp, resource_key, value))
            else:
                LOGGER.warning('[do_sampling] value not found for {:s}'.format(resource_key))
        except: # pylint: disable=bare-except
            LOGGER.exception('Error retrieving samples')
    
    class OpenConfigDriver(_Driver):
        """Device driver that reads and configures a device through NETCONF.

        - Configuration is read through a NetconfSessionHandler.
        - Configuration changes are written to the candidate datastore and committed, using a
          dedicated ncclient session per change.
        - State subscriptions are emulated with scheduled jobs that poll a SamplesCache and push
          the samples into a queue consumed by `GetState()`.
        """

        def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called
            """Prepare the driver; no connection is opened until `Connect()`.

            Settings used by this class: 'username', 'password', and optionally 'hostkey_verify'
            (default False), 'look_for_keys' (default False), 'allow_agent' (default False) and
            'device_params' (default {'name': 'huaweiyang'}). All settings are also forwarded to
            the NetconfSessionHandler.
            """
            self.__lock = threading.Lock()
            #self.__initial = TreeNode('.')
            #self.__running = TreeNode('.')
            self.__subscriptions = TreeNode('.')
            self.__started = threading.Event()
            self.__terminate = threading.Event()
            self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
            self.__scheduler.configure(
                jobstores = {'default': MemoryJobStore()},
                executors = {'default': ThreadPoolExecutor(max_workers=1)},
                job_defaults = {'coalesce': False, 'max_instances': 3},
                timezone=pytz.utc)
            self.__out_samples = queue.Queue()
            self.__netconf_handler : NetconfSessionHandler = NetconfSessionHandler(address, port, **settings)
            self.__samples_cache = SamplesCache(self.__netconf_handler)

            # Parameters of the sessions opened to write into the candidate datastore
            self.__address = address
            self.__port = int(port)
            self.__username = settings.get('username')
            self.__password = settings.get('password')
            # NOTE(review): host key verification is disabled by default to preserve the previous
            # behavior; enable 'hostkey_verify' in the settings for production deployments.
            self.__hostkey_verify = settings.get('hostkey_verify', False)
            self.__look_for_keys = settings.get('look_for_keys', False)
            self.__allow_agent = settings.get('allow_agent', False)
            self.__device_params = settings.get('device_params', {'name': 'huaweiyang'})

        def Connect(self) -> bool:
            """Open the NETCONF session and start the sampling scheduler; no-op if already started."""
            with self.__lock:
                if self.__started.is_set(): return True
                self.__netconf_handler.connect()
                # Connect triggers activation of sampling events that will be scheduled based on subscriptions
                self.__scheduler.start()
                self.__started.set()
                return True

        def Disconnect(self) -> bool:
            """Signal termination, then stop the scheduler and close the NETCONF session if started."""
            with self.__lock:
                # Trigger termination of loops and processes
                self.__terminate.set()
                # If not started, assume it is already disconnected
                if not self.__started.is_set(): return True
                # Disconnect triggers deactivation of sampling events
                self.__scheduler.shutdown()
                self.__netconf_handler.disconnect()
                return True

        def GetInitialConfig(self) -> List[Tuple[str, Any]]:
            """Return the initial configuration; this driver reports none."""
            with self.__lock:
                return []

        def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
            """Retrieve the configuration identified by `resource_keys` (all known keys when empty).

            Returns the parsed (key, value) entries; when a key fails, the tuple (resource_key,
            exception) is returned in its place instead of raising.

            The mutable default is kept for interface compatibility; it is never mutated.
            """
            chk_type('resources', resource_keys, list)
            results = []
            with self.__lock:
                if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS
                for i,resource_key in enumerate(resource_keys):
                    str_resource_name = 'resource_key[#{:d}]'.format(i)
                    try:
                        chk_string(str_resource_name, resource_key, allow_empty=False)
                        str_filter = get_filter(resource_key)
                        LOGGER.info('[GetConfig] str_filter = {:s}'.format(str(str_filter)))
                        # Keys without a predefined filter are used as the filter themselves
                        if str_filter is None: str_filter = resource_key
                        xml_data = self.__netconf_handler.get(filter=str_filter).data_ele
                        if isinstance(xml_data, Exception): raise xml_data
                        results.extend(parse(resource_key, xml_data))
                    except Exception as e: # pylint: disable=broad-except
                        LOGGER.exception('Exception retrieving {:s}: {:s}'.format(str_resource_name, str(resource_key)))
                        results.append((resource_key, e)) # if validation fails, store the exception
            return results

        def _commit_to_candidate(self, str_config_message : str) -> None:
            """Merge a configuration message into the candidate datastore and commit it.

            A dedicated ncclient session is opened with the address, port and credentials given to
            the driver. Raises RuntimeError when the device does not advertise the candidate capability.
            """
            # Nothing to send; same shortcut as NetconfSessionHandler.edit_config
            if str_config_message == EMPTY_CONFIG: return
            with manager.connect(
                host=self.__address, port=self.__port, username=self.__username, password=self.__password,
                hostkey_verify=self.__hostkey_verify, device_params=self.__device_params,
                look_for_keys=self.__look_for_keys, allow_agent=self.__allow_agent
            ) as m:
                # Explicit check instead of assert: asserts are removed when running with -O
                if ':candidate' not in m.server_capabilities:
                    raise RuntimeError('Device does not support the candidate datastore')
                with m.locked(target='candidate'):
                    m.edit_config(config=str_config_message, default_operation='merge', target='candidate')
                    m.commit()

        def _apply_resources(self, resources : List[Tuple[str, Any]], delete : bool) -> List[Union[bool, Exception]]:
            """Validate, compose and commit each (key, value) resource; shared by SetConfig/DeleteConfig.

            Returns one entry per resource: True on success, or the exception that prevented it.
            """
            method_name = 'DeleteConfig' if delete else 'SetConfig'
            action = 'deleting' if delete else 'setting'
            chk_type('resources', resources, list)
            if len(resources) == 0: return []
            results = []
            LOGGER.info('[{:s}] resources = {:s}'.format(method_name, str(resources)))
            with self.__lock:
                for i,resource in enumerate(resources):
                    str_resource_name = 'resources[#{:d}]'.format(i)
                    try:
                        LOGGER.info('[{:s}] resource = {:s}'.format(method_name, str(resource)))
                        chk_type(str_resource_name, resource, (list, tuple))
                        chk_length(str_resource_name, resource, min_length=2, max_length=2)
                        resource_key,resource_value = resource
                        chk_string(str_resource_name + '.key', resource_key, allow_empty=False)
                        if delete:
                            str_config_message = compose_config(resource_key, resource_value, delete=True)
                        else:
                            str_config_message = compose_config(resource_key, resource_value)
                        if str_config_message is None: raise UnsupportedResourceKeyException(resource_key)
                        LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format(
                            method_name, len(str_config_message), str(str_config_message)))
                        self._commit_to_candidate(str_config_message)
                        results.append(True)
                    except Exception as e: # pylint: disable=broad-except
                        # Log the whole resource: the key may not have been extracted when validation fails
                        LOGGER.exception('Exception {:s} {:s}: {:s}'.format(action, str_resource_name, str(resource)))
                        results.append(e) # if validation fails, store the exception
            return results

        def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
            """Apply the given (key, value) resources; returns True or the exception for each one."""
            return self._apply_resources(resources, delete=False)

        def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
            """Remove the given (key, value) resources; returns True or the exception for each one."""
            return self._apply_resources(resources, delete=True)

        def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
            """Schedule periodic sampling for each (resource_key, sampling_duration, sampling_interval).

            A duration below 1e-12 means the job has no start/end date. Returns True for each scheduled
            subscription, or the validation exception for malformed ones.
            """
            chk_type('subscriptions', subscriptions, list)
            if len(subscriptions) == 0: return []
            results = []
            resolver = anytree.Resolver(pathattr='name')
            with self.__lock:
                for i,subscription in enumerate(subscriptions):
                    str_subscription_name = 'subscriptions[#{:d}]'.format(i)
                    try:
                        chk_type(str_subscription_name, subscription, (list, tuple))
                        chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
                        resource_key,sampling_duration,sampling_interval = subscription
                        chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
                        resource_path = resource_key.split('/')
                        chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                        chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
                    except Exception as e: # pylint: disable=broad-except
                        # Log the whole subscription: resource_key is unbound when unpacking did not happen
                        LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(subscription)))
                        results.append(e) # if validation fails, store the exception
                        continue

                    start_date,end_date = None,None
                    if sampling_duration >= 1.e-12:
                        start_date = datetime.utcnow()
                        end_date = start_date + timedelta(seconds=sampling_duration)

                    job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
                    job = self.__scheduler.add_job(
                        do_sampling, args=(self.__samples_cache, resource_key, self.__out_samples),
                        kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval,
                        start_date=start_date, end_date=end_date, timezone=pytz.utc)

                    # The job is stored in the subscriptions tree so that it can be found when unsubscribing
                    subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                    set_subnode_value(resolver, self.__subscriptions, subscription_path, job)
                    results.append(True)
            return results

        def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
            """Cancel the sampling jobs of the given (resource_key, sampling_duration, sampling_interval).

            Returns, for each subscription, True when removed, False when not found, or the validation
            exception for malformed ones. Raises Exception when the subscription node does not hold a job.
            """
            chk_type('subscriptions', subscriptions, list)
            if len(subscriptions) == 0: return []
            results = []
            resolver = anytree.Resolver(pathattr='name')
            with self.__lock:
                for i,resource in enumerate(subscriptions):
                    str_subscription_name = 'resources[#{:d}]'.format(i)
                    try:
                        chk_type(str_subscription_name, resource, (list, tuple))
                        chk_length(str_subscription_name, resource, min_length=3, max_length=3)
                        resource_key,sampling_duration,sampling_interval = resource
                        chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
                        resource_path = resource_key.split('/')
                        chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                        chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
                    except Exception as e: # pylint: disable=broad-except
                        # Log the whole subscription: resource_key is unbound when unpacking did not happen
                        LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource)))
                        results.append(e) # if validation fails, store the exception
                        continue

                    subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                    subscription_node = get_subnode(resolver, self.__subscriptions, subscription_path)

                    # if not found, resource_node is None
                    if subscription_node is None:
                        results.append(False)
                        continue

                    job : Job = getattr(subscription_node, 'value', None)
                    if job is None or not isinstance(job, Job):
                        raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource)))
                    job.remove()

                    # Detach the node from the subscriptions tree
                    parent = subscription_node.parent
                    children = list(parent.children)
                    children.remove(subscription_node)
                    parent.children = tuple(children)

                    results.append(True)
            return results

        def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]:
            """Yield the collected samples.

            When `blocking` is False the iterator ends as soon as the queue is empty; when True it keeps
            waiting (polling every 0.1 seconds) until the driver is disconnected or `terminate` is set.
            """
            while True:
                if self.__terminate.is_set(): break
                if terminate is not None and terminate.is_set(): break
                try:
                    sample = self.__out_samples.get(block=blocking, timeout=0.1)
                except queue.Empty:
                    if blocking: continue
                    return
                if sample is None: continue
                yield sample