# OpenConfigDriver.py
import anytree, logging, pytz, queue, threading
from anytree.importer import DictImporter
#from datetime import datetime, timedelta
import lxml.etree as ET
from typing import Any, Iterator, List, Optional, Tuple, Union
#from apscheduler.executors.pool import ThreadPoolExecutor
#from apscheduler.job import Job
#from apscheduler.jobstores.memory import MemoryJobStore
#from apscheduler.schedulers.background import BackgroundScheduler
from netconf_client.connect import connect_ssh
from netconf_client.ncclient import Manager
from common.type_checkers.Checkers import chk_float, chk_integer, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
from device.service.drivers.openconfig.Tools import xml_pretty_print, xml_to_dict, xml_to_file
from device.service.drivers.openconfig.templates import ALL_RESOURCES, get_filter, extract_data

# Raise the threshold of the SSH transport logger so only WARNING and above get through.
# NOTE(review): this file imports `netconf_client`, while the logger silenced here is named
# after `ncclient` -- confirm this is still the logger that produces the unwanted output.
_ssh_transport_logger = logging.getLogger('ncclient.transport.ssh')
_ssh_transport_logger.setLevel(logging.WARNING)

# Module-level logger used by the driver below.
LOGGER = logging.getLogger(__name__)

#def do_sampling(resource_key : str, out_samples : queue.Queue):
#    out_samples.put_nowait((datetime.timestamp(datetime.utcnow()), resource_key, random.random()))

class OpenConfigDriver(_Driver):
    """Device driver that reads configuration through a NETCONF session over SSH.

    The session is opened with `connect_ssh` and wrapped in a `Manager`; both come from the
    `netconf_client` package. Every public method takes an internal lock while it touches the
    session, so calls on one instance are serialized.
    """

    def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called
        """Store the connection parameters; no connection is opened here.

        Args:
            address: host passed to `connect_ssh` when `Connect` is called.
            port: port passed to `connect_ssh`; coerced with `int()`.
            **settings: free-form options. The keys read by this class are 'username',
                'password' and 'timeout'.
        """
        self.__address = address
        self.__port = int(port)
        self.__settings = settings
        self.__lock = threading.Lock()
        #self.__initial = TreeNode('.')
        #self.__running = TreeNode('.')
        self.__started = threading.Event()      # set while a session is considered open
        self.__terminate = threading.Event()    # set by Disconnect to stop loops/processes
        self.__netconf_manager : Optional[Manager] = None
        #self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
        #self.__scheduler.configure(
        #    jobstores = {'default': MemoryJobStore()},
        #    executors = {'default': ThreadPoolExecutor(max_workers=1)},
        #    job_defaults = {'coalesce': False, 'max_instances': 3},
        #    timezone=pytz.utc)
        #self.__out_samples = queue.Queue()

    def Connect(self) -> bool:
        """Open the NETCONF session if it is not already open.

        Reads 'username', 'password' and 'timeout' (default 120) from the settings given to the
        constructor. Calling it while already connected is a no-op.

        Returns:
            Always True; connection failures propagate as exceptions raised by `connect_ssh` or
            `Manager`.
        """
        with self.__lock:
            if self.__started.is_set(): return True
            username = self.__settings.get('username')
            password = self.__settings.get('password')
            timeout = int(self.__settings.get('timeout', 120))
            session = connect_ssh(
                host=self.__address, port=self.__port, username=username, password=password)
            self.__netconf_manager = Manager(session, timeout=timeout)
            # A previous Disconnect leaves the terminate flag set; reset it so a fresh session
            # does not start in "terminating" state.
            self.__terminate.clear()
            # Connect triggers activation of sampling events that will be scheduled based on subscriptions
            #self.__scheduler.start()
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        """Close the NETCONF session if one is open.

        The terminate flag is raised even when no session was started. After a successful close
        the driver is marked as not started and the manager reference is dropped, so a repeated
        Disconnect does not close the same session twice and a later Connect opens a new one.

        Returns:
            Always True; errors from `close_session` propagate to the caller.
        """
        with self.__lock:
            # Trigger termination of loops and processes
            self.__terminate.set()
            # If not started, assume it is already disconnected
            if not self.__started.is_set(): return True
            # Disconnect triggers deactivation of sampling events
            #self.__scheduler.shutdown()
            self.__netconf_manager.close_session()
            # Forget the closed session so the "started" flag reflects the real state.
            self.__netconf_manager = None
            self.__started.clear()
            return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Return the initial configuration; this driver currently reports none (empty list)."""
        with self.__lock:
            return []

    def GetConfig(
        self, resource_keys : Optional[List[str]] = None
    ) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Retrieve the requested resources from the device.

        Args:
            resource_keys: keys of the resources to retrieve. None or an empty list means
                "everything listed in ALL_RESOURCES". Each key must be a non-empty string.

        Returns:
            A list accumulating, per resource key, whatever `extract_data` yields for the reply
            (presumably (key, value) pairs -- confirm against the templates module). When a key
            fails, a single `(resource_key, exception)` entry is appended instead and processing
            continues with the next key.

        Raises:
            Whatever `chk_type` / `chk_string` raise when `resource_keys` is not a list of
            non-empty strings; validation happens before any request is sent.
        """
        # None replaces the former mutable `[]` default; both mean "no explicit selection".
        if resource_keys is None: resource_keys = []
        chk_type('resources', resource_keys, list)
        for i,resource_key in enumerate(resource_keys):
            chk_string('resource_key[#{:d}]'.format(i), resource_key, allow_empty=False)

        results = []
        with self.__lock:
            if not resource_keys: resource_keys = ALL_RESOURCES
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    # Keys without a registered filter template are sent verbatim as the filter.
                    str_filter = get_filter(resource_key)
                    if str_filter is None: str_filter = resource_key
                    xml_data = self.__netconf_manager.get(filter=str_filter).data_ele
                    if isinstance(xml_data, Exception): raise xml_data
                    results.extend(extract_data(resource_key, xml_data))
                except Exception as e: # pylint: disable=broad-except
                    # Best effort per key: log with traceback, report the error in the results.
                    LOGGER.exception('Exception retrieving %s: %s', str_resource_name, str(resource_key))
                    results.append((resource_key, e)) # if retrieval fails, store the exception
                    continue
        return results

