import logging
import queue
import random
import threading
from datetime import datetime, timedelta
from typing import Any, Iterator, List, Tuple, Union

import anytree
import pytz
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler

from common.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from .AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value

LOGGER = logging.getLogger(__name__)


def sample(resource_key : str, out_samples : queue.Queue):
    """Push one emulated sample for `resource_key` into `out_samples`.

    The sample is a tuple `(timestamp, resource_key, value)` where `timestamp` is the current
    POSIX timestamp and `value` is a random float in [0, 1).
    """
    # Use a timezone-aware "now": calling timestamp() on the naive result of utcnow() would
    # interpret it as local time and shift the value by the host's UTC offset.
    timestamp = datetime.now(pytz.utc).timestamp()
    out_samples.put_nowait((timestamp, resource_key, random.random()))


class EmulatedDriver(_Driver):
    """Driver that keeps its configuration in an in-memory tree and emulates state sampling.

    Configuration and subscriptions are stored under a single tree root; every access to the
    tree is serialized with an internal lock. Subscriptions are implemented as scheduler
    interval jobs that call `sample()` and feed a queue consumed by `GetState()`.
    """

    def __init__(self, address : str, port : int, **kwargs) -> None:
        """Create the driver. `address`, `port` and `kwargs` are accepted but not used here."""
        self.__lock = threading.Lock()
        self.__root = TreeNode('.')
        self.__terminate = threading.Event()
        self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
        self.__scheduler.configure(
            jobstores = {'default': MemoryJobStore()},
            executors = {'default': ThreadPoolExecutor(max_workers=1)},
            job_defaults = {'coalesce': False, 'max_instances': 3},
            timezone=pytz.utc)
        self.__out_samples = queue.Queue()

    def Connect(self) -> bool:
        """Start the scheduler so that subscribed sampling jobs begin to run; returns True."""
        # Connect triggers activation of sampling events that will be scheduled based on subscriptions
        self.__scheduler.start()
        return True

    def Disconnect(self) -> bool:
        """Signal termination to `GetState()` loops and shut the scheduler down; returns True."""
        # Trigger termination of loops and processes
        self.__terminate.set()
        # Disconnect triggers deactivation of sampling events
        self.__scheduler.shutdown()
        return True

    # NOTE: the list default is only read (never mutated) in this method.
    def GetConfig(self, resource_keys : List[str] = []) -> List[Union[Any, None, Exception]]:
        """Retrieve the configuration stored under each of `resource_keys`.

        With an empty list, the dump of the whole tree is returned. Otherwise the result has
        one entry per key, in order: the dump of the matching subtree, None when the key is
        not found, or the exception raised while validating the key.
        """
        chk_type('resources', resource_keys, list)
        with self.__lock:
            if len(resource_keys) == 0: return dump_subtree(self.__root)
            results = []
            resolver = anytree.Resolver(pathattr='name')
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                results.append(None if resource_node is None else dump_subtree(resource_node))
            return results

    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Store each `(resource_key, resource_value)` pair in the configuration tree.

        Returns one entry per resource, in order: True when the value was stored, or the
        exception raised while validating that resource.
        """
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=2)
                    resource_key,resource_value = resource
                    resource_path = resource_key.split('/')
                except Exception as e:
                    # Log the raw resource: resource_key is not bound yet when the type/length
                    # checks fail, and referencing it here would raise from the handler.
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource)))
                    results.append(e) # if validation fails, store the exception
                    continue

                set_subnode_value(resolver, self.__root, resource_path, resource_value)
                results.append(True)
        return results

    def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
        """Remove the subtree stored under each of `resource_keys`.

        Returns one entry per key, in order: True when the node was removed, False when the
        key was not found, or the exception raised while validating the key.
        """
        chk_type('resources', resource_keys, list)
        if len(resource_keys) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                if resource_node is None:
                    results.append(False)
                    continue

                # Detach the node by rebuilding the parent's children without it
                parent = resource_node.parent
                children = list(parent.children)
                children.remove(resource_node)
                parent.children = tuple(children)
                results.append(True)
        return results

    def SubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Schedule emulated sampling for each `(resource_key, sampling_duration, sampling_interval)`.

        For every valid resource an interval job is added to the scheduler and stored in the
        tree under `<resource_key>/<duration>:<interval>`. A positive `sampling_duration`
        bounds the sampling window to that many seconds from now; a zero duration leaves the
        job without start/end dates.

        Returns one entry per resource, in order: True when the job was scheduled, or the
        exception raised while validating that resource.
        """
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=3)
                    resource_key,sampling_duration,sampling_interval = resource
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_resource_name + '.sampling_interval', sampling_interval, min_value=0)
                except Exception as e:
                    # Log the raw resource: resource_key is not bound yet when the type/length
                    # checks fail, and referencing it here would raise from the handler.
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource)))
                    results.append(e) # if validation fails, store the exception
                    continue

                start_date,end_date = None,None
                # Only a strictly positive duration defines a sampling window. With the window
                # applied to a zero duration, start and end would coincide and the job would
                # never fire, while positive durations would never expire.
                if sampling_duration > 1.e-12:
                    start_date = datetime.now(pytz.utc)
                    end_date = start_date + timedelta(seconds=sampling_duration)

                job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
                job = self.__scheduler.add_job(
                    sample, args=(resource_key, self.__out_samples), kwargs={}, id=job_id,
                    trigger='interval', seconds=sampling_interval,
                    start_date=start_date, end_date=end_date, timezone=pytz.utc)

                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                set_subnode_value(resolver, self.__root, subscription_path, job)
                results.append(True)
        return results

    def UnsubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Cancel the sampling job of each `(resource_key, sampling_duration, sampling_interval)`.

        Returns one entry per resource, in order: True when the job was removed, False when
        no matching subscription node exists, or the exception raised while validating that
        resource.

        Raises:
            Exception: if the subscription node exists but does not hold a scheduler Job.
        """
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=3)
                    resource_key,sampling_duration,sampling_interval = resource
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_resource_name + '.sampling_interval', sampling_interval, min_value=0)
                except Exception as e:
                    # Log the raw resource: resource_key is not bound yet when the type/length
                    # checks fail, and referencing it here would raise from the handler.
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource)))
                    results.append(e) # if validation fails, store the exception
                    continue

                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                subscription_node = get_subnode(resolver, self.__root, subscription_path, default=None)
                # if not found, subscription_node is None
                if subscription_node is None:
                    results.append(False)
                    continue

                job : Job = getattr(subscription_node, 'value', None)
                if job is None or not isinstance(job, Job):
                    raise Exception('Malformed subscription node or wrong resource key: {}'.format(str(resource)))
                job.remove()

                # Detach the node by rebuilding the parent's children without it
                parent = subscription_node.parent
                children = list(parent.children)
                children.remove(subscription_node)
                parent.children = tuple(children)
                results.append(True)
        return results

    def GetState(self, blocking=False) -> Iterator[Tuple[float, str, Any]]:
        """Yield queued samples as `(timestamp, resource_key, value)` tuples.

        In non-blocking mode the generator finishes as soon as the queue is empty. In blocking
        mode it waits for new samples, polling in slices of 0.1 seconds so that termination
        requested through `Disconnect()` is noticed.
        """
        while not self.__terminate.is_set():
            try:
                # the timeout only applies when block=True
                next_sample = self.__out_samples.get(block=blocking, timeout=0.1)
            except queue.Empty:
                if blocking: continue
                return
            if next_sample is None: continue
            yield next_sample