Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import anytree, logging, threading
from typing import Any, Iterator, List, Tuple, Union
from common.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.drivers.emulated.tools.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
LOGGER = logging.getLogger(__name__)
class EmulatedDriver(_Driver):
def __init__(self, address : str, port : int, **kwargs) -> None:
self.__lock = threading.Lock()
self.__root = TreeNode('.')
self.__subscriptions = {} # resource_key => (duration, sampling_rate, start_timestamp, end_timestamp)
def Connect(self) -> bool:
return True
def Disconnect(self) -> bool:
return True
def GetConfig(self, resource_keys : List[str] = []) -> List[Union[Any, None, Exception]]:
chk_type('resources', resource_keys, list)
with self.__lock:
if len(resource_keys) == 0: return dump_subtree(self.__root)
results = []
resolver = anytree.Resolver(pathattr='name')
for i,resource_key in enumerate(resource_keys):
str_resource_name = 'resource_key[#{:d}]'.format(i)
try:
chk_string(str_resource_name, resource_key, allow_empty=False)
resource_path = resource_key.split('/')
except Exception as e:
LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
results.append(e) # if validation fails, store the exception
continue
resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
# if not found, resource_node is None
results.append(None if resource_node is None else dump_subtree(resource_node))
return results
def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
chk_type('resources', resources, list)
if len(resources) == 0: return []
results = []
resolver = anytree.Resolver(pathattr='name')
with self.__lock:
for i,resource in enumerate(resources):
str_resource_name = 'resources[#{:d}]'.format(i)
try:
chk_type(str_resource_name, resource, (list, tuple))
chk_length(str_resource_name, resource, allowed_lengths=2)
resource_key,resource_value = resource
resource_path = resource_key.split('/')
except Exception as e:
LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
results.append(e) # if validation fails, store the exception
continue
set_subnode_value(resolver, self.__root, resource_path, resource_value)
results.append(True)
return results
def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
chk_type('resources', resource_keys, list)
if len(resource_keys) == 0: return []
results = []
resolver = anytree.Resolver(pathattr='name')
with self.__lock:
for i,resource_key in enumerate(resource_keys):
str_resource_name = 'resource_key[#{:d}]'.format(i)
try:
chk_string(str_resource_name, resource_key, allow_empty=False)
resource_path = resource_key.split('/')
except Exception as e:
LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
results.append(e) # if validation fails, store the exception
continue
resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
# if not found, resource_node is None
if resource_node is None:
results.append(False)
else:
parent = resource_node.parent
children = list(parent.children)
children.remove(resource_node)
parent.children = tuple(children)
results.append(True)
return results
def SubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
chk_type('resources', resources, list)
if len(resources) == 0: return []
results = []
resolver = anytree.Resolver(pathattr='name')
with self.__lock:
for i,resource in enumerate(resources):
str_resource_name = 'resources[#{:d}]'.format(i)
try:
chk_type(str_resource_name, resource, (list, tuple))
chk_length(str_resource_name, resource, allowed_lengths=3)
resource_key,sampling_duration,sampling_rate = resource
chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
resource_path = resource_key.split('/')
chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
chk_float(str_resource_name + '.sampling_rate', sampling_rate, min_value=0)
except Exception as e:
LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
results.append(e) # if validation fails, store the exception
continue
set_subnode_value(resolver, self.__root, resource_path, resource_value)
results.append(True)
return results
def UnsubscribeState(self, resources : List[str]) -> List[bool]:
raise NotImplementedError()
def GetState(self) -> Iterator[Tuple[str, Any]]:
raise NotImplementedError()