Commit 00edf781 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

DLT Connector Component:

- Corrected naming of auto-generated entities
- Added missing controls and conditions
- Cosmetic improvements
parent 0d7152ad
Loading
Loading
Loading
Loading
+54 −28
Original line number Diff line number Diff line
@@ -66,9 +66,15 @@ class DltEventDispatcher(threading.Thread):

    def run(self) -> None:
        clients = Clients()
        create_context(clients.context_client, DEFAULT_CONTEXT_NAME)
        create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME)
        create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME)
        create_context(
            clients.context_client, DEFAULT_CONTEXT_NAME,
            name=DEFAULT_CONTEXT_NAME)
        create_topology(
            clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME,
            name=DEFAULT_TOPOLOGY_NAME)
        create_topology(
            clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME,
            name=INTERDOMAIN_TOPOLOGY_NAME)

        dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True)
        dlt_events_collector.start()
@@ -77,37 +83,42 @@ class DltEventDispatcher(threading.Thread):
            event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
            if event is None: continue

            existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID)
            local_domain_uuids = {
                topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids
            }
            local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME)
            local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME)
            existing_topologies = clients.context_client.ListTopologies(ADMIN_CONTEXT_ID)
            local_domain_names = {topology.name for topology in existing_topologies.topologies}
            local_domain_names.discard(DEFAULT_TOPOLOGY_NAME)
            local_domain_names.discard(INTERDOMAIN_TOPOLOGY_NAME)

            self.dispatch_event(clients, local_domain_uuids, event)
            self.dispatch_event(clients, local_domain_names, event)

        dlt_events_collector.stop()
        clients.close()

    def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
    def dispatch_event(self, clients : Clients, local_domain_names : Set[str], event : DltRecordEvent) -> None:
        record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}
        if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE:
            self._dispatch_device(clients, local_domain_uuids, event)
            self._dispatch_device(clients, local_domain_names, event)
        elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK:
            self._dispatch_link(clients, local_domain_uuids, event)
            self._dispatch_link(clients, local_domain_names, event)
        elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE:
            self._dispatch_slice(clients, local_domain_uuids, event)
            self._dispatch_slice(clients, local_domain_names, event)
        else:
            raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))

    def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
    def _dispatch_device(self, clients : Clients, local_domain_names : Set[str], event : DltRecordEvent) -> None:
        domain_uuid : str = event.record_id.domain_uuid.uuid

        if domain_uuid in local_domain_uuids:
            MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}'
        # TODO: to be checked!
        if domain_uuid in {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}:
            MSG = '[_dispatch_device] Ignoring DLT event received (discard domain): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
            return

        # TODO: to be checked!
        if domain_uuid in local_domain_names:
            MSG = '[_dispatch_device] Ignoring DLT event received (local_domain_names={:s}): {:s}'
            LOGGER.info(MSG.format(str(local_domain_names), grpc_message_to_json_string(event)))
            return

        MSG = '[_dispatch_device] DLT event received (remote): {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(event)))

@@ -117,24 +128,36 @@ class DltEventDispatcher(threading.Thread):
            record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
            LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record)))

            # TODO: to be checked!
            device = Device(**json.loads(record.data_json))
            device_name = None if len(device.name) == 0 else device.name
            create_context(clients.context_client, domain_uuid, name=device_name)
            create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME)
            create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME, name=DEFAULT_TOPOLOGY_NAME)
            clients.context_client.SetDevice(device)

            # TODO: to be checked!
            device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member
            add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid)

            # TODO: to be checked!
            domain_context_id = ContextId(**json_context_id(domain_uuid))
            add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid)
        elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
            raise NotImplementedError('Delete Device')

    def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
    def _dispatch_link(self, clients : Clients, local_domain_names : Set[str], event : DltRecordEvent) -> None:
        domain_uuid : str = event.record_id.domain_uuid.uuid

        if domain_uuid in local_domain_uuids:
            MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
        # TODO: to be checked!
        #if domain_uuid in {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}:
        #    MSG = '[_dispatch_link] Ignoring DLT event received (discard domain): {:s}'
        #    LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
        #    return

        # TODO: to be checked!
        if domain_uuid in local_domain_names:
            MSG = '[_dispatch_link] Ignoring DLT event received (local_domain_names={:s}): {:s}'
            LOGGER.info(MSG.format(str(local_domain_names), grpc_message_to_json_string(event)))
            return

        MSG = '[_dispatch_link] DLT event received (remote): {:s}'
@@ -148,12 +171,14 @@ class DltEventDispatcher(threading.Thread):

            link = Link(**json.loads(record.data_json))
            clients.context_client.SetLink(link)
            
            # TODO: to be checked!
            link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
            add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid)
        elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
            raise NotImplementedError('Delete Link')

    def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
    def _dispatch_slice(self, clients : Clients, local_domain_names : Set[str], event : DltRecordEvent) -> None:
        event_type  : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        domain_uuid : str = event.record_id.domain_uuid.uuid

@@ -165,10 +190,12 @@ class DltEventDispatcher(threading.Thread):

        context_uuid = slice_.slice_id.context_id.context_uuid.uuid
        owner_uuid = slice_.slice_owner.owner_uuid.uuid
        create_context(clients.context_client, context_uuid)
        create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_NAME)

        if domain_uuid in local_domain_uuids:
        # TODO: to be checked!
        create_context(clients.context_client, context_uuid, name=context_uuid)
        create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_NAME, name=DEFAULT_TOPOLOGY_NAME)

        if domain_uuid in local_domain_names:
            # it is for "me"
            if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                try:
@@ -191,7 +218,7 @@ class DltEventDispatcher(threading.Thread):

            elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
                raise NotImplementedError('Delete Slice')
        elif owner_uuid in local_domain_uuids:
        elif owner_uuid in local_domain_names:
            # it is owned by me
            # just update it locally
            LOGGER.info('[_dispatch_slice] updating locally')
@@ -207,4 +234,3 @@ class DltEventDispatcher(threading.Thread):
        else:
            MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))