# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, logging, threading
from typing import Any
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID
from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link
from common.proto.dlt_gateway_pb2 import DltRecordTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltEventsCollector import DltEventsCollector
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from .Tools import add_device_to_topology, add_link_to_topology, create_context, create_topology

LOGGER = logging.getLogger(__name__)

# Seconds to wait for an event before re-checking the termination flag.
GET_EVENT_TIMEOUT = 0.5


class DltEventDispatcher(threading.Thread):
    """Daemon thread that consumes DLT events and mirrors them into Context.

    Events are pulled from a ``DltEventsCollector``. Events whose record
    domain UUID matches one of the topologies in the default context (other
    than the default and inter-domain topologies) are logged and ignored;
    the rest are passed to ``dispatch_event``.
    """

    def __init__(self) -> None:
        LOGGER.debug('Creating connector...')
        super().__init__(name='DltEventDispatcher', daemon=True)
        self._terminate = threading.Event()
        LOGGER.debug('Connector created')

    def start(self) -> None:
        """Clear the termination flag and start the thread."""
        self._terminate.clear()
        return super().start()

    def stop(self) -> None:
        """Ask the event loop in ``run`` to finish after its current iteration."""
        self._terminate.set()

    def run(self) -> None:
        """Create the default context/topologies, then process DLT events until stopped.

        The collector and both clients are released in ``finally`` blocks so
        they are not leaked when setup or the loop raises.
        """
        context_client = ContextClient()
        try:
            create_context(context_client, DEFAULT_CONTEXT_UUID)
            create_topology(context_client, DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID)
            create_topology(context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID)

            dlt_gateway_client = DltGatewayClient()
            try:
                dlt_events_collector = DltEventsCollector(dlt_gateway_client, log_events_received=True)
                dlt_events_collector.start()
                try:
                    self._process_events(context_client, dlt_gateway_client, dlt_events_collector)
                finally:
                    dlt_events_collector.stop()
            finally:
                dlt_gateway_client.close()
        finally:
            context_client.close()

    def _process_events(
        self, context_client : ContextClient, dlt_gateway_client : DltGatewayClient,
        dlt_events_collector : DltEventsCollector
    ) -> None:
        """Poll the collector until ``stop`` is called, handling one event per iteration."""
        # The request message is the same on every iteration; build it once.
        context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID))

        while not self._terminate.is_set():
            event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
            if event is None: continue

            try:
                self._handle_event(context_client, dlt_gateway_client, context_id, event)
            except Exception:   # pylint: disable=broad-except
                # Top-level boundary of the thread: a single unsupported or
                # failing event must not terminate the whole dispatcher.
                LOGGER.exception('Unable to process DLT event: {:s}'.format(grpc_message_to_json_string(event)))

    def _handle_event(
        self, context_client : ContextClient, dlt_gateway_client : DltGatewayClient, context_id : ContextId,
        event : Any
    ) -> None:
        """Ignore events classified as local; dispatch the others."""
        existing_topology_ids = context_client.ListTopologyIds(context_id)
        existing_topology_uuids = {
            topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids
        }
        existing_topology_uuids.discard(DEFAULT_TOPOLOGY_UUID)
        existing_topology_uuids.discard(INTERDOMAIN_TOPOLOGY_UUID)

        if event.record_id.domain_uuid.uuid in existing_topology_uuids:
            LOGGER.info('Ignoring DLT event received (local): {:s}'.format(grpc_message_to_json_string(event)))
        else:
            LOGGER.info('DLT event received (remote): {:s}'.format(grpc_message_to_json_string(event)))
            self.dispatch_event(context_client, dlt_gateway_client, event)

    def dispatch_event(
        self, context_client : ContextClient, dlt_gateway_client : DltGatewayClient, event : Any
    ) -> None:
        """Fetch the record referenced by ``event`` from the DLT and apply it to Context.

        Device and Link records are stored on CREATE/UPDATE and added to the
        inter-domain topology of the default context.

        Raises:
            NotImplementedError: for REMOVE events on devices/links and for
                any record type other than DEVICE or LINK.
        """
        event_type  : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE}
        record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE}

        LOGGER.info('[dispatch_event] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id)))
        record = dlt_gateway_client.GetFromDlt(event.record_id)
        LOGGER.info('[dispatch_event] record={:s}'.format(grpc_message_to_json_string(record)))

        if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE:
            if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                device = Device(**json.loads(record.data_json))
                context_client.SetDevice(device)
                device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member
                add_device_to_topology(context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID, device_uuid)
            # The enum member is EVENTTYPE_REMOVE (see the value list above);
            # EVENTTYPE_DELETE is not defined and raised AttributeError here.
            elif event_type in {EventTypeEnum.EVENTTYPE_REMOVE}:
                raise NotImplementedError('Delete Device')
        elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK:
            if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}:
                link = Link(**json.loads(record.data_json))
                context_client.SetLink(link)
                link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member
                add_link_to_topology(context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID, link_uuid)
            elif event_type in {EventTypeEnum.EVENTTYPE_REMOVE}:
                raise NotImplementedError('Delete Link')
        else:
            raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event)))