Skip to content
Snippets Groups Projects
Device.py 3.51 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from __future__ import annotations
from typing import TYPE_CHECKING, Dict
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.database.api.context.Keys import KEY_DEVICE, KEY_DEVICE_ENDPOINTS
from common.database.api.context.topology.device.Endpoint import Endpoint
from common.database.api.context.topology.device.OperationalStatus import OperationalStatus, to_operationalstatus_enum
from common.database.api.entity._Entity import _Entity
from common.database.api.entity.EntityCollection import EntityCollection
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
if TYPE_CHECKING:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    from common.database.api.context.Context import Context
    from common.database.api.context.topology.Topology import Topology
# Per-attribute validation predicates handed to _Entity by Device.__init__.
# Each callable receives a candidate attribute value and returns True when the
# value is acceptable for that attribute.
# NOTE(review): the `VALIDATORS = {` header was absent from this copy of the
# file and has been restored; confirm against the upstream source.
VALIDATORS = {
    # Type and config must be non-empty strings (isinstance already rejects None).
    'device_type': lambda v: isinstance(v, str) and len(v) > 0,
    'device_config': lambda v: isinstance(v, str) and len(v) > 0,
    # Operational status must already be an OperationalStatus member.
    'device_operational_status': lambda v: isinstance(v, OperationalStatus),
}

# Per-attribute value converters handed to _Entity by Device.__init__.
# For each attribute name, the inner mapping goes from the Python type of a
# value to the callable that converts it:
#   * an OperationalStatus member is reduced to its `.value`;
#   * an int or str is turned into an OperationalStatus via
#     to_operationalstatus_enum.
# NOTE(review): presumably the first direction is used when storing and the
# second when loading; confirm against _Entity / the attributes implementation.
TRANSCODERS = {
    'device_operational_status': {
        OperationalStatus: lambda v: v.value,
        int              : to_operationalstatus_enum,
        str              : to_operationalstatus_enum,
    }
}

class Device(_Entity):
    """Device entity whose parent is a Topology.

    The constructor hands ``KEY_DEVICE`` and the module-level ``VALIDATORS`` /
    ``TRANSCODERS`` tables to ``_Entity`` and creates an ``EntityCollection``
    (``KEY_DEVICE_ENDPOINTS``) holding the UUIDs of the device's endpoints.
    """

    def __init__(self, device_uuid : str, parent : 'Topology'):
        """Bind the device identified by *device_uuid* to *parent*.

        Args:
            device_uuid: identifier passed to ``_Entity`` as the entity UUID.
            parent: topology this device belongs to.
        """
        super().__init__(parent, device_uuid, KEY_DEVICE, VALIDATORS, TRANSCODERS)
        self._endpoints = EntityCollection(self, KEY_DEVICE_ENDPOINTS)

    @property
    def parent(self) -> 'Topology':
        """Topology this device belongs to."""
        return self._parent

    @property
    def context(self) -> 'Context':
        """Context of the parent topology."""
        return self.parent.context

    @property
    def context_uuid(self) -> str:
        """UUID of the context, as reported by the parent topology."""
        return self.parent.context_uuid

    @property
    def topology(self) -> 'Topology':
        """Alias of :attr:`parent`."""
        return self.parent

    @property
    def topology_uuid(self) -> str:
        """UUID of the parent topology."""
        return self.parent.topology_uuid

    @property
    def device_uuid(self) -> str:
        """UUID of this device (the entity UUID kept by ``_Entity``)."""
        return self._entity_uuid

    @property
    def endpoints(self) -> EntityCollection:
        """Collection of the endpoint UUIDs attached to this device."""
        return self._endpoints

    def endpoint(self, endpoint_uuid : str) -> Endpoint:
        """Build an ``Endpoint`` object for *endpoint_uuid* with this device as its parent."""
        return Endpoint(endpoint_uuid, self)

    def create(self, device_type : str, device_config : str, device_operational_status : OperationalStatus) -> 'Device':
        """Store the device attributes and add the device to its parent topology.

        Args:
            device_type: value stored under ``'device_type'``.
            device_config: value stored under ``'device_config'``.
            device_operational_status: value stored under
                ``'device_operational_status'``.

        Returns:
            This device, so calls can be chained.
        """
        # NOTE(review): the call wrapping this attribute dict was missing from
        # this copy of the file; it is reconstructed on top of update(), whose
        # signature is visible in this class. Confirm against upstream.
        self.update(update_attributes={
            'device_type': device_type,
            'device_config': device_config,
            'device_operational_status': device_operational_status,
        })
        self.parent.devices.add(self.device_uuid)
        return self

    def update(self, update_attributes=None, remove_attributes=None) -> 'Device':
        """Update and/or remove attributes of this device.

        Args:
            update_attributes: mapping of attribute names to new values;
                ``None`` (the default) means no attribute is updated.
            remove_attributes: attribute names to remove; ``None`` (the
                default) means no attribute is removed.

        Returns:
            This device, so calls can be chained.
        """
        # Fresh containers per call: a literal {} / [] default would be one
        # object shared by every call that omits the argument.
        if update_attributes is None: update_attributes = {}
        if remove_attributes is None: remove_attributes = []
        self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
        return self

    def delete(self) -> None:
        """Delete the device's endpoints, its attributes, and its entry in the parent topology."""
        # Endpoints go first, while the device they reference as parent still exists.
        for endpoint_uuid in self.endpoints.get(): self.endpoint(endpoint_uuid).delete()
        self.attributes.delete()
        self.parent.devices.delete(self.device_uuid)

    def dump_id(self) -> Dict:
        """Return the identifier of this device as ``{'device_id': {'uuid': <device_uuid>}}``."""
        return {
            'device_id': {'uuid': self.device_uuid},
        }

    def dump(self) -> Dict:
        """Return a dictionary describing the device and its endpoints.

        The result holds the keys ``'device_id'``, ``'device_type'``,
        ``'device_config'``, ``'devOperationalStatus'`` and ``'endpointList'``.
        Missing attributes are reported as ``None``.
        """
        attributes = self.attributes.get()
        dev_op_status = attributes.get('device_operational_status', None)
        # Emit the enum's underlying value rather than the enum member itself.
        if isinstance(dev_op_status, OperationalStatus): dev_op_status = dev_op_status.value
        endpoints = [self.endpoint(endpoint_uuid).dump() for endpoint_uuid in self.endpoints.get()]
        return {
            'device_id': self.dump_id(),
            'device_type': attributes.get('device_type', None),
            'device_config': {'device_config': attributes.get('device_config', None)},
            'devOperationalStatus': dev_op_status,
            'endpointList': endpoints
        }