Commit 4ceab714 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

DLT Connector:

- Added dependencies in dockerfile
- added timestamp to log format
- implemented RecordSlice method
- improved structure of DltEventDispatcher
- implemented dispatching of Slice events
parent 8e70d38d
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -64,6 +64,8 @@ RUN python3 -m pip install -r requirements.txt
WORKDIR /var/teraflow
COPY src/context/. context/
COPY src/dlt/connector/. dlt/connector
COPY src/interdomain/. interdomain/
COPY src/slice/. slice/

# Start the service
ENTRYPOINT ["python", "-m", "dlt.connector.service"]
+28 −0
Original line number Diff line number Diff line
@@ -130,4 +130,32 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty:
        context_client = ContextClient()
        slice_ = context_client.GetSlice(request.slice_id)

        dltgateway_client = DltGatewayClient()

        dlt_record_id = DltRecordId()
        dlt_record_id.domain_uuid.uuid = request.topology_id.topology_uuid.uuid
        dlt_record_id.type             = DltRecordTypeEnum.DLTRECORDTYPE_SLICE
        dlt_record_id.record_uuid.uuid = slice_.slice_id.slice_uuid.uuid

        LOGGER.info('[RecordSlice] sent dlt_record_id = {:s}'.format(grpc_message_to_json_string(dlt_record_id)))
        dlt_record = dltgateway_client.GetFromDlt(dlt_record_id)
        LOGGER.info('[RecordSlice] recv dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record)))

        exists = record_exists(dlt_record)
        LOGGER.info('[RecordSlice] exists = {:s}'.format(str(exists)))

        dlt_record = DltRecord()
        dlt_record.record_id.CopyFrom(dlt_record_id)
        dlt_record.operation = \
            DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE \
            if exists else \
            DltRecordOperationEnum.DLTRECORDOPERATION_ADD

        dlt_record.data_json = grpc_message_to_json_string(slice_)
        LOGGER.info('[RecordSlice] sent dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record)))
        dlt_record_status = dltgateway_client.RecordToDlt(dlt_record)
        LOGGER.info('[RecordSlice] recv dlt_record_status = {:s}'.format(grpc_message_to_json_string(dlt_record_status)))
        return Empty()
+1 −1
Original line number Diff line number Diff line
@@ -32,7 +32,7 @@ def main():
    global LOGGER # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level)
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
+136 −43
Original line number Diff line number Diff line
@@ -12,20 +12,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json, logging, threading
from typing import Any
import grpc, json, logging, threading
from typing import Any, Dict, Set
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID
from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link
from common.proto.dlt_gateway_pb2 import DltRecordTypeEnum
from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId
from common.proto.dlt_connector_pb2 import DltSliceId
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import add_device_to_topology
from common.tools.context_queries.Link import add_link_to_topology
from common.tools.context_queries.Topology import create_topology
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from dlt.connector.client.DltEventsCollector import DltEventsCollector
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from interdomain.client.InterdomainClient import InterdomainClient

LOGGER = logging.getLogger(__name__)

@@ -33,6 +37,19 @@ GET_EVENT_TIMEOUT = 0.5

ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID))

class Clients:
    def __init__(self) -> None:
        self.context_client = ContextClient()
        self.dlt_connector_client = DltConnectorClient()
        self.dlt_gateway_client = DltGatewayClient()
        self.interdomain_client = InterdomainClient()

    def close(self) -> None:
        self.interdomain_client.close()
        self.dlt_gateway_client.close()
        self.dlt_connector_client.close()
        self.context_client.close()

class DltEventDispatcher(threading.Thread):
    def __init__(self) -> None:
        LOGGER.debug('Creating connector...')