#    def GetResource(self, endpoint_uuid : str) -> Optional[str]:
#        chk_string('endpoint_uuid', endpoint_uuid)
#        return {
#            #'key': 'value',
#        }.get(endpoint_uuid)
#
#    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
#        chk_type('resources', resources, list)
#        if len(resources) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource in enumerate(resources):
#                str_resource_name = 'resources[#{:d}]'.format(i)
#                try:
#                    chk_type(str_resource_name, resource, (list, tuple))
#                    chk_length(str_resource_name, resource, min_length=2, max_length=2)
#                    resource_key,resource_value = resource
#                    resource_path = resource_key.split('/')
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                set_subnode_value(resolver, self.__running, resource_path, resource_value)
#                results.append(True)
#        return results
#
#    def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
#        chk_type('resources', resource_keys, list)
#        if len(resource_keys) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource_key in enumerate(resource_keys):
#                str_resource_name = 'resource_key[#{:d}]'.format(i)
#                try:
#                    chk_string(str_resource_name, resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
#                # if not found, resource_node is None
#                if resource_node is None:
#                    results.append(False)
#                    continue
#
#                parent = resource_node.parent
#                children = list(parent.children)
#                children.remove(resource_node)
#                parent.children = tuple(children)
#                results.append(True)
#        return results
#
#    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
#        chk_type('subscriptions', subscriptions, list)
#        if len(subscriptions) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,subscription in enumerate(subscriptions):
#                str_subscription_name = 'subscriptions[#{:d}]'.format(i)
#                try:
#                    chk_type(str_subscription_name, subscription, (list, tuple))
#                    chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
#                    resource_key,sampling_duration,sampling_interval = subscription
#                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
#                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                start_date,end_date = None,None
#                if sampling_duration <= 1.e-12:
#                    start_date = datetime.utcnow()
#                    end_date = start_date + timedelta(seconds=sampling_duration)
#
#                job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
#                job = self.__scheduler.add_job(
#                    do_sampling, args=(resource_key, self.__out_samples), kwargs={},
#                    id=job_id, trigger='interval', seconds=sampling_interval,
#                    start_date=start_date, end_date=end_date, timezone=pytz.utc)
#
#                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
#                set_subnode_value(resolver, self.__running, subscription_path, job)
#                results.append(True)
#        return results
#
#    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
#        chk_type('subscriptions', subscriptions, list)
#        if len(subscriptions) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource in enumerate(subscriptions):
#                str_subscription_name = 'resources[#{:d}]'.format(i)
#                try:
#                    chk_type(str_subscription_name, resource, (list, tuple))
#                    chk_length(str_subscription_name, resource, min_length=3, max_length=3)
#                    resource_key,sampling_duration,sampling_interval = resource
#                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
#                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
#                subscription_node = get_subnode(resolver, self.__running, subscription_path)
#
#                # if not found, resource_node is None
#                if subscription_node is None:
#                    results.append(False)
#                    continue
#
#                job : Job = getattr(subscription_node, 'value', None)
#                if job is None or not isinstance(job, Job):
#                    raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource)))
#                job.remove()
#
#                parent = subscription_node.parent
#                children = list(parent.children)
#                children.remove(subscription_node)
#                parent.children = tuple(children)
#
#                results.append(True)
#        return results
#
#    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
#        while not self.__terminate.is_set():
#            try:
#                sample = self.__out_samples.get(block=blocking, timeout=0.1)
#            except queue.Empty:
#                if blocking: continue
#                return
#            if sample is None: continue
#            yield sample