Skip to content
EmulatedDriver.py 5.87 KiB
Newer Older
import anytree, logging, threading
from typing import Any, Iterator, List, Tuple, Union
from common.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.drivers.emulated.tools.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value

LOGGER = logging.getLogger(__name__)

class EmulatedDriver(_Driver):
    def __init__(self, address : str, port : int, **kwargs) -> None:
        self.__lock = threading.Lock()
        self.__root = TreeNode('.')
        self.__subscriptions = {} # resource_key => (duration, sampling_rate, start_timestamp, end_timestamp)

    def Connect(self) -> bool:
        return True

    def Disconnect(self) -> bool:
        return True

    def GetConfig(self, resource_keys : List[str] = []) -> List[Union[Any, None, Exception]]:
        chk_type('resources', resource_keys, list)
        with self.__lock:
            if len(resource_keys) == 0: return dump_subtree(self.__root)
            results = []
            resolver = anytree.Resolver(pathattr='name')
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue
                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                results.append(None if resource_node is None else dump_subtree(resource_node))
            return results

    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=2)
                    resource_key,resource_value = resource
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue
                set_subnode_value(resolver, self.__root, resource_path, resource_value)
                results.append(True)
        return results

    def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
        chk_type('resources', resource_keys, list)
        if len(resource_keys) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue
                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                if resource_node is None:
                    results.append(False)
                else:
                    parent = resource_node.parent
                    children = list(parent.children)
                    children.remove(resource_node)
                    parent.children = tuple(children)
                    results.append(True)
        return results

    def SubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=3)
                    resource_key,sampling_duration,sampling_rate = resource
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_resource_name + '.sampling_rate', sampling_rate, min_value=0)
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue
                set_subnode_value(resolver, self.__root, resource_path, resource_value)
                results.append(True)
        return results


    def UnsubscribeState(self, resources : List[str]) -> List[bool]:
        raise NotImplementedError()

    def GetState(self) -> Iterator[Tuple[str, Any]]:
        raise NotImplementedError()