Skip to content
DltEventDispatcher.py 10.6 KiB
Newer Older
Javier Diaz's avatar
Javier Diaz committed
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging, threading
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Any, Dict, Set
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId
from common.proto.dlt_connector_pb2 import DltSliceId
Javier Diaz's avatar
Javier Diaz committed
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import add_device_to_topology
from common.tools.context_queries.Link import add_link_to_topology
from common.tools.context_queries.Topology import create_topology
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Topology import json_topology_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
Javier Diaz's avatar
Javier Diaz committed
from dlt.connector.client.DltConnectorClient import DltConnectorClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from dlt.connector.client.DltEventsCollector import DltEventsCollector
Javier Diaz's avatar
Javier Diaz committed
from dlt.connector.client.DltGatewayClient import DltGatewayClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from interdomain.client.InterdomainClient import InterdomainClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Timeout handed to DltEventsCollector.get_event() on every polling iteration of
# DltEventDispatcher.run(); it bounds how long the loop waits before re-checking
# the termination flag (presumably expressed in seconds -- TODO confirm against the collector).
GET_EVENT_TIMEOUT = 0.5

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# ContextId built from DEFAULT_CONTEXT_NAME; used below to list the local topologies
# and to attach remote devices/links to the inter-domain topology.
ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
class Clients:
    """Bundle of the service clients needed while dispatching DLT events.

    Holds one client for each of: Context, DLT connector, DLT gateway and
    Interdomain. Call close() once the clients are no longer needed.
    """

    def __init__(self) -> None:
        # One client per remote component; close() releases them in reverse order.
        self.context_client = ContextClient()
        self.dlt_connector_client = DltConnectorClient()
        self.dlt_gateway_client = DltGatewayClient()
        self.interdomain_client = InterdomainClient()

    def close(self) -> None:
        """Close every client, last created first."""
        created_in_order = (
            self.context_client,
            self.dlt_connector_client,
            self.dlt_gateway_client,
            self.interdomain_client,
        )
        for client in reversed(created_in_order):
            client.close()

Javier Diaz's avatar
Javier Diaz committed
class DltEventDispatcher(threading.Thread):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self) -> None:
        """Configure the daemon thread and its termination flag; the thread is not started here."""
        LOGGER.debug('Creating connector...')
        thread_settings = {'name': 'DltEventDispatcher', 'daemon': True}
        super().__init__(**thread_settings)
        # Set by stop() and polled by run() to leave the event loop.
        self._terminate = threading.Event()
        LOGGER.debug('Connector created')

Javier Diaz's avatar
Javier Diaz committed
    def start(self) -> None:
        """Reset the termination flag and launch the dispatcher thread."""
        # Drop any stop() request issued before the thread was started.
        stop_requested = self._terminate
        stop_requested.clear()
        outcome = super().start()
        return outcome
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Javier Diaz's avatar
Javier Diaz committed
    def stop(self):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._terminate.set()

Javier Diaz's avatar
Javier Diaz committed
    def run(self) -> None:
        """Thread body: consume DLT events until stop() is requested.

        Makes sure the default context and the default / inter-domain topologies
        exist, starts a DltEventsCollector on the DLT gateway client, and hands
        every collected event to dispatch_event() together with the set of local
        domain UUIDs, which is recomputed for each event.

        An error raised while handling one event is logged and the loop carries
        on with the next event, so a single unsupported or malformed record does
        not stop the dispatcher. The collector and all clients are always
        released when the loop ends, whether because of stop() or because of an
        unexpected error.
        """
        clients = Clients()
        try:
            create_context(clients.context_client, DEFAULT_CONTEXT_NAME)
            create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME)
            create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME)

            dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True)
            dlt_events_collector.start()
            try:
                while not self._terminate.is_set():
                    # Bounded wait so the termination flag is re-checked regularly.
                    event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
                    if event is None: continue

                    try:
                        # Every topology of the admin context except the default and
                        # inter-domain ones is treated as a local domain.
                        existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID)
                        local_domain_uuids = {
                            topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids
                        }
                        local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME)
                        local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME)

                        self.dispatch_event(clients, local_domain_uuids, event)
                    except Exception: # pylint: disable=broad-except
                        # Top-level boundary of the thread: log with traceback and keep dispatching.
                        MSG = '[run] Unable to dispatch DLT event: {:s}'
                        LOGGER.exception(MSG.format(grpc_message_to_json_string(event)))
            finally:
                dlt_events_collector.stop()
        finally:
            clients.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Javier Diaz's avatar
Javier Diaz committed
    def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        """Route a DLT record event to the handler that matches its record type.

        Device, link and slice records are handled; any other record type raises
        NotImplementedError carrying the JSON rendering of the event.
        """
        handlers = {
            DltRecordTypeEnum.DLTRECORDTYPE_DEVICE: self._dispatch_device,
            DltRecordTypeEnum.DLTRECORDTYPE_LINK  : self._dispatch_link,
            DltRecordTypeEnum.DLTRECORDTYPE_SLICE : self._dispatch_slice,
        }
        # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}
        kind_of_record : DltRecordTypeEnum = event.record_id.type
        handler = handlers.get(kind_of_record)
        if handler is None:
            raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))
        handler(clients, local_domain_uuids, event)

