Skip to content
DltEventDispatcher.py 10.6 KiB
Newer Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, json, logging, threading
from typing import Any, Dict, Set
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId
from common.proto.dlt_connector_pb2 import DltSliceId
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import add_device_to_topology
from common.tools.context_queries.Link import add_link_to_topology
from common.tools.context_queries.Topology import create_topology
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Topology import json_topology_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from dlt.connector.client.DltConnectorClient import DltConnectorClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from dlt.connector.client.DltEventsCollector import DltEventsCollector
from dlt.connector.client.DltGatewayClient import DltGatewayClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from interdomain.client.InterdomainClient import InterdomainClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)

GET_EVENT_TIMEOUT = 0.5

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
class Clients:
    def __init__(self) -> None:
        self.context_client = ContextClient()
        self.dlt_connector_client = DltConnectorClient()
        self.dlt_gateway_client = DltGatewayClient()
        self.interdomain_client = InterdomainClient()

    def close(self) -> None:
        self.interdomain_client.close()
        self.dlt_gateway_client.close()
        self.dlt_connector_client.close()
        self.context_client.close()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
class DltEventDispatcher(threading.Thread):
    def __init__(self) -> None:
        LOGGER.debug('Creating connector...')
        super().__init__(name='DltEventDispatcher', daemon=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._terminate = threading.Event()
        LOGGER.debug('Connector created')

    def start(self) -> None:
        self._terminate.clear()
        return super().start()

    def stop(self):
        self._terminate.set()

    def run(self) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        clients = Clients()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        create_context(clients.context_client, DEFAULT_CONTEXT_NAME)
        create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME)
        create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        dlt_events_collector.start()

        while not self._terminate.is_set():
            event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if event is None: continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID)
            local_domain_uuids = {
                topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids
            }
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME)
            local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            self.dispatch_event(clients, local_domain_uuids, event)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        dlt_events_collector.stop()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        clients.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}
        if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            self._dispatch_device(clients, local_domain_uuids, event)
        elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            self._dispatch_link(clients, local_domain_uuids, event)
        elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE:
            self._dispatch_slice(clients, local_domain_uuids, event)
        else:
            raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))

    def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        domain_uuid : str = event.record_id.domain_uuid.uuid

        if domain_uuid in local_domain_uuids:
            MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
            return

        MSG = '[_dispatch_device] DLT event received (remote): {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(event)))

        event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
            LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
            record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
            LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record)))

            create_context(clients.context_client, domain_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device = Device(**json.loads(record.data_json))
            clients.context_client.SetDevice(device)
            device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            domain_context_id = ContextId(**json_context_id(domain_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
            raise NotImplementedError('Delete Device')

    def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        domain_uuid : str = event.record_id.domain_uuid.uuid

        if domain_uuid in local_domain_uuids:
            MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
            return

        MSG = '[_dispatch_link] DLT event received (remote): {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(event)))

        event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
            LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
            record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
            LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record)))

            link = Link(**json.loads(record.data_json))
            clients.context_client.SetLink(link)
            link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
            raise NotImplementedError('Delete Link')

    def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        event_type  : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        domain_uuid : str = event.record_id.domain_uuid.uuid

        LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
        record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
        LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record)))

        slice_ = Slice(**json.loads(record.data_json))

        context_uuid = slice_.slice_id.context_id.context_uuid.uuid
        owner_uuid = slice_.slice_owner.owner_uuid.uuid
        create_context(clients.context_client, context_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_NAME)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        if domain_uuid in local_domain_uuids:
            # it is for "me"
            if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                try:
                    db_slice = clients.context_client.GetSlice(slice_.slice_id)
                    # exists
                    db_json_slice = grpc_message_to_json_string(db_slice)
                except grpc.RpcError:
                    # not exists
                    db_json_slice = None

                _json_slice = grpc_message_to_json_string(slice_)
                if db_json_slice != _json_slice:
                    # not exists or is different...
                    slice_id = clients.interdomain_client.RequestSlice(slice_)
                    topology_id = TopologyId(**json_topology_id(domain_uuid))
                    dlt_slice_id = DltSliceId()
                    dlt_slice_id.topology_id.CopyFrom(topology_id)  # pylint: disable=no-member
                    dlt_slice_id.slice_id.CopyFrom(slice_id)        # pylint: disable=no-member
                    clients.dlt_connector_client.RecordSlice(dlt_slice_id)

            elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                raise NotImplementedError('Delete Slice')
        elif owner_uuid in local_domain_uuids:
            # it is owned by me
            # just update it locally
            LOGGER.info('[_dispatch_slice] updating locally')

            local_slice = Slice()
            local_slice.CopyFrom(slice_)

            # pylint: disable=no-member
            del local_slice.slice_service_ids[:]    # they are from remote domains so will not be present locally
            del local_slice.slice_subslice_ids[:]   # they are from remote domains so will not be present locally

            clients.context_client.SetSlice(local_slice)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))