diff --git a/src/dlt/connector/Dockerfile b/src/dlt/connector/Dockerfile index 51e9ec506f0c8a6c35ceac68833e3ad683ef8e63..c5d600ee0d55deb5a8bd4dca2d4f12cd092ad420 100644 --- a/src/dlt/connector/Dockerfile +++ b/src/dlt/connector/Dockerfile @@ -64,6 +64,8 @@ RUN python3 -m pip install -r requirements.txt WORKDIR /var/teraflow COPY src/context/. context/ COPY src/dlt/connector/. dlt/connector +COPY src/interdomain/. interdomain/ +COPY src/slice/. slice/ # Start the service ENTRYPOINT ["python", "-m", "dlt.connector.service"] diff --git a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py index e3ba1f68817f0795008ff2e0fe554c71aad3d32e..6c5401cb1724f8a759001d790e835ab78ce4c6c6 100644 --- a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py +++ b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py @@ -130,4 +130,32 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): @safe_and_metered_rpc_method(METRICS, LOGGER) def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty: + context_client = ContextClient() + slice_ = context_client.GetSlice(request.slice_id) + + dltgateway_client = DltGatewayClient() + + dlt_record_id = DltRecordId() + dlt_record_id.domain_uuid.uuid = request.topology_id.topology_uuid.uuid + dlt_record_id.type = DltRecordTypeEnum.DLTRECORDTYPE_SLICE + dlt_record_id.record_uuid.uuid = slice_.slice_id.slice_uuid.uuid + + LOGGER.info('[RecordSlice] sent dlt_record_id = {:s}'.format(grpc_message_to_json_string(dlt_record_id))) + dlt_record = dltgateway_client.GetFromDlt(dlt_record_id) + LOGGER.info('[RecordSlice] recv dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record))) + + exists = record_exists(dlt_record) + LOGGER.info('[RecordSlice] exists = {:s}'.format(str(exists))) + + dlt_record = DltRecord() + dlt_record.record_id.CopyFrom(dlt_record_id) + dlt_record.operation = \ + DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE \ + if exists else \ + DltRecordOperationEnum.DLTRECORDOPERATION_ADD + + dlt_record.data_json = grpc_message_to_json_string(slice_) + LOGGER.info('[RecordSlice] sent dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record))) + dlt_record_status = dltgateway_client.RecordToDlt(dlt_record) + LOGGER.info('[RecordSlice] recv dlt_record_status = {:s}'.format(grpc_message_to_json_string(dlt_record_status))) return Empty() diff --git a/src/dlt/connector/service/__main__.py b/src/dlt/connector/service/__main__.py index baab4e698f20247960361f7cf0f0845a1f72843c..76e7bc6f1bb1b50e736327d8f08c0880e45c6835 100644 --- a/src/dlt/connector/service/__main__.py +++ b/src/dlt/connector/service/__main__.py @@ -32,7 +32,7 @@ def main(): global LOGGER # pylint: disable=global-statement log_level = get_log_level() - logging.basicConfig(level=log_level) + logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ diff --git a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py index 272b1e1460195c77db8fcc886dcefdaca9f648d9..e0e700eccc715f682022d13289f0aa5a946e9717 100644 --- a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py +++ b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py @@ -12,20 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json, logging, threading -from typing import Any +import grpc, json, logging, threading +from typing import Any, Dict, Set from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID -from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link -from common.proto.dlt_gateway_pb2 import DltRecordTypeEnum +from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId +from common.proto.dlt_connector_pb2 import DltSliceId +from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum from common.tools.context_queries.Context import create_context from common.tools.context_queries.Device import add_device_to_topology from common.tools.context_queries.Link import add_link_to_topology from common.tools.context_queries.Topology import create_topology from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient +from dlt.connector.client.DltConnectorClient import DltConnectorClient from dlt.connector.client.DltEventsCollector import DltEventsCollector from dlt.connector.client.DltGatewayClient import DltGatewayClient +from interdomain.client.InterdomainClient import InterdomainClient LOGGER = logging.getLogger(__name__) @@ -33,6 +37,19 @@ GET_EVENT_TIMEOUT = 0.5 ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)) +class Clients: + def __init__(self) -> None: + self.context_client = ContextClient() + self.dlt_connector_client = DltConnectorClient() + self.dlt_gateway_client = DltGatewayClient() + self.interdomain_client = InterdomainClient() + + def close(self) -> None: + self.interdomain_client.close() + self.dlt_gateway_client.close() + self.dlt_connector_client.close() + self.context_client.close() + class DltEventDispatcher(threading.Thread): def __init__(self) -> None: LOGGER.debug('Creating connector...') @@ -48,61 +65,137 @@ class DltEventDispatcher(threading.Thread): self._terminate.set() def run(self) -> None: - context_client = ContextClient() - create_context(context_client, DEFAULT_CONTEXT_UUID) - create_topology(context_client, DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID) - create_topology(context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID) + clients = Clients() + create_context(clients.context_client, DEFAULT_CONTEXT_UUID) + create_topology(clients.context_client, DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID) + create_topology(clients.context_client, DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID) - dlt_gateway_client = DltGatewayClient() - dlt_events_collector = DltEventsCollector(dlt_gateway_client, log_events_received=True) + dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True) dlt_events_collector.start() while not self._terminate.is_set(): event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) if event is None: continue - existing_topology_ids = context_client.ListTopologyIds(ContextId(**json_context_id(DEFAULT_CONTEXT_UUID))) - existing_topology_uuids = { + existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID) + local_domain_uuids = { topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids } - existing_topology_uuids.discard(DEFAULT_TOPOLOGY_UUID) - existing_topology_uuids.discard(INTERDOMAIN_TOPOLOGY_UUID) + local_domain_uuids.discard(DEFAULT_TOPOLOGY_UUID) + local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_UUID) - if event.record_id.domain_uuid.uuid in existing_topology_uuids: - LOGGER.info('Ignoring DLT event received (local): {:s}'.format(grpc_message_to_json_string(event))) - else: - LOGGER.info('DLT event received (remote): {:s}'.format(grpc_message_to_json_string(event))) - self.dispatch_event(context_client, dlt_gateway_client, event) + self.dispatch_event(clients, local_domain_uuids, event) dlt_events_collector.stop() - dlt_gateway_client.close() - context_client.close() + clients.close() - def dispatch_event( - self, context_client : ContextClient, dlt_gateway_client : DltGatewayClient, event : Any - ) -> None: - event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE} - - LOGGER.info('[dispatch_event] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = dlt_gateway_client.GetFromDlt(event.record_id) - LOGGER.info('[dispatch_event] record={:s}'.format(grpc_message_to_json_string(record))) - if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE: - if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: - device = Device(**json.loads(record.data_json)) - context_client.SetDevice(device) - device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member - add_device_to_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID, device_uuid) - elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: - raise NotImplementedError('Delete Device') + self._dispatch_device(clients, local_domain_uuids, event) elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK: + self._dispatch_link(clients, local_domain_uuids, event) + elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE: + self._dispatch_slice(clients, local_domain_uuids, event) + else: + raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event))) + + def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + domain_uuid : str = event.record_id.domain_uuid.uuid + + if domain_uuid in local_domain_uuids: + MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + return + + MSG = '[_dispatch_device] DLT event received (remote): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: + LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record))) + + create_context(clients.context_client, domain_uuid) + create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_UUID) + device = Device(**json.loads(record.data_json)) + clients.context_client.SetDevice(device) + device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member + add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID, device_uuid) + domain_context_id = ContextId(**json_context_id(domain_uuid)) + add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_UUID, device_uuid) + elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: + raise NotImplementedError('Delete Device') + + def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + domain_uuid : str = event.record_id.domain_uuid.uuid + + if domain_uuid in local_domain_uuids: + MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + return + + MSG = '[_dispatch_link] DLT event received (remote): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: + LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record))) + + link = Link(**json.loads(record.data_json)) + clients.context_client.SetLink(link) + link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member + add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID, link_uuid) + elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: + raise NotImplementedError('Delete Link') + + def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + domain_uuid : str = event.record_id.domain_uuid.uuid + + LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record))) + + slice_ = Slice(**json.loads(record.data_json)) + + context_uuid = slice_.slice_id.context_id.context_uuid.uuid + owner_uuid = slice_.slice_owner.owner_uuid.uuid + create_context(clients.context_client, context_uuid) + create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_UUID) + + if domain_uuid in local_domain_uuids: + # it is for "me" if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: - link = Link(**json.loads(record.data_json)) - context_client.SetLink(link) - link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member - add_link_to_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID, link_uuid) + try: + db_slice = clients.context_client.GetSlice(slice_.slice_id) + # exists + db_json_slice = grpc_message_to_json_string(db_slice) + except grpc.RpcError: + # not exists + db_json_slice = None + + _json_slice = grpc_message_to_json_string(slice_) + if db_json_slice != _json_slice: + # not exists or is different... + slice_id = clients.interdomain_client.RequestSlice(slice_) + topology_id = TopologyId(**json_topology_id(domain_uuid)) + dlt_slice_id = DltSliceId() + dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member + clients.dlt_connector_client.RecordSlice(dlt_slice_id) + elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: - raise NotImplementedError('Delete Link') + raise NotImplementedError('Delete Slice') + elif owner_uuid in local_domain_uuids: + # it is owned by me + # just update it locally + LOGGER.info('[_dispatch_slice] updating locally') + clients.context_client.SetSlice(slice_) else: - raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event))) + MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) +