Javier Diaz's avatar
Javier Diaz committed
    def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        """Mirror a device record published by a remote domain into the local Context.

        Events whose domain UUID is one of local_domain_uuids are logged and ignored.
        For CREATE/UPDATE events the record is fetched from the DLT, a context named
        after the remote domain (with a default topology) is created, the device is
        stored, and it is added both to the inter-domain topology of the admin
        context and to the default topology of the remote domain's context.

        Raises:
            NotImplementedError: for REMOVE events (device removal is not supported).
        """
        domain_uuid : str = event.record_id.domain_uuid.uuid

        if domain_uuid in local_domain_uuids:
            MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
            return

        MSG = '[_dispatch_device] DLT event received (remote): {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(event)))

        event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
            LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
            record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
            LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record)))

            # The remote domain gets its own context and default topology locally.
            create_context(clients.context_client, domain_uuid)
            create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME)

            # The DLT record carries the device as a JSON document.
            device = Device(**json.loads(record.data_json))
            clients.context_client.SetDevice(device)
            device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member

            add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid)
            domain_context_id = ContextId(**json_context_id(domain_uuid))
            add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid)
        elif event_type == EventTypeEnum.EVENTTYPE_REMOVE:
            # EventTypeEnum names its removal member REMOVE (there is no DELETE member).
            raise NotImplementedError('Delete Device')

Javier Diaz's avatar
Javier Diaz committed
    def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        """Mirror a link record published by a remote domain into the local Context.

        Events whose domain UUID is one of local_domain_uuids are logged and ignored.
        For CREATE/UPDATE events the record is fetched from the DLT, the link is
        stored, and it is added to the inter-domain topology of the admin context.

        Raises:
            NotImplementedError: for REMOVE events (link removal is not supported).
        """
        domain_uuid : str = event.record_id.domain_uuid.uuid

        if domain_uuid in local_domain_uuids:
            MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
            return

        MSG = '[_dispatch_link] DLT event received (remote): {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(event)))

        event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
            LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
            record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
            LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record)))

            # The DLT record carries the link as a JSON document.
            link = Link(**json.loads(record.data_json))
            clients.context_client.SetLink(link)
            link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
            add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid)
        elif event_type == EventTypeEnum.EVENTTYPE_REMOVE:
            # EventTypeEnum names its removal member REMOVE (there is no DELETE member).
            raise NotImplementedError('Delete Link')

Javier Diaz's avatar
Javier Diaz committed
    def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        """Handle a slice record event received from the DLT.

        The record is always fetched from the DLT and the slice's context (with a
        default topology) is created locally. Then:

        * if the event's domain UUID is local, a CREATE/UPDATE whose slice differs
          from the one stored in Context (or is not stored yet) is requested through
          the Interdomain client and recorded back through the DLT connector;
        * otherwise, if the slice owner is a local domain, the slice is stored
          locally without its service and sub-slice identifiers;
        * otherwise the event is logged and ignored.

        Raises:
            NotImplementedError: for REMOVE events targeting a local domain
                (slice removal is not supported).
        """
        event_type  : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        domain_uuid : str = event.record_id.domain_uuid.uuid

        LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
        record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
        LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record)))

        # The DLT record carries the slice as a JSON document.
        slice_ = Slice(**json.loads(record.data_json))

        context_uuid = slice_.slice_id.context_id.context_uuid.uuid
        owner_uuid = slice_.slice_owner.owner_uuid.uuid
        create_context(clients.context_client, context_uuid)
        create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_NAME)

        if domain_uuid in local_domain_uuids:
            # it is for "me"
            if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                try:
                    db_slice = clients.context_client.GetSlice(slice_.slice_id)
                    # exists
                    db_json_slice = grpc_message_to_json_string(db_slice)
                except grpc.RpcError:
                    # not exists
                    db_json_slice = None

                _json_slice = grpc_message_to_json_string(slice_)
                if db_json_slice != _json_slice:
                    # not exists or is different...
                    slice_id = clients.interdomain_client.RequestSlice(slice_)
                    topology_id = TopologyId(**json_topology_id(domain_uuid))
                    dlt_slice_id = DltSliceId()
                    dlt_slice_id.topology_id.CopyFrom(topology_id)  # pylint: disable=no-member
                    dlt_slice_id.slice_id.CopyFrom(slice_id)        # pylint: disable=no-member
                    clients.dlt_connector_client.RecordSlice(dlt_slice_id)

            elif event_type == EventTypeEnum.EVENTTYPE_REMOVE:
                # EventTypeEnum names its removal member REMOVE (there is no DELETE member).
                raise NotImplementedError('Delete Slice')
        elif owner_uuid in local_domain_uuids:
            # it is owned by me
            # just update it locally
            LOGGER.info('[_dispatch_slice] updating locally')

            local_slice = Slice()
            local_slice.CopyFrom(slice_)

            # pylint: disable=no-member
            del local_slice.slice_service_ids[:]    # they are from remote domains so will not be present locally
            del local_slice.slice_subslice_ids[:]   # they are from remote domains so will not be present locally

            clients.context_client.SetSlice(local_slice)
        else:
            # Neither addressed to a local domain nor owned by one: nothing to do.
            MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
Javier Diaz's avatar
Javier Diaz committed