diff --git a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher copy.py b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher copy.py new file mode 100644 index 0000000000000000000000000000000000000000..779bae9c19f583ec47584673da51b536cc1dc8b3 --- /dev/null +++ b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher copy.py @@ -0,0 +1,209 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, json, logging, threading +from typing import Any, Dict, Set +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME +from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId +from common.proto.dlt_connector_pb2 import DltSliceId +from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum +from common.tools.context_queries.Context import create_context +from common.tools.context_queries.Device import add_device_to_topology +from common.tools.context_queries.Link import add_link_to_topology +from common.tools.context_queries.Topology import create_topology +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Topology import json_topology_id +from context.client.ContextClient import ContextClient +from dlt.connector.client.DltConnectorClient import DltConnectorClient +from dlt.connector.client.DltEventsCollector import DltEventsCollector +from dlt.connector.client.DltGatewayClient import DltGatewayClient +from interdomain.client.InterdomainClient import InterdomainClient + +LOGGER = logging.getLogger(__name__) + +GET_EVENT_TIMEOUT = 0.5 + +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) + +class Clients: + def __init__(self) -> None: + self.context_client = ContextClient() + self.dlt_connector_client = DltConnectorClient() + self.dlt_gateway_client = DltGatewayClient() + self.interdomain_client = InterdomainClient() + + def close(self) -> None: + self.interdomain_client.close() + self.dlt_gateway_client.close() + self.dlt_connector_client.close() + self.context_client.close() + +class DltEventDispatcher(threading.Thread): + def __init__(self) -> None: + LOGGER.debug('Creating connector...') + super().__init__(name='DltEventDispatcher', daemon=True) + self._terminate = threading.Event() + LOGGER.debug('Connector created') + + def start(self) -> None: + self._terminate.clear() + return super().start() + + def stop(self): + self._terminate.set() + + def run(self) -> None: + clients = Clients() + create_context(clients.context_client, DEFAULT_CONTEXT_NAME) + create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME) + create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME) + + dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True) + dlt_events_collector.start() + + while not self._terminate.is_set(): + event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + if event is None: continue + + existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID) + local_domain_uuids = { + topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids + } + local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME) + local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME) + + self.dispatch_event(clients, local_domain_uuids, event) + + dlt_events_collector.stop() + clients.close() + + def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE} + if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE: + self._dispatch_device(clients, local_domain_uuids, event) + elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK: + self._dispatch_link(clients, local_domain_uuids, event) + elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE: + self._dispatch_slice(clients, local_domain_uuids, event) + else: + raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event))) + + def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + domain_uuid : str = event.record_id.domain_uuid.uuid + + if domain_uuid in local_domain_uuids: + MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + return + + MSG = '[_dispatch_device] DLT event received (remote): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: + LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record))) + + create_context(clients.context_client, domain_uuid) + create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME) + device = Device(**json.loads(record.data_json)) + clients.context_client.SetDevice(device) + device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member + add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid) + domain_context_id = ContextId(**json_context_id(domain_uuid)) + add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid) + elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: + raise NotImplementedError('Delete Device') + + def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + domain_uuid : str = event.record_id.domain_uuid.uuid + + if domain_uuid in local_domain_uuids: + MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + return + + MSG = '[_dispatch_link] DLT event received (remote): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: + LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record))) + + link = Link(**json.loads(record.data_json)) + clients.context_client.SetLink(link) + link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member + add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid) + elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: + raise NotImplementedError('Delete Link') + + def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + domain_uuid : str = event.record_id.domain_uuid.uuid + + LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record))) + + slice_ = Slice(**json.loads(record.data_json)) + + context_uuid = slice_.slice_id.context_id.context_uuid.uuid + owner_uuid = slice_.slice_owner.owner_uuid.uuid + create_context(clients.context_client, context_uuid) + create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_NAME) + + if domain_uuid in local_domain_uuids: + # it is for "me" + if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: + try: + db_slice = clients.context_client.GetSlice(slice_.slice_id) + # exists + db_json_slice = grpc_message_to_json_string(db_slice) + except grpc.RpcError: + # not exists + db_json_slice = None + + _json_slice = grpc_message_to_json_string(slice_) + if db_json_slice != _json_slice: + # not exists or is different... + slice_id = clients.interdomain_client.RequestSlice(slice_) + topology_id = TopologyId(**json_topology_id(domain_uuid)) + dlt_slice_id = DltSliceId() + dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member + clients.dlt_connector_client.RecordSlice(dlt_slice_id) + + elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: + raise NotImplementedError('Delete Slice') + elif owner_uuid in local_domain_uuids: + # it is owned by me + # just update it locally + LOGGER.info('[_dispatch_slice] updating locally') + + local_slice = Slice() + local_slice.CopyFrom(slice_) + + # pylint: disable=no-member + del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally + del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally + + clients.context_client.SetSlice(local_slice) + else: + MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + diff --git a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py index 779bae9c19f583ec47584673da51b536cc1dc8b3..76104e2b7c3e775f05509892c190b0dab62945d9 100644 --- a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py +++ b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py @@ -1,23 +1,11 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import grpc, json, logging, threading +import asyncio +import logging +import json from typing import Any, Dict, Set from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId from common.proto.dlt_connector_pb2 import DltSliceId -from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum +from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordTypeEnum from common.tools.context_queries.Context import create_context from common.tools.context_queries.Device import add_device_to_topology from common.tools.context_queries.Link import add_link_to_topology @@ -50,31 +38,30 @@ class Clients: self.dlt_connector_client.close() self.context_client.close() -class DltEventDispatcher(threading.Thread): +class DltEventDispatcher: def __init__(self) -> None: LOGGER.debug('Creating connector...') - super().__init__(name='DltEventDispatcher', daemon=True) - self._terminate = threading.Event() + self._terminate = asyncio.Event() LOGGER.debug('Connector created') - def start(self) -> None: + async def start(self) -> None: self._terminate.clear() - return super().start() + await self.run() - def stop(self): + async def stop(self): self._terminate.set() - def run(self) -> None: + async def run(self) -> None: clients = Clients() create_context(clients.context_client, DEFAULT_CONTEXT_NAME) create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME) create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME) dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True) - dlt_events_collector.start() + await dlt_events_collector.run() while not self._terminate.is_set(): - event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + event = await dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) if event is None: continue existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID) @@ -84,24 +71,24 @@ class DltEventDispatcher(threading.Thread): local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME) local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME) - self.dispatch_event(clients, local_domain_uuids, event) + await self.dispatch_event(clients, local_domain_uuids, event) - dlt_events_collector.stop() + await dlt_events_collector.stop() clients.close() - def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: - record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE} + async def dispatch_event(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None: + record_type: DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE} if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE: - self._dispatch_device(clients, local_domain_uuids, event) + await self._dispatch_device(clients, local_domain_uuids, event) elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK: - self._dispatch_link(clients, local_domain_uuids, event) + await self._dispatch_link(clients, local_domain_uuids, event) elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE: - self._dispatch_slice(clients, local_domain_uuids, event) + await self._dispatch_slice(clients, local_domain_uuids, event) else: raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event))) - def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: - domain_uuid : str = event.record_id.domain_uuid.uuid + async def _dispatch_device(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None: + domain_uuid: str = event.record_id.domain_uuid.uuid if domain_uuid in local_domain_uuids: MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}' @@ -111,25 +98,25 @@ class DltEventDispatcher(threading.Thread): MSG = '[_dispatch_device] DLT event received (remote): {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + record = await clients.dlt_gateway_client.GetFromDlt(event.record_id) LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record))) create_context(clients.context_client, domain_uuid) create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME) device = Device(**json.loads(record.data_json)) clients.context_client.SetDevice(device) - device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member + device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid) domain_context_id = ContextId(**json_context_id(domain_uuid)) add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid) elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: raise NotImplementedError('Delete Device') - def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: - domain_uuid : str = event.record_id.domain_uuid.uuid + async def _dispatch_link(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None: + domain_uuid: str = event.record_id.domain_uuid.uuid if domain_uuid in local_domain_uuids: MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}' @@ -139,25 +126,25 @@ class DltEventDispatcher(threading.Thread): MSG = '[_dispatch_link] DLT event received (remote): {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + record = await clients.dlt_gateway_client.GetFromDlt(event.record_id) LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record))) link = Link(**json.loads(record.data_json)) clients.context_client.SetLink(link) - link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member + link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid) elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: raise NotImplementedError('Delete Link') - def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: - event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} - domain_uuid : str = event.record_id.domain_uuid.uuid + async def _dispatch_slice(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None: + event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + domain_uuid: str = event.record_id.domain_uuid.uuid LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = clients.dlt_gateway_client.GetFromDlt(event.record_id) + record = await clients.dlt_gateway_client.GetFromDlt(event.record_id) LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record))) slice_ = Slice(**json.loads(record.data_json)) @@ -185,7 +172,7 @@ class DltEventDispatcher(threading.Thread): topology_id = TopologyId(**json_topology_id(domain_uuid)) dlt_slice_id = DltSliceId() dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member - dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member + dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member clients.dlt_connector_client.RecordSlice(dlt_slice_id) elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: @@ -199,11 +186,10 @@ class DltEventDispatcher(threading.Thread): local_slice.CopyFrom(slice_) # pylint: disable=no-member - del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally - del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally + del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally + del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally clients.context_client.SetSlice(local_slice) else: MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(event))) -