Newer
Older
from __future__ import annotations
from typing import TYPE_CHECKING, Dict
from ..entity._Entity import _Entity
from ..entity.EntityAttributes import EntityAttributes
from ..entity.EntityCollection import EntityCollection

Lluis Gifre Renom
committed
from .Endpoint import Endpoint
from .Keys import KEY_DEVICE, KEY_DEVICE_ENDPOINTS

Lluis Gifre Renom
committed
from .OperationalStatus import OperationalStatus, to_operationalstatus_enum
if TYPE_CHECKING:
from .Context import Context
from .Topology import Topology

Lluis Gifre Renom
committed
VALIDATORS = {
'device_type': lambda v: v is None or isinstance(v, str),
'device_config': lambda v: v is None or isinstance(v, str),
'device_operational_status': lambda v: v is None or isinstance(v, OperationalStatus),
}
TRANSCODERS = {
'device_operational_status': {
OperationalStatus: lambda v: v.value,

Lluis Gifre Renom
committed
str : lambda v: to_operationalstatus_enum(v),
}
}
class Device(_Entity):
def __init__(self, device_uuid : str, parent : 'Topology'):
self._attributes = EntityAttributes(self, KEY_DEVICE, VALIDATORS, transcoders=TRANSCODERS)
self._endpoints = EntityCollection(self, KEY_DEVICE_ENDPOINTS)
@property
def parent(self) -> 'Topology': return self._parent
@property
def context(self) -> 'Context': return self.parent.context
def context_uuid(self) -> str: return self.parent.context_uuid
@property
def topology(self) -> 'Topology': return self.parent
@property
def topology_uuid(self) -> str: return self.parent.topology_uuid
@property
def device_uuid(self) -> str: return self._entity_uuid
@property
def attributes(self) -> EntityAttributes: return self._attributes
@property
def endpoints(self) -> EntityCollection: return self._endpoints

Lluis Gifre Renom
committed
def endpoint(self, endpoint_uuid : str) -> Endpoint: return Endpoint(endpoint_uuid, self)

Lluis Gifre Renom
committed
def create(self, device_type : str, device_config : str, device_operational_status : OperationalStatus) -> 'Device':

Lluis Gifre Renom
committed
self.update(update_attributes={
'device_type': device_type,
'device_config': device_config,
'device_operational_status': device_operational_status,

Lluis Gifre Renom
committed
})

Lluis Gifre Renom
committed
return self
def update(self, update_attributes={}, remove_attributes=[]) -> 'Device':
self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
return self
def delete(self) -> None:
for endpoint_uuid in self.endpoints.get(): self.endpoint(endpoint_uuid).delete()

Lluis Gifre Renom
committed
def dump_id(self) -> Dict:
return {
'device_id': {'uuid': self.device_uuid},
}

Lluis Gifre Renom
committed
def dump(self) -> Dict:
attributes = self.attributes.get()
dev_op_status = attributes.get('device_operational_status', None)
if isinstance(dev_op_status, OperationalStatus): dev_op_status = dev_op_status.value
endpoints = [self.endpoint(endpoint_uuid).dump() for endpoint_uuid in self.endpoints.get()]
return {

Lluis Gifre Renom
committed
'device_type': attributes.get('device_type', None),
'device_config': {'device_config': attributes.get('device_config', None)},
'devOperationalStatus': dev_op_status,
'endpointList': endpoints
}