Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Device.py 3.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from __future__ import annotations
    from typing import TYPE_CHECKING, Dict
    from ..entity._Entity import _Entity
    from ..entity.EntityCollection import EntityCollection
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .Keys import KEY_DEVICE, KEY_DEVICE_ENDPOINTS
    
    from .OperationalStatus import OperationalStatus, to_operationalstatus_enum
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    if TYPE_CHECKING:
        from .Context import Context
        from .Topology import Topology
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        'device_type': lambda v: v is not None and isinstance(v, str) and (len(v) > 0),
        'device_config': lambda v: v is not None and isinstance(v, str) and (len(v) > 0),
        'device_operational_status': lambda v: v is not None and isinstance(v, OperationalStatus),
    
    }
    
    TRANSCODERS = {
        'device_operational_status': {
            OperationalStatus: lambda v: v.value,
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            int              : lambda v: to_operationalstatus_enum(v),
    
            str              : lambda v: to_operationalstatus_enum(v),
        }
    }
    
    class Device(_Entity):
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def __init__(self, device_uuid : str, parent : 'Topology'):
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            super().__init__(parent, device_uuid, KEY_DEVICE, VALIDATORS, TRANSCODERS)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self._endpoints = EntityCollection(self, KEY_DEVICE_ENDPOINTS)
    
        @property
        def parent(self) -> 'Topology':
            """Return the object held in ``self._parent`` (annotated as the owning Topology)."""
            return self._parent
    
        @property
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def context(self) -> 'Context': return self.parent.context
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        @property
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def context_uuid(self) -> str: return self.parent.context_uuid
    
        @property
        def topology(self) -> 'Topology':
            """Return the topology of this device; this is the same object as ``parent``."""
            return self.parent
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        @property
        def topology_uuid(self) -> str:
            """Return the ``topology_uuid`` reported by this device's parent."""
            owner = self.parent
            return owner.topology_uuid
    
        @property
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def device_uuid(self) -> str: return self._entity_uuid
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        @property
        def endpoints(self) -> EntityCollection:
            """Return the endpoint collection stored in ``self._endpoints``."""
            return self._endpoints
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def endpoint(self, endpoint_uuid : str) -> Endpoint:
            """Build an ``Endpoint`` for ``endpoint_uuid``, passing this device as its second argument."""
            return Endpoint(endpoint_uuid, self)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def create(self, device_type : str, device_config : str, device_operational_status : OperationalStatus) -> 'Device':
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                'device_type': device_type,
                'device_config': device_config,
                'device_operational_status': device_operational_status,
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self.parent.devices.add(self.device_uuid)
    
            return self
    
        def update(self, update_attributes=None, remove_attributes=None) -> 'Device':
            """Forward attribute changes to ``self.attributes.update`` and return this device.

            Args:
                update_attributes: passed through as ``update_attributes``; presumably a
                    mapping of attribute name to new value. ``None`` (the default) is
                    replaced by a new empty dict.
                remove_attributes: passed through as ``remove_attributes``; presumably a
                    list of attribute names to drop. ``None`` (the default) is replaced
                    by a new empty list.

            Returns:
                This ``Device`` instance, so calls can be chained.
            """
            # Build the containers per call: a `{}` / `[]` default in the signature is one
            # object shared by every call, so any mutation downstream would leak between calls.
            if update_attributes is None: update_attributes = {}
            if remove_attributes is None: remove_attributes = []
            self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
            return self
    
        def delete(self) -> None:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            for endpoint_uuid in self.endpoints.get(): self.endpoint(endpoint_uuid).delete()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self.attributes.delete()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self.parent.devices.delete(self.device_uuid)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def dump_id(self) -> Dict:
            """Return the device identifier as ``{'device_id': {'uuid': <device_uuid>}}``."""
            uuid_field = {'uuid': self.device_uuid}
            return {'device_id': uuid_field}
    
    
        def dump(self) -> Dict:
            attributes = self.attributes.get()
            dev_op_status = attributes.get('device_operational_status', None)
            if isinstance(dev_op_status, OperationalStatus): dev_op_status = dev_op_status.value
            endpoints = [self.endpoint(endpoint_uuid).dump() for endpoint_uuid in self.endpoints.get()]
            return {
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                'device_id': self.dump_id(),
    
                'device_type': attributes.get('device_type', None),
                'device_config': {'device_config': attributes.get('device_config', None)},
                'devOperationalStatus': dev_op_status,
                'endpointList': endpoints
            }