@@ -48,61 +65,137 @@ class DltEventDispatcher(threading.Thread):
        self._terminate.set()

    def run(self) -> None:
        context_client = ContextClient()
        create_context(context_client, DEFAULT_CONTEXT_UUID)
        create_topology(context_client, DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID)
        create_topology(context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID)
        clients = Clients()
        create_context(clients.context_client, DEFAULT_CONTEXT_UUID)
        create_topology(clients.context_client, DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID)
        create_topology(clients.context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID)

        dlt_gateway_client = DltGatewayClient()
        dlt_events_collector = DltEventsCollector(dlt_gateway_client, log_events_received=True)
        dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True)
        dlt_events_collector.start()

        while not self._terminate.is_set():
            event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
            if event is None: continue

            existing_topology_ids = context_client.ListTopologyIds(ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)))
            existing_topology_uuids = {
            existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID)
            local_domain_uuids = {
                topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids
            }
            existing_topology_uuids.discard(DEFAULT_TOPOLOGY_UUID)
            existing_topology_uuids.discard(INTERDOMAIN_TOPOLOGY_UUID)
            local_domain_uuids.discard(DEFAULT_TOPOLOGY_UUID)
            local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_UUID)

            if event.record_id.domain_uuid.uuid in existing_topology_uuids:
                LOGGER.info('Ignoring DLT event received (local): {:s}'.format(grpc_message_to_json_string(event)))
            else:
                LOGGER.info('DLT event received (remote): {:s}'.format(grpc_message_to_json_string(event)))
                self.dispatch_event(context_client, dlt_gateway_client, event)
            self.dispatch_event(clients, local_domain_uuids, event)

        dlt_events_collector.stop()
        dlt_gateway_client.close()
        context_client.close()
        clients.close()

    def dispatch_event(
        self, context_client : ContextClient, dlt_gateway_client : DltGatewayClient, event : Any
    ) -> None:
        event_type  : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
    def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}
        if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE:
            self._dispatch_device(clients, local_domain_uuids, event)
        elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK:
            self._dispatch_link(clients, local_domain_uuids, event)
        elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE:
            self._dispatch_slice(clients, local_domain_uuids, event)
        else:
            raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))

        LOGGER.info('[dispatch_event] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
        record = dlt_gateway_client.GetFromDlt(event.record_id)
        LOGGER.info('[dispatch_event] record={:s}'.format(grpc_message_to_json_string(record)))
    def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        domain_uuid : str = event.record_id.domain_uuid.uuid

        if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE:
        if domain_uuid in local_domain_uuids:
            MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
            return

        MSG = '[_dispatch_device] DLT event received (remote): {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(event)))

        event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
            LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
            record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
            LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record)))

            create_context(clients.context_client, domain_uuid)
            create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_UUID)
            device = Device(**json.loads(record.data_json))
                context_client.SetDevice(device)
            clients.context_client.SetDevice(device)
            device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member
                add_device_to_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID, device_uuid)
            add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID, device_uuid)
            domain_context_id = ContextId(**json_context_id(domain_uuid))
            add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_UUID, device_uuid)
        elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
            raise NotImplementedError('Delete Device')
        elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK:

    def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        domain_uuid : str = event.record_id.domain_uuid.uuid

        if domain_uuid in local_domain_uuids:
            MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
            return

        MSG = '[_dispatch_link] DLT event received (remote): {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(event)))

        event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
            LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
            record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
            LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record)))

            link = Link(**json.loads(record.data_json))
                context_client.SetLink(link)
            clients.context_client.SetLink(link)
            link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
                add_link_to_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID, link_uuid)
            add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID, link_uuid)
        elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
            raise NotImplementedError('Delete Link')

    def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None:
        event_type  : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        domain_uuid : str = event.record_id.domain_uuid.uuid

        LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
        record = clients.dlt_gateway_client.GetFromDlt(event.record_id)
        LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record)))

        slice_ = Slice(**json.loads(record.data_json))

        context_uuid = slice_.slice_id.context_id.context_uuid.uuid
        owner_uuid = slice_.slice_owner.owner_uuid.uuid
        create_context(clients.context_client, context_uuid)
        create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_UUID)

        if domain_uuid in local_domain_uuids:
            # it is for "me"
            if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                try:
                    db_slice = clients.context_client.GetSlice(slice_.slice_id)
                    # exists
                    db_json_slice = grpc_message_to_json_string(db_slice)
                except grpc.RpcError:
                    # not exists
                    db_json_slice = None

                _json_slice = grpc_message_to_json_string(slice_)
                if db_json_slice != _json_slice:
                    # not exists or is different...
                    slice_id = clients.interdomain_client.RequestSlice(slice_)
                    topology_id = TopologyId(**json_topology_id(domain_uuid))
                    dlt_slice_id = DltSliceId()
                    dlt_slice_id.topology_id.CopyFrom(topology_id)  # pylint: disable=no-member
                    dlt_slice_id.slice_id.CopyFrom(slice_id)        # pylint: disable=no-member
                    clients.dlt_connector_client.RecordSlice(dlt_slice_id)

            elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}:
                raise NotImplementedError('Delete Slice')
        elif owner_uuid in local_domain_uuids:
            # it is owned by me
            # just update it locally
            LOGGER.info('[_dispatch_slice] updating locally')
            clients.context_client.SetSlice(slice_)
        else:
            raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))
            MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(event)))