import anytree, logging, pytz, queue, threading
from anytree.importer import DictImporter
#from datetime import datetime, timedelta
import lxml.etree as ET
from typing import Any, Iterator, List, Optional, Tuple, Union
#from apscheduler.executors.pool import ThreadPoolExecutor
#from apscheduler.job import Job
#from apscheduler.jobstores.memory import MemoryJobStore
#from apscheduler.schedulers.background import BackgroundScheduler
from netconf_client.connect import connect_ssh
from netconf_client.ncclient import Manager
from common.type_checkers.Checkers import chk_float, chk_integer, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
from device.service.drivers.openconfig.Tools import xml_pretty_print, xml_to_dict, xml_to_file
from device.service.drivers.openconfig.templates import ALL_RESOURCES, get_filter, extract_data

# Keep the SSH transport logger quiet unless something is actually wrong.
logging.getLogger('ncclient.transport.ssh').setLevel(logging.WARNING)

LOGGER = logging.getLogger(__name__)

#def do_sampling(resource_key : str, out_samples : queue.Queue):
#    out_samples.put_nowait((datetime.timestamp(datetime.utcnow()), resource_key, random.random()))

class OpenConfigDriver(_Driver):
    """Device driver that reads configuration from a device over NETCONF.

    The session is opened lazily by Connect() and closed by Disconnect(). All
    public methods serialize on a single internal lock.
    """

    def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called
        """Store the connection parameters; no network activity happens here.

        Args:
            address: host passed to connect_ssh() when connecting.
            port: port of the NETCONF endpoint; coerced with int().
            **settings: extra options. Connect() reads 'username', 'password'
                and 'timeout' (defaults to 120) from here.
        """
        self.__address = address
        self.__port = int(port)
        self.__settings = settings
        self.__lock = threading.Lock()
        #self.__initial = TreeNode('.')
        #self.__running = TreeNode('.')
        self.__started = threading.Event()      # set while a session is open
        self.__terminate = threading.Event()    # set once Disconnect() has been requested
        self.__netconf_manager : Optional[Manager] = None
        #self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
        #self.__scheduler.configure(
        #    jobstores = {'default': MemoryJobStore()},
        #    executors = {'default': ThreadPoolExecutor(max_workers=1)},
        #    job_defaults = {'coalesce': False, 'max_instances': 3},
        #    timezone=pytz.utc)
        #self.__out_samples = queue.Queue()

    def Connect(self) -> bool:
        """Open the NETCONF session if it is not already open.

        Returns:
            Always True. Exceptions raised by connect_ssh()/Manager() propagate
            to the caller, in which case the driver stays in the not-started state.
        """
        with self.__lock:
            if self.__started.is_set(): return True
            username = self.__settings.get('username')
            password = self.__settings.get('password')
            timeout = int(self.__settings.get('timeout', 120))
            session = connect_ssh(
                host=self.__address, port=self.__port, username=username, password=password)
            self.__netconf_manager = Manager(session, timeout=timeout)
            # Connect triggers activation of sampling events that will be scheduled based on subscriptions
            #self.__scheduler.start()
            # A previous Disconnect() leaves the terminate flag raised; lower it so
            # that a re-connected driver is not seen as already terminating.
            self.__terminate.clear()
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        """Close the NETCONF session if one is open.

        Calling it on a driver that is not connected (including a second call
        in a row) is a no-op.

        Returns:
            Always True. An exception raised by close_session() propagates, but
            the driver is still marked as not started.
        """
        with self.__lock:
            # Trigger termination of loops and processes
            self.__terminate.set()
            # If not started, assume it is already disconnected
            if not self.__started.is_set(): return True
            # Disconnect triggers deactivation of sampling events
            #self.__scheduler.shutdown()
            try:
                self.__netconf_manager.close_session()
            finally:
                # Without this, a later Connect() would return early on a dead
                # session and a repeated Disconnect() would close it twice.
                self.__started.clear()
            return True

    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        """Return the initial configuration; this driver reports none (empty list)."""
        with self.__lock:
            return []

    def GetConfig(
        self, resource_keys : Optional[List[str]] = None
    ) -> List[Tuple[str, Union[Any, None, Exception]]]:
        """Retrieve the requested resources from the device.

        Args:
            resource_keys: keys to retrieve. None or an empty list means every
                entry of ALL_RESOURCES. Each key must be a non-empty string
                (checked with chk_string before any request is sent).

        Returns:
            The concatenation of whatever extract_data() yields for each key
            (presumably (key, value) tuples -- confirm against templates). When
            retrieving a key fails, a (resource_key, exception) tuple is
            appended instead and processing continues with the next key.
        """
        # None replaces the former mutable `[]` default; both mean "everything".
        if resource_keys is None: resource_keys = []
        chk_type('resources', resource_keys, list)
        for i,resource_key in enumerate(resource_keys):
            chk_string('resource_key[#{:d}]'.format(i), resource_key, allow_empty=False)

        results = []
        with self.__lock:
            if not resource_keys: resource_keys = ALL_RESOURCES
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    # Keys without a registered template are sent as the filter as-is.
                    str_filter = get_filter(resource_key)
                    if str_filter is None: str_filter = resource_key
                    xml_data = self.__netconf_manager.get(filter=str_filter).data_ele
                    if isinstance(xml_data, Exception): raise xml_data
                    results.extend(extract_data(resource_key, xml_data))
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception retrieving %s: %s', str_resource_name, str(resource_key))
                    results.append((resource_key, e)) # if retrieval fails, store the exception
        return results

    # ----- Disabled scaffolding --------------------------------------------------------------------
    # The methods below are commented out. They rely on attributes that are also
    # commented out in __init__ (self.__running, self.__scheduler, self.__out_samples)
    # and on the commented-out datetime/apscheduler imports at the top of the file.

#    def GetResource(self, endpoint_uuid : str) -> Optional[str]:
#        chk_string('endpoint_uuid', endpoint_uuid)
#        return {
#            #'key': 'value',
#        }.get(endpoint_uuid)
#
#    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
#        chk_type('resources', resources, list)
#        if len(resources) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource in enumerate(resources):
#                str_resource_name = 'resources[#{:d}]'.format(i)
#                try:
#                    chk_type(str_resource_name, resource, (list, tuple))
#                    chk_length(str_resource_name, resource, min_length=2, max_length=2)
#                    resource_key,resource_value = resource
#                    resource_path = resource_key.split('/')
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                set_subnode_value(resolver, self.__running, resource_path, resource_value)
#                results.append(True)
#        return results
#
#    def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
#        chk_type('resources', resource_keys, list)
#        if len(resource_keys) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource_key in enumerate(resource_keys):
#                str_resource_name = 'resource_key[#{:d}]'.format(i)
#                try:
#                    chk_string(str_resource_name, resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
#                # if not found, resource_node is None
#                if resource_node is None:
#                    results.append(False)
#                    continue
#
#                parent = resource_node.parent
#                children = list(parent.children)
#                children.remove(resource_node)
#                parent.children = tuple(children)
#                results.append(True)
#        return results
#
#    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
#        chk_type('subscriptions', subscriptions, list)
#        if len(subscriptions) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,subscription in enumerate(subscriptions):
#                str_subscription_name = 'subscriptions[#{:d}]'.format(i)
#                try:
#                    chk_type(str_subscription_name, subscription, (list, tuple))
#                    chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
#                    resource_key,sampling_duration,sampling_interval = subscription
#                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
#                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                start_date,end_date = None,None
#                # NOTE(review): this comparison looks inverted (a bounded window would be
#                # expected for a non-zero duration) -- confirm before re-enabling.
#                if sampling_duration <= 1.e-12:
#                    start_date = datetime.utcnow()
#                    end_date = start_date + timedelta(seconds=sampling_duration)
#
#                job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
#                job = self.__scheduler.add_job(
#                    do_sampling, args=(resource_key, self.__out_samples), kwargs={},
#                    id=job_id, trigger='interval', seconds=sampling_interval,
#                    start_date=start_date, end_date=end_date, timezone=pytz.utc)
#
#                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
#                set_subnode_value(resolver, self.__running, subscription_path, job)
#                results.append(True)
#        return results
#
#    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
#        chk_type('subscriptions', subscriptions, list)
#        if len(subscriptions) == 0: return []
#        results = []
#        resolver = anytree.Resolver(pathattr='name')
#        with self.__lock:
#            for i,resource in enumerate(subscriptions):
#                str_subscription_name = 'resources[#{:d}]'.format(i)
#                try:
#                    chk_type(str_subscription_name, resource, (list, tuple))
#                    chk_length(str_subscription_name, resource, min_length=3, max_length=3)
#                    resource_key,sampling_duration,sampling_interval = resource
#                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
#                    resource_path = resource_key.split('/')
#                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
#                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
#                except Exception as e: # pylint: disable=broad-except
#                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
#                    results.append(e) # if validation fails, store the exception
#                    continue
#
#                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
#                subscription_node = get_subnode(resolver, self.__running, subscription_path)
#
#                # if not found, resource_node is None
#                if subscription_node is None:
#                    results.append(False)
#                    continue
#
#                job : Job = getattr(subscription_node, 'value', None)
#                if job is None or not isinstance(job, Job):
#                    raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource)))
#                job.remove()
#
#                parent = subscription_node.parent
#                children = list(parent.children)
#                children.remove(subscription_node)
#                parent.children = tuple(children)
#
#                results.append(True)
#        return results
#
#    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
#        while not self.__terminate.is_set():
#            try:
#                sample = self.__out_samples.get(block=blocking, timeout=0.1)
#            except queue.Empty:
#                if blocking: continue
#                return
#            if sample is None: continue
#            yield sample