Newer
Older

Lluis Gifre Renom
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from typing import Dict
from .tools._Entity import _Entity
from .tools.EntityAttributes import EntityAttributes
from .tools.EntityCollection import EntityCollection
from .Keys import KEY_DEVICE, KEY_DEVICE_ENDPOINTS
from .Endpoint import Endpoint
from .OperationalStatus import OperationalStatus, to_operationalstatus_enum
VALIDATORS = {
'device_type': lambda v: v is None or isinstance(v, str),
'device_config': lambda v: v is None or isinstance(v, str),
'device_operational_status': lambda v: v is None or isinstance(v, OperationalStatus),
}
TRANSCODERS = {
'device_operational_status': {
OperationalStatus: lambda v: v.value,
str : lambda v: to_operationalstatus_enum(v),
}
}
class Device(_Entity):
def __init__(self, device_uuid : str, parent : 'Topology'): # type: ignore
super().__init__(device_uuid, parent=parent)
self.device_uuid = self.get_uuid()
self.topology_uuid = self._parent.get_uuid()
self.context_uuid = self._parent._parent.get_uuid()
self.attributes = EntityAttributes(self, KEY_DEVICE, VALIDATORS, transcoders=TRANSCODERS)
self.endpoints = EntityCollection(self, KEY_DEVICE_ENDPOINTS)
def endpoint(self, endpoint_uuid : str) -> Endpoint:
return Endpoint(endpoint_uuid, self)
def create(self, type : str, config : str, operational_status : OperationalStatus) -> 'Device':
self.update(update_attributes={
'device_type': type,
'device_config': config,
'device_operational_status': operational_status,
})
self._parent.devices.add(self.get_uuid())
return self
def update(self, update_attributes={}, remove_attributes=[]) -> 'Device':
self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
return self
def delete(self) -> None:
remove_attributes = ['device_type', 'device_config', 'device_operational_status']
self.update(remove_attributes=remove_attributes)
self._parent.devices.delete(self.get_uuid())
def dump(self) -> Dict:
attributes = self.attributes.get()
dev_op_status = attributes.get('device_operational_status', None)
if isinstance(dev_op_status, OperationalStatus): dev_op_status = dev_op_status.value
endpoints = [self.endpoint(endpoint_uuid).dump() for endpoint_uuid in self.endpoints.get()]
return {
'device_id': {'device_id': {'uuid': self.device_uuid}},
'device_type': attributes.get('device_type', None),
'device_config': {'device_config': attributes.get('device_config', None)},
'devOperationalStatus': dev_op_status,
'endpointList': endpoints
}