import anytree, logging, threading, time
from typing import Any, Iterator, List, Tuple, Union
from common.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.drivers.emulated.tools.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value

LOGGER = logging.getLogger(__name__)


class EmulatedDriver(_Driver):
    """Driver that keeps its configuration in an in-memory tree instead of a real device.

    Resources are addressed by '/'-separated keys; every key is split on '/'
    and the resulting path is looked up or written under a root node named '.'.
    All accesses to the tree and to the subscription table are serialized
    through a single lock.
    """

    def __init__(self, address : str, port : int, **kwargs) -> None:
        """Create an empty driver; `address`, `port` and `kwargs` are accepted but not used here."""
        # NOTE(review): the base-class initializer is intentionally not invoked, exactly
        # as in the original code; its behavior is not visible from this file.
        self.__lock = threading.Lock()
        self.__root = TreeNode('.')
        self.__subscriptions = {} # resource_key => (duration, sampling_rate, start_timestamp, end_timestamp)

    def Connect(self) -> bool:
        """Nothing to connect to; always reports success."""
        return True

    def Disconnect(self) -> bool:
        """Nothing to disconnect from; always reports success."""
        return True

    def GetConfig(self, resource_keys : List[str] = []) -> List[Union[Any, None, Exception]]:
        """Retrieve the stored configuration for the requested resource keys.

        Parameters:
            resource_keys: list of non-empty '/'-separated keys. When the list
                is empty the dump of the whole tree is returned instead.
        Returns:
            For an empty request, `dump_subtree` of the root. Otherwise one
            entry per requested key, in order: the dump of the matching node,
            None when no node matches, or the validation exception for that key.
        """
        # The default list is never mutated, so sharing it across calls is harmless;
        # it is kept to leave the method signature untouched.
        chk_type('resources', resource_keys, list)
        with self.__lock:
            if len(resource_keys) == 0: return dump_subtree(self.__root)
            results = []
            resolver = anytree.Resolver(pathattr='name')
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                results.append(None if resource_node is None else dump_subtree(resource_node))
            return results

    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """Store values in the configuration tree.

        Parameters:
            resources: list of (resource_key, resource_value) pairs.
        Returns:
            One entry per resource, in order: True when the value was stored,
            or the exception raised while validating that resource.
        """
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=2)
                    resource_key,resource_value = resource
                    # Same key validation the other methods apply before splitting.
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    # Log the whole resource: resource_key is not bound yet when the
                    # type/length checks are the ones that failed.
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource)))
                    results.append(e) # if validation fails, store the exception
                    continue

                set_subnode_value(resolver, self.__root, resource_path, resource_value)
                results.append(True)
        return results

    def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
        """Remove nodes from the configuration tree.

        Parameters:
            resource_keys: list of non-empty '/'-separated keys.
        Returns:
            One entry per key, in order: True when the node was detached from
            its parent, False when nothing was removed, or the validation
            exception for that key.
        """
        chk_type('resources', resource_keys, list)
        if len(resource_keys) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                if resource_node is None:
                    results.append(False)
                    continue

                parent = resource_node.parent
                if parent is None:
                    # Defensive: a parentless node (e.g. the root) has nothing to be
                    # detached from; report it as not deleted instead of failing on
                    # parent.children.
                    results.append(False)
                    continue

                children = list(parent.children)
                children.remove(resource_node)
                parent.children = tuple(children)
                results.append(True)
        return results

    def SubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """Register state subscriptions.

        Parameters:
            resources: list of (resource_key, sampling_duration, sampling_rate)
                triples; the key must be a non-empty string and both numbers
                must pass `chk_float` with min_value=0.
        Returns:
            One entry per resource, in order: True when the subscription was
            recorded, or the exception raised while validating that resource.
        """
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=3)
                    resource_key,sampling_duration,sampling_rate = resource
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_resource_name + '.sampling_rate', sampling_rate, min_value=0)
                except Exception as e:
                    # Log the whole resource: resource_key is not bound yet when the
                    # type/length checks are the ones that failed.
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource)))
                    results.append(e) # if validation fails, store the exception
                    continue

                # Record the subscription using the tuple layout documented in __init__.
                # The previous code wrote an undefined `resource_value` into the
                # configuration tree here, which raised NameError for every valid request.
                # NOTE(review): end_timestamp is assumed to be start + duration, both in
                # seconds since the epoch - confirm once GetState is implemented.
                start_timestamp = time.time()
                end_timestamp = start_timestamp + sampling_duration
                self.__subscriptions[resource_key] = (
                    sampling_duration, sampling_rate, start_timestamp, end_timestamp)
                results.append(True)
        return results

    def UnsubscribeState(self, resources : List[str]) -> List[bool]:
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError()

    def GetState(self) -> Iterator[Tuple[str, Any]]:
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError()