Newer
Older
from __future__ import annotations
from typing import TYPE_CHECKING, Dict
from common.database.api.context.Keys import KEY_DEVICE, KEY_DEVICE_ENDPOINTS
from common.database.api.context.topology.device.Endpoint import Endpoint
from common.database.api.context.topology.device.OperationalStatus import OperationalStatus, to_operationalstatus_enum
from common.database.api.entity._Entity import _Entity
from common.database.api.entity.EntityCollection import EntityCollection

Lluis Gifre Renom
committed
from common.database.api.context.Context import Context
from common.database.api.context.topology.Topology import Topology

Lluis Gifre Renom
committed
def _is_non_empty_str(value) -> bool:
    """Return True only for a ``str`` with at least one character.

    ``None`` and every non-string value fail the ``isinstance`` check, so no
    separate ``is not None`` test is needed.
    """
    return isinstance(value, str) and len(value) > 0


# Per-attribute validation predicates; Device.__init__ passes this mapping to
# _Entity. Each predicate receives a candidate value and returns True when the
# value is acceptable for that attribute.
VALIDATORS = {
    'device_type': _is_non_empty_str,
    'device_config': _is_non_empty_str,
    # Only genuine OperationalStatus members are accepted here.
    'device_operational_status': lambda v: isinstance(v, OperationalStatus),
}
# Value converters, keyed first by attribute name and then by the Python type
# of the value being converted; Device.__init__ passes this mapping to _Entity.
# NOTE(review): presumably the enum -> raw value entry is used when storing and
# the str -> enum entry when loading; confirm against _Entity's attribute layer.
TRANSCODERS = {
    'device_operational_status': {
        # Enum member -> its underlying raw value.
        OperationalStatus: lambda v: v.value,
        # String -> OperationalStatus member, via the shared conversion helper.
        str: to_operationalstatus_enum,
    },
}
class Device(_Entity):
    """Database entity for one device owned by a topology.

    The device is identified by ``device_uuid`` and registered under
    ``KEY_DEVICE``; its attributes are checked and converted with the
    module-level ``VALIDATORS`` and ``TRANSCODERS``. Its endpoints are tracked
    in an ``EntityCollection`` stored under ``KEY_DEVICE_ENDPOINTS``.
    """

    def __init__(self, device_uuid : str, parent : 'Topology'):
        """Bind the entity to ``device_uuid`` under the given parent topology."""
        super().__init__(parent, device_uuid, KEY_DEVICE, VALIDATORS, TRANSCODERS)
        self._endpoints = EntityCollection(self, KEY_DEVICE_ENDPOINTS)

    @property
    def parent(self) -> 'Topology':
        """Parent object held in ``self._parent``."""
        return self._parent

    @property
    def context(self) -> 'Context':
        """Context of the parent topology."""
        return self.parent.context

    # NOTE(review): every sibling accessor is a @property, but this one is a
    # plain method (call it as ``device.context_uuid()``). It is kept as a
    # method to preserve the existing interface; confirm whether a @property
    # decorator was intended.
    def context_uuid(self) -> str:
        """Return the ``context_uuid`` of the parent topology."""
        return self.parent.context_uuid

    @property
    def topology(self) -> 'Topology':
        """Topology that owns this device (same object as ``parent``)."""
        return self.parent

    @property
    def topology_uuid(self) -> str:
        """``topology_uuid`` of the parent topology."""
        return self.parent.topology_uuid

    @property
    def device_uuid(self) -> str:
        """UUID of this device, as held in ``self._entity_uuid``."""
        return self._entity_uuid

    @property
    def endpoints(self) -> EntityCollection:
        """Collection tracking the endpoints of this device."""
        return self._endpoints

    def endpoint(self, endpoint_uuid : str) -> Endpoint:
        """Return an ``Endpoint`` handle for ``endpoint_uuid`` bound to this device.

        A new handle is constructed on every call; this method itself does not
        check whether the endpoint exists.
        """
        return Endpoint(endpoint_uuid, self)

    def create(self, device_type : str, device_config : str, device_operational_status : OperationalStatus) -> 'Device':
        """Set the device's type, config and operational status.

        Returns:
            This device, so calls can be chained.
        """
        self.update(update_attributes={
            'device_type': device_type,
            'device_config': device_config,
            'device_operational_status': device_operational_status,
        })
        return self

    def update(self, update_attributes=None, remove_attributes=None) -> 'Device':
        """Update and/or remove attributes of this device.

        Args:
            update_attributes: mapping of attribute name to new value; omitted
                or ``None`` means no attribute is updated.
            remove_attributes: attribute names to remove; omitted or ``None``
                means no attribute is removed.

        Returns:
            This device, so calls can be chained.
        """
        # Fresh containers per call: mutable objects used as default argument
        # values are shared between calls, so None is used as the sentinel.
        if update_attributes is None:
            update_attributes = {}
        if remove_attributes is None:
            remove_attributes = []
        self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
        return self

    def delete(self) -> None:
        """Delete every endpoint currently listed for this device."""
        # NOTE(review): only the endpoints are deleted here; the device's own
        # attributes and its registration in the parent are not touched by the
        # visible code. Confirm whether that cleanup is expected in this method.
        for endpoint_uuid in self.endpoints.get():
            self.endpoint(endpoint_uuid).delete()

    def dump_id(self) -> Dict:
        """Return the identifier of this device as a nested dictionary."""
        return {
            'device_id': {'uuid': self.device_uuid},
        }

    def dump(self) -> Dict:
        """Return the device's attributes and its endpoints as a dictionary.

        Attributes that are not set are reported as ``None``. Each endpoint is
        serialised through its own ``dump()``.
        """
        attributes = self.attributes.get()

        # The status may be held as an enum member; emit its raw value instead.
        dev_op_status = attributes.get('device_operational_status', None)
        if isinstance(dev_op_status, OperationalStatus):
            dev_op_status = dev_op_status.value

        endpoints = [self.endpoint(endpoint_uuid).dump() for endpoint_uuid in self.endpoints.get()]

        # NOTE(review): the payload carries no device identifier; confirm that
        # callers combine it with dump_id() when one is needed.
        return {
            'device_type': attributes.get('device_type', None),
            'device_config': {'device_config': attributes.get('device_config', None)},
            'devOperationalStatus': dev_op_status,
            'endpointList': endpoints
        }