Skip to content
Snippets Groups Projects
OpenConfigDriver.py 12.4 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import logging, ncclient.manager, pytz, queue, threading
import xml.dom.minidom
from typing import Any, Iterator, List, Optional, Tuple, Union
#import anytree, random
#from datetime import datetime, timedelta
#from apscheduler.executors.pool import ThreadPoolExecutor
#from apscheduler.job import Job
#from apscheduler.jobstores.memory import MemoryJobStore
#from apscheduler.schedulers.background import BackgroundScheduler
from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.drivers.openconfig.handlers import DEFAULT_HANDLER, HANDLERS
#from .AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value

logging.getLogger('ncclient.transport.ssh').setLevel(logging.WARNING)

LOGGER = logging.getLogger(__name__)

#def do_sampling(resource_key : str, out_samples : queue.Queue):
#    out_samples.put_nowait((datetime.timestamp(datetime.utcnow()), resource_key, random.random()))

class OpenConfigDriver(_Driver):
    def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called
        self.__address = address
        self.__port = port
        self.__settings = settings
        self.__lock = threading.Lock()
        #self.__initial = TreeNode('.')
        #self.__running = TreeNode('.')
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__netconf_manager : ncclient.manager.Manager = None
        #self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
        #self.__scheduler.configure(
        #    jobstores = {'default': MemoryJobStore()},
        #    executors = {'default': ThreadPoolExecutor(max_workers=1)},
        #    job_defaults = {'coalesce': False, 'max_instances': 3},
        #    timezone=pytz.utc)
        #self.__out_samples = queue.Queue()

    def Connect(self) -> bool:
        with self.__lock:
            if self.__started.is_set(): return True
            username = self.__settings.get('username')
            password = self.__settings.get('password')
            handler_name = self.__settings.get('handler')
            handler = HANDLERS.get(handler_name, DEFAULT_HANDLER)
            self.__netconf_manager = ncclient.manager.connect_ssh(
                host=self.__address, port=self.__port, username=username, password=password, hostkey_verify=False,
                device_params=handler)
            # Connect triggers activation of sampling events that will be scheduled based on subscriptions
            #self.__scheduler.start()
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        with self.__lock:
            # Trigger termination of loops and processes
            self.__terminate.set()
            # If not started, assume it is already disconnected
            if not self.__started.is_set(): return True
            # Disconnect triggers deactivation of sampling events
            #self.__scheduler.shutdown()
            self.__netconf_manager.close_session()
            return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        with self.__lock:
            return []

    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        chk_type('resources', resource_keys, list)
        for i,resource_key in enumerate(resource_keys):
            chk_string('resource_key[#{:d}]'.format(i), resource_key, allow_empty=False)

        results = []
        with self.__lock:
            if len(resource_keys) == 0:
                config = self.__netconf_manager.get_config(source='running').data_xml
                with open('../data/drx30-01.xml', mode='w', encoding='UTF-8') as f:
                    dom = xml.dom.minidom.parseString(config)
                    f.write(dom.toprettyxml())
        return results

#            resolver = anytree.Resolver(pathattr='name')
#            for i,resource_key in enumerate(resource_keys):
#                str_resource_name = 'resource_key[#{:d}]'.format(i)
#                try:
#                    resource_path = resource_key.split('/')
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
#                    results.append((resource_key, e)) # if validation fails, store the exception
#                    continue
#
#                resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
#                # if not found, resource_node is None
#                if resource_node is None: continue
#                results.extend(dump_subtree(resource_node))
#            return results
#
#    def GetResource(self, endpoint_uuid : str) -> Optional[str]:
#        chk_string('endpoint_uuid', endpoint_uuid)
#        return {
#            #'key': 'value',
#        }.get(endpoint_uuid)
#
#    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
#        chk_type('resources', resources, list)
#        if len(resources) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource in enumerate(resources):
#                str_resource_name = 'resources[#{:d}]'.format(i)
#                try:
#                    chk_type(str_resource_name, resource, (list, tuple))
#                    chk_length(str_resource_name, resource, min_length=2, max_length=2)
#                    resource_key,resource_value = resource
#                    resource_path = resource_key.split('/')
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                set_subnode_value(resolver, self.__running, resource_path, resource_value)
#                results.append(True)
#        return results
#
#    def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
#        chk_type('resources', resource_keys, list)
#        if len(resource_keys) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource_key in enumerate(resource_keys):
#                str_resource_name = 'resource_key[#{:d}]'.format(i)
#                try:
#                    chk_string(str_resource_name, resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
#                # if not found, resource_node is None
#                if resource_node is None:
#                    results.append(False)
#                    continue
#
#                parent = resource_node.parent
#                children = list(parent.children)
#                children.remove(resource_node)
#                parent.children = tuple(children)
#                results.append(True)
#        return results
#
#    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
#        chk_type('subscriptions', subscriptions, list)
#        if len(subscriptions) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,subscription in enumerate(subscriptions):
#                str_subscription_name = 'subscriptions[#{:d}]'.format(i)
#                try:
#                    chk_type(str_subscription_name, subscription, (list, tuple))
#                    chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
#                    resource_key,sampling_duration,sampling_interval = subscription
#                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
#                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                start_date,end_date = None,None
#                if sampling_duration <= 1.e-12:
#                    start_date = datetime.utcnow()
#                    end_date = start_date + timedelta(seconds=sampling_duration)
#
#                job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
#                job = self.__scheduler.add_job(
#                    do_sampling, args=(resource_key, self.__out_samples), kwargs={},
#                    id=job_id, trigger='interval', seconds=sampling_interval,
#                    start_date=start_date, end_date=end_date, timezone=pytz.utc)
#
#                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
#                set_subnode_value(resolver, self.__running, subscription_path, job)
#                results.append(True)
#        return results
#
#    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
#        chk_type('subscriptions', subscriptions, list)
#        if len(subscriptions) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource in enumerate(subscriptions):
#                str_subscription_name = 'resources[#{:d}]'.format(i)
#                try:
#                    chk_type(str_subscription_name, resource, (list, tuple))
#                    chk_length(str_subscription_name, resource, min_length=3, max_length=3)
#                    resource_key,sampling_duration,sampling_interval = resource
#                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
#                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
#                subscription_node = get_subnode(resolver, self.__running, subscription_path)
#
#                # if not found, resource_node is None
#                if subscription_node is None:
#                    results.append(False)
#                    continue
#
#                job : Job = getattr(subscription_node, 'value', None)
#                if job is None or not isinstance(job, Job):
#                    raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource)))
#                job.remove()
#
#                parent = subscription_node.parent
#                children = list(parent.children)
#                children.remove(subscription_node)
#                parent.children = tuple(children)
#
#                results.append(True)
#        return results
#
#    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
#        while not self.__terminate.is_set():
#            try:
#                sample = self.__out_samples.get(block=blocking, timeout=0.1)
#            except queue.Empty:
#                if blocking: continue
#                return
#            if sample is None: continue
#            yield sample