Skip to content
Snippets Groups Projects
Endpoint.py 2.47 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from __future__ import annotations
from typing import TYPE_CHECKING, Dict
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.database.api.context.topology.device.Endpoint import Endpoint as DeviceEndpoint
from common.database.api.context.Keys import KEY_SERVICE_ENDPOINT
from common.database.api.entity._Entity import _Entity
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

if TYPE_CHECKING:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    from common.database.api.context.Context import Context
    from common.database.api.context.service.Service import Service
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

VALIDATORS = {
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    'topology_uuid': lambda v: v is not None and isinstance(v, str) and (len(v) > 0),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    'device_uuid': lambda v: v is not None and isinstance(v, str) and (len(v) > 0),
    'endpoint_uuid': lambda v: v is not None and isinstance(v, str) and (len(v) > 0),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
TRANSCODERS = {} # no transcoding applied to attributes

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
class Endpoint(_Entity):
    def __init__(self, endpoint_uuid : str, parent : 'Service'):
        super().__init__(parent, endpoint_uuid, KEY_SERVICE_ENDPOINT, VALIDATORS, TRANSCODERS)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @property
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def parent(self) -> 'Service': return self._parent
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @property
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def context(self) -> 'Context': return self.parent.context
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @property
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def context_uuid(self) -> str: return self.parent.context_uuid

    @property
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def service(self) -> 'Service': return self.parent
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @property
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def service_uuid(self) -> str: return self.parent.service_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @property
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def endpoint_uuid(self) -> str: return self._entity_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def create(self, endpoint : DeviceEndpoint) -> 'Endpoint':
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.update(update_attributes={
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            'topology_uuid': endpoint.topology_uuid,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            'device_uuid': endpoint.device_uuid,
            'endpoint_uuid': endpoint.endpoint_uuid,
        })
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.parent.endpoints.add(self.endpoint_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return self

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def update(self, update_attributes={}, remove_attributes=[]) -> 'Endpoint':
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
        return self

    def delete(self) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.attributes.delete()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.parent.endpoints.delete(self.endpoint_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def dump_id(self) -> Dict:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        attributes = self.attributes.get()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        topology_uuid = attributes.get('topology_uuid', None)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        device_uuid = attributes.get('device_uuid', None)
        endpoint_uuid = attributes.get('endpoint_uuid', None)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        endpoint = self.context.topology(topology_uuid).device(device_uuid).endpoint(endpoint_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return endpoint.dump_id()

    def dump(self) -> Dict:
        return self.dump_id()