From 420053974ae4c4ad48e4699d85ffa31c83ba1538 Mon Sep 17 00:00:00 2001 From: diazjj <jjdiaz@cttc.es> Date: Mon, 29 Jul 2024 14:59:08 +0200 Subject: [PATCH] Revert "Debugging" This reverts commit 8503a835ba6a889cdb47a8666a3abb60fd63242a. --- .../connector/client/DltConnectorClient.py | 38 ++-- .../client/DltEventsCollector copy.py | 95 -------- .../connector/client/DltEventsCollector.py | 74 ++++--- src/dlt/connector/client/DltGatewayClient.py | 22 +- src/dlt/connector/client/async.py | 74 ------- .../DltConnectorServiceServicerImpl.py | 28 +-- .../DltEventDispatcher copy.py | 209 ------------------ .../event_dispatcher/DltEventDispatcher.py | 86 ++++--- src/interdomain/service/__main__.py | 16 +- .../topology_abstractor/DltRecordSender.py | 10 +- .../topology_abstractor/DltRecorder copy.py | 166 -------------- .../topology_abstractor/DltRecorder.py | 89 ++++---- 12 files changed, 186 insertions(+), 721 deletions(-) delete mode 100644 src/dlt/connector/client/DltEventsCollector copy.py delete mode 100644 src/dlt/connector/client/async.py delete mode 100644 src/dlt/connector/service/event_dispatcher/DltEventDispatcher copy.py delete mode 100644 src/interdomain/service/topology_abstractor/DltRecorder copy.py diff --git a/src/dlt/connector/client/DltConnectorClient.py b/src/dlt/connector/client/DltConnectorClient.py index 7cfb6b594..e383217d8 100644 --- a/src/dlt/connector/client/DltConnectorClient.py +++ b/src/dlt/connector/client/DltConnectorClient.py @@ -39,7 +39,7 @@ class DltConnectorClient: LOGGER.debug('Channel created') def connect(self): - self.channel = grpc.aio.insecure_channel(self.endpoint) + self.channel = grpc.insecure_channel(self.endpoint) self.stub = DltConnectorServiceStub(self.channel) def close(self): @@ -48,64 +48,64 @@ class DltConnectorClient: self.stub = None @RETRY_DECORATOR - async def RecordAll(self, request : TopologyId) -> Empty: + def RecordAll(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordAll(request) + response = self.stub.RecordAll(request) LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def RecordAllDevices(self, request : TopologyId) -> Empty: + def RecordAllDevices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordAllDevices(request) + response = self.stub.RecordAllDevices(request) LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def RecordDevice(self, request : DltDeviceId) -> Empty: + def RecordDevice(self, request : DltDeviceId) -> Empty: LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordDevice(request) + response = self.stub.RecordDevice(request) LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def RecordAllLinks(self, request : TopologyId) -> Empty: + def RecordAllLinks(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordAllLinks(request) + response = self.stub.RecordAllLinks(request) LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def RecordLink(self, request : DltLinkId) -> Empty: + def RecordLink(self, request : DltLinkId) -> Empty: LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordLink(request) + response = self.stub.RecordLink(request) LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def RecordAllServices(self, request : TopologyId) -> Empty: + def RecordAllServices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordAllServices(request) + response = self.stub.RecordAllServices(request) LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def RecordService(self, request : DltServiceId) -> Empty: + def RecordService(self, request : DltServiceId) -> Empty: LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordService(request) + response = self.stub.RecordService(request) LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def RecordAllSlices(self, request : TopologyId) -> Empty: + def RecordAllSlices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordAllSlices(request) + response = self.stub.RecordAllSlices(request) LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def RecordSlice(self, request : DltSliceId) -> Empty: + def RecordSlice(self, request : DltSliceId) -> Empty: LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordSlice(request) + response = self.stub.RecordSlice(request) LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/dlt/connector/client/DltEventsCollector copy.py b/src/dlt/connector/client/DltEventsCollector copy.py deleted file mode 100644 index 9fac60b7c..000000000 --- a/src/dlt/connector/client/DltEventsCollector copy.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Callable, Optional -import asyncio -import grpc, logging, queue, threading, time -from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription -from common.tools.grpc.Tools import grpc_message_to_json_string -from dlt.connector.client.DltGatewayClient import DltGatewayClient - -LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.DEBUG) - -# This class accepts an event_handler method as attribute that can be used to pre-process and -# filter events before they reach the events_queue. Depending on the handler, the supported -# behaviors are: -# - If the handler is not set, the events are transparently added to the events_queue. -# - If returns None for an event, the event is not stored in the events_queue. -# - If returns a DltRecordEvent object for an event, the returned event is stored in the events_queue. -# - Other combinations are not supported. - -class DltEventsCollector(threading.Thread): - def __init__( - self, dltgateway_client : DltGatewayClient, - log_events_received : bool = False, - event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, - ) -> None: - super().__init__(name='DltEventsCollector', daemon=True) - self._dltgateway_client = dltgateway_client - self._log_events_received = log_events_received - self._event_handler = event_handler - self._events_queue = queue.Queue() - self._terminate = threading.Event() - self._dltgateway_stream = None - - def run(self) -> None: - event_handler = self._event_handler - if event_handler is None: event_handler = lambda e: e - self._terminate.clear() - while not self._terminate.is_set(): - try: - subscription = DltRecordSubscription() # bu default subscribe to all - self._dltgateway_stream = self._dltgateway_client.SubscribeToDlt(subscription) - for event in self._dltgateway_stream: - if self._log_events_received: - LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event))) - event = event_handler(event) - if event is None: continue - if not isinstance(event, DltRecordEvent): - # pylint: disable=broad-exception-raised - raise Exception('Unsupported return type: {:s}'.format(str(event))) - self._events_queue.put_nowait(event) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member - time.sleep(0.5) - continue - elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member - break - else: - raise # pragma: no cover - - def get_event(self, block : bool = True, timeout : float = 0.1): - try: - return self._events_queue.get(block=block, timeout=timeout) - except queue.Empty: # pylint: disable=catching-non-exception - return None - - def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None): - events = [] - if count is None: - while True: - event = self.get_event(block=block, timeout=timeout) - if event is None: break - events.append(event) - else: - for _ in range(count): - event = self.get_event(block=block, timeout=timeout) - if event is None: continue - events.append(event) - return sorted(events, key=lambda e: e.event.timestamp.timestamp) - - def stop(self): - self._terminate.set() - if self._dltgateway_stream is not None: self._dltgateway_stream.cancel() diff --git a/src/dlt/connector/client/DltEventsCollector.py b/src/dlt/connector/client/DltEventsCollector.py index 31eaf8542..e59784a4d 100644 --- a/src/dlt/connector/client/DltEventsCollector.py +++ b/src/dlt/connector/client/DltEventsCollector.py @@ -13,9 +13,7 @@ # limitations under the License. from typing import Callable, Optional -import asyncio -import grpc -import logging +import grpc, logging, queue, threading, time from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription from common.tools.grpc.Tools import grpc_message_to_json_string from dlt.connector.client.DltGatewayClient import DltGatewayClient @@ -23,70 +21,74 @@ from dlt.connector.client.DltGatewayClient import DltGatewayClient LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) -class DltEventsCollector: +# This class accepts an event_handler method as attribute that can be used to pre-process and +# filter events before they reach the events_queue. Depending on the handler, the supported +# behaviors are: +# - If the handler is not set, the events are transparently added to the events_queue. +# - If returns None for an event, the event is not stored in the events_queue. +# - If returns a DltRecordEvent object for an event, the returned event is stored in the events_queue. +# - Other combinations are not supported. + +class DltEventsCollector(threading.Thread): def __init__( - self, dltgateway_client: DltGatewayClient, - log_events_received: bool = False, - event_handler: Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, + self, dltgateway_client : DltGatewayClient, + log_events_received : bool = False, + event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, ) -> None: + super().__init__(name='DltEventsCollector', daemon=True) self._dltgateway_client = dltgateway_client self._log_events_received = log_events_received self._event_handler = event_handler - self._events_queue = asyncio.Queue() - self._terminate = asyncio.Event() + self._events_queue = queue.Queue() + self._terminate = threading.Event() self._dltgateway_stream = None - async def run(self) -> None: + def run(self) -> None: event_handler = self._event_handler - if event_handler is None: - event_handler = lambda e: e + if event_handler is None: event_handler = lambda e: e self._terminate.clear() while not self._terminate.is_set(): try: - subscription = DltRecordSubscription() # by default subscribe to all - self._dltgateway_stream = await self._dltgateway_client.SubscribeToDlt(subscription) - async for event in self._dltgateway_stream: + subscription = DltRecordSubscription() # bu default subscribe to all + self._dltgateway_stream = self._dltgateway_client.SubscribeToDlt(subscription) + for event in self._dltgateway_stream: if self._log_events_received: LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event))) event = event_handler(event) - if event is None: - continue + if event is None: continue if not isinstance(event, DltRecordEvent): + # pylint: disable=broad-exception-raised raise Exception('Unsupported return type: {:s}'.format(str(event))) - await self._events_queue.put(event) + self._events_queue.put_nowait(event) except grpc.RpcError as e: - if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member - await asyncio.sleep(0.5) + if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member + time.sleep(0.5) continue - elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member + elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member break else: - raise # pragma: no cover + raise # pragma: no cover - async def get_event(self, block: bool = True, timeout: float = 0.1): + def get_event(self, block : bool = True, timeout : float = 0.1): try: - return await asyncio.wait_for(self._events_queue.get(), timeout) - except asyncio.TimeoutError: + return self._events_queue.get(block=block, timeout=timeout) + except queue.Empty: # pylint: disable=catching-non-exception return None - async def get_events(self, block: bool = True, timeout: float = 0.1, count: int = None): + def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None): events = [] if count is None: while True: - event = await self.get_event(block=block, timeout=timeout) - if event is None: - break + event = self.get_event(block=block, timeout=timeout) + if event is None: break events.append(event) else: for _ in range(count): - event = await self.get_event(block=block, timeout=timeout) - if event is None: - continue + event = self.get_event(block=block, timeout=timeout) + if event is None: continue events.append(event) return sorted(events, key=lambda e: e.event.timestamp.timestamp) - async def stop(self): + def stop(self): self._terminate.set() - if self._dltgateway_stream is not None: - await self._dltgateway_stream.cancel() - + if self._dltgateway_stream is not None: self._dltgateway_stream.cancel() diff --git a/src/dlt/connector/client/DltGatewayClient.py b/src/dlt/connector/client/DltGatewayClient.py index f9b37a2db..cde278517 100644 --- a/src/dlt/connector/client/DltGatewayClient.py +++ b/src/dlt/connector/client/DltGatewayClient.py @@ -40,7 +40,7 @@ class DltGatewayClient: LOGGER.debug('Channel created') def connect(self): - self.channel = grpc.aio.insecure_channel(self.endpoint) + self.channel = grpc.insecure_channel(self.endpoint) self.stub = DltGatewayServiceStub(self.channel) def close(self): @@ -49,36 +49,36 @@ class DltGatewayClient: self.stub = None @RETRY_DECORATOR - async def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: + def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordToDlt(request) + response = self.stub.RecordToDlt(request) LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def GetFromDlt(self, request : DltRecordId) -> DltRecord: + def GetFromDlt(self, request : DltRecordId) -> DltRecord: LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.GetFromDlt(request) + response = self.stub.GetFromDlt(request) LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def SubscribeToDlt(self, request : DltRecordSubscription) -> Iterator[DltRecordEvent]: + def SubscribeToDlt(self, request : DltRecordSubscription) -> Iterator[DltRecordEvent]: LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.SubscribeToDlt(request) + response = self.stub.SubscribeToDlt(request) LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: + def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.GetDltStatus(request) + response = self.stub.GetDltStatus(request) LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - async def GetDltPeers(self, request : Empty) -> DltPeerStatusList: + def GetDltPeers(self, request : Empty) -> DltPeerStatusList: LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.GetDltPeers(request) + response = self.stub.GetDltPeers(request) LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/dlt/connector/client/async.py b/src/dlt/connector/client/async.py deleted file mode 100644 index e38f124c2..000000000 --- a/src/dlt/connector/client/async.py +++ /dev/null @@ -1,74 +0,0 @@ -# DltGatewayClient.py - -import grpc -import logging -from typing import Iterator -from common.proto.context_pb2 import Empty, TeraFlowController -from common.proto.dlt_gateway_pb2 import ( - DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription -) -from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub -from common.tools.client.RetryDecorator import retry, delay_exponential -from common.tools.grpc.Tools import grpc_message_to_json_string -from dlt.connector.Config import DLT_GATEWAY_HOST, DLT_GATEWAY_PORT - -LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.DEBUG) -MAX_RETRIES = 15 -DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) -RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') - -class DltGatewayClient: - def __init__(self, host=None, port=None): - if not host: host = DLT_GATEWAY_HOST - if not port: port = DLT_GATEWAY_PORT - self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) - LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) - self.channel = None - self.stub = None - self.connect() - LOGGER.debug('Channel created') - - def connect(self): - self.channel = grpc.aio.insecure_channel(self.endpoint) - self.stub = DltGatewayServiceStub(self.channel) - - def close(self): - if self.channel is not None: self.channel.close() - self.channel = None - self.stub = None - - @RETRY_DECORATOR - async def RecordToDlt(self, request: DltRecord) -> DltRecordStatus: - LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.RecordToDlt(request) - LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - async def GetFromDlt(self, request: DltRecordId) -> DltRecord: - LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.GetFromDlt(request) - LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - async def SubscribeToDlt(self, request: DltRecordSubscription) -> Iterator[DltRecordEvent]: - LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.SubscribeToDlt(request) - LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - async def GetDltStatus(self, request: TeraFlowController) -> DltPeerStatus: - LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.GetDltStatus(request) - LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - async def GetDltPeers(self, request: Empty) -> DltPeerStatusList: - LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) - response = await self.stub.GetDltPeers(request) - LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) - return response diff --git a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py index df5e8fd08..c05d46b48 100644 --- a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py +++ b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py @@ -35,22 +35,22 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): LOGGER.debug('Servicer Created') @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - async def RecordAll(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + def RecordAll(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - async def RecordAllDevices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + def RecordAllDevices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - async def RecordDevice(self, request : DltDeviceId, context : grpc.ServicerContext) -> Empty: + def RecordDevice(self, request : DltDeviceId, context : grpc.ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() device = context_client.GetDevice(request.device_id) data_json = grpc_message_to_json_string(device) - await self._record_entity( + self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_DEVICE, request.device_id.device_uuid.uuid, request.delete, data_json) return Empty() @@ -60,53 +60,53 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - async def RecordLink(self, request : DltLinkId, context : grpc.ServicerContext) -> Empty: + def RecordLink(self, request : DltLinkId, context : grpc.ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() link = context_client.GetLink(request.link_id) data_json = grpc_message_to_json_string(link) - await self._record_entity( + self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_LINK, request.link_id.link_uuid.uuid, request.delete, data_json) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - async def RecordAllServices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + def RecordAllServices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - async def RecordService(self, request : DltServiceId, context : grpc.ServicerContext) -> Empty: + def RecordService(self, request : DltServiceId, context : grpc.ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() service = context_client.GetService(request.service_id) data_json = grpc_message_to_json_string(service) - await self._record_entity( + self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SERVICE, request.service_id.service_uuid.uuid, request.delete, data_json) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - async def RecordAllSlices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + def RecordAllSlices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - async def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty: + def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() slice_ = context_client.GetSlice(request.slice_id) data_json = grpc_message_to_json_string(slice_) - await self._record_entity( + self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SLICE, request.slice_id.slice_uuid.uuid, request.delete, data_json) return Empty() - async def _record_entity( + def _record_entity( self, dlt_domain_uuid : str, dlt_record_type : DltRecordTypeEnum, dlt_record_uuid : str, delete : bool, data_json : Optional[str] = None ) -> None: @@ -143,6 +143,6 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): str_dlt_record = grpc_message_to_json_string(dlt_record) LOGGER.debug('[_record_entity] sent dlt_record = {:s}'.format(str_dlt_record)) - dlt_record_status = await dltgateway_client.RecordToDlt(dlt_record) + dlt_record_status = dltgateway_client.RecordToDlt(dlt_record) str_dlt_record_status = grpc_message_to_json_string(dlt_record_status) LOGGER.debug('[_record_entity] recv dlt_record_status = {:s}'.format(str_dlt_record_status)) diff --git a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher copy.py b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher copy.py deleted file mode 100644 index 779bae9c1..000000000 --- a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher copy.py +++ /dev/null @@ -1,209 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import grpc, json, logging, threading -from typing import Any, Dict, Set -from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME -from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId -from common.proto.dlt_connector_pb2 import DltSliceId -from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum -from common.tools.context_queries.Context import create_context -from common.tools.context_queries.Device import add_device_to_topology -from common.tools.context_queries.Link import add_link_to_topology -from common.tools.context_queries.Topology import create_topology -from common.tools.grpc.Tools import grpc_message_to_json_string -from common.tools.object_factory.Context import json_context_id -from common.tools.object_factory.Topology import json_topology_id -from context.client.ContextClient import ContextClient -from dlt.connector.client.DltConnectorClient import DltConnectorClient -from dlt.connector.client.DltEventsCollector import DltEventsCollector -from dlt.connector.client.DltGatewayClient import DltGatewayClient -from interdomain.client.InterdomainClient import InterdomainClient - -LOGGER = logging.getLogger(__name__) - -GET_EVENT_TIMEOUT = 0.5 - -ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) - -class Clients: - def __init__(self) -> None: - self.context_client = ContextClient() - self.dlt_connector_client = DltConnectorClient() - self.dlt_gateway_client = DltGatewayClient() - self.interdomain_client = InterdomainClient() - - def close(self) -> None: - self.interdomain_client.close() - self.dlt_gateway_client.close() - self.dlt_connector_client.close() - self.context_client.close() - -class DltEventDispatcher(threading.Thread): - def __init__(self) -> None: - LOGGER.debug('Creating connector...') - super().__init__(name='DltEventDispatcher', daemon=True) - self._terminate = threading.Event() - LOGGER.debug('Connector created') - - def start(self) -> None: - self._terminate.clear() - return super().start() - - def stop(self): - self._terminate.set() - - def run(self) -> None: - clients = Clients() - create_context(clients.context_client, DEFAULT_CONTEXT_NAME) - create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME) - create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME) - - dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True) - dlt_events_collector.start() - - while not self._terminate.is_set(): - event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) - if event is None: continue - - existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID) - local_domain_uuids = { - topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids - } - local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME) - local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME) - - self.dispatch_event(clients, local_domain_uuids, event) - - dlt_events_collector.stop() - clients.close() - - def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: - record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE} - if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE: - self._dispatch_device(clients, local_domain_uuids, event) - elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK: - self._dispatch_link(clients, local_domain_uuids, event) - elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE: - self._dispatch_slice(clients, local_domain_uuids, event) - else: - raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event))) - - def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: - domain_uuid : str = event.record_id.domain_uuid.uuid - - if domain_uuid in local_domain_uuids: - MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}' - LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - return - - MSG = '[_dispatch_device] DLT event received (remote): {:s}' - LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - - event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} - if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: - LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = clients.dlt_gateway_client.GetFromDlt(event.record_id) - LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record))) - - create_context(clients.context_client, domain_uuid) - create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME) - device = Device(**json.loads(record.data_json)) - clients.context_client.SetDevice(device) - device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member - add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid) - domain_context_id = ContextId(**json_context_id(domain_uuid)) - add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid) - elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: - raise NotImplementedError('Delete Device') - - def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: - domain_uuid : str = event.record_id.domain_uuid.uuid - - if domain_uuid in local_domain_uuids: - MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}' - LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - return - - MSG = '[_dispatch_link] DLT event received (remote): {:s}' - LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - - event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} - if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: - LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = clients.dlt_gateway_client.GetFromDlt(event.record_id) - LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record))) - - link = Link(**json.loads(record.data_json)) - clients.context_client.SetLink(link) - link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member - add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid) - elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: - raise NotImplementedError('Delete Link') - - def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: - event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} - domain_uuid : str = event.record_id.domain_uuid.uuid - - LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = clients.dlt_gateway_client.GetFromDlt(event.record_id) - LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record))) - - slice_ = Slice(**json.loads(record.data_json)) - - context_uuid = slice_.slice_id.context_id.context_uuid.uuid - owner_uuid = slice_.slice_owner.owner_uuid.uuid - create_context(clients.context_client, context_uuid) - create_topology(clients.context_client, context_uuid, DEFAULT_TOPOLOGY_NAME) - - if domain_uuid in local_domain_uuids: - # it is for "me" - if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: - try: - db_slice = clients.context_client.GetSlice(slice_.slice_id) - # exists - db_json_slice = grpc_message_to_json_string(db_slice) - except grpc.RpcError: - # not exists - db_json_slice = None - - _json_slice = grpc_message_to_json_string(slice_) - if db_json_slice != _json_slice: - # not exists or is different... - slice_id = clients.interdomain_client.RequestSlice(slice_) - topology_id = TopologyId(**json_topology_id(domain_uuid)) - dlt_slice_id = DltSliceId() - dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member - dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member - clients.dlt_connector_client.RecordSlice(dlt_slice_id) - - elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: - raise NotImplementedError('Delete Slice') - elif owner_uuid in local_domain_uuids: - # it is owned by me - # just update it locally - LOGGER.info('[_dispatch_slice] updating locally') - - local_slice = Slice() - local_slice.CopyFrom(slice_) - - # pylint: disable=no-member - del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally - del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally - - clients.context_client.SetSlice(local_slice) - else: - MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}' - LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - diff --git a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py index 76104e2b7..779bae9c1 100644 --- a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py +++ b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py @@ -1,11 +1,23 @@ -import asyncio -import logging -import json +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, json, logging, threading from typing import Any, Dict, Set from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME from common.proto.context_pb2 import ContextId, Device, EventTypeEnum, Link, Slice, TopologyId from common.proto.dlt_connector_pb2 import DltSliceId -from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordTypeEnum +from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum from common.tools.context_queries.Context import create_context from common.tools.context_queries.Device import add_device_to_topology from common.tools.context_queries.Link import add_link_to_topology @@ -38,30 +50,31 @@ class Clients: self.dlt_connector_client.close() self.context_client.close() -class DltEventDispatcher: +class DltEventDispatcher(threading.Thread): def __init__(self) -> None: LOGGER.debug('Creating connector...') - self._terminate = asyncio.Event() + super().__init__(name='DltEventDispatcher', daemon=True) + self._terminate = threading.Event() LOGGER.debug('Connector created') - async def start(self) -> None: + def start(self) -> None: self._terminate.clear() - await self.run() + return super().start() - async def stop(self): + def stop(self): self._terminate.set() - async def run(self) -> None: + def run(self) -> None: clients = Clients() create_context(clients.context_client, DEFAULT_CONTEXT_NAME) create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME) create_topology(clients.context_client, DEFAULT_CONTEXT_NAME, INTERDOMAIN_TOPOLOGY_NAME) dlt_events_collector = DltEventsCollector(clients.dlt_gateway_client, log_events_received=True) - await dlt_events_collector.run() + dlt_events_collector.start() while not self._terminate.is_set(): - event = await dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + event = dlt_events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) if event is None: continue existing_topology_ids = clients.context_client.ListTopologyIds(ADMIN_CONTEXT_ID) @@ -71,24 +84,24 @@ class DltEventDispatcher: local_domain_uuids.discard(DEFAULT_TOPOLOGY_NAME) local_domain_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME) - await self.dispatch_event(clients, local_domain_uuids, event) + self.dispatch_event(clients, local_domain_uuids, event) - await dlt_events_collector.stop() + dlt_events_collector.stop() clients.close() - async def dispatch_event(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None: - record_type: DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE} + def dispatch_event(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + record_type : DltRecordTypeEnum = event.record_id.type # {UNDEFINED/CONTEXT/TOPOLOGY/DEVICE/LINK/SERVICE/SLICE} if record_type == DltRecordTypeEnum.DLTRECORDTYPE_DEVICE: - await self._dispatch_device(clients, local_domain_uuids, event) + self._dispatch_device(clients, local_domain_uuids, event) elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_LINK: - await self._dispatch_link(clients, local_domain_uuids, event) + self._dispatch_link(clients, local_domain_uuids, event) elif record_type == DltRecordTypeEnum.DLTRECORDTYPE_SLICE: - await self._dispatch_slice(clients, local_domain_uuids, event) + self._dispatch_slice(clients, local_domain_uuids, event) else: raise NotImplementedError('EventType: {:s}'.format(grpc_message_to_json_string(event))) - async def _dispatch_device(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None: - domain_uuid: str = event.record_id.domain_uuid.uuid + def _dispatch_device(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + domain_uuid : str = event.record_id.domain_uuid.uuid if domain_uuid in local_domain_uuids: MSG = '[_dispatch_device] Ignoring DLT event received (local): {:s}' @@ -98,25 +111,25 @@ class DltEventDispatcher: MSG = '[_dispatch_device] DLT event received (remote): {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: LOGGER.info('[_dispatch_device] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = await clients.dlt_gateway_client.GetFromDlt(event.record_id) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) LOGGER.info('[_dispatch_device] record={:s}'.format(grpc_message_to_json_string(record))) create_context(clients.context_client, domain_uuid) create_topology(clients.context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME) device = Device(**json.loads(record.data_json)) clients.context_client.SetDevice(device) - device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member + device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member add_device_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, device_uuid) domain_context_id = ContextId(**json_context_id(domain_uuid)) add_device_to_topology(clients.context_client, domain_context_id, DEFAULT_TOPOLOGY_NAME, device_uuid) elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: raise NotImplementedError('Delete Device') - async def _dispatch_link(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None: - domain_uuid: str = event.record_id.domain_uuid.uuid + def _dispatch_link(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + domain_uuid : str = event.record_id.domain_uuid.uuid if domain_uuid in local_domain_uuids: MSG = '[_dispatch_link] Ignoring DLT event received (local): {:s}' @@ -126,25 +139,25 @@ class DltEventDispatcher: MSG = '[_dispatch_link] DLT event received (remote): {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(event))) - event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} if event_type in {EventTypeEnum.EVENTTYPE_CREATE, EventTypeEnum.EVENTTYPE_UPDATE}: LOGGER.info('[_dispatch_link] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = await clients.dlt_gateway_client.GetFromDlt(event.record_id) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) LOGGER.info('[_dispatch_link] record={:s}'.format(grpc_message_to_json_string(record))) link = Link(**json.loads(record.data_json)) clients.context_client.SetLink(link) - link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member + link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member add_link_to_topology(clients.context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME, link_uuid) elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: raise NotImplementedError('Delete Link') - async def _dispatch_slice(self, clients: Clients, local_domain_uuids: Set[str], event: DltRecordEvent) -> None: - event_type: EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} - domain_uuid: str = event.record_id.domain_uuid.uuid + def _dispatch_slice(self, clients : Clients, local_domain_uuids : Set[str], event : DltRecordEvent) -> None: + event_type : EventTypeEnum = event.event.event_type # {UNDEFINED/CREATE/UPDATE/REMOVE} + domain_uuid : str = event.record_id.domain_uuid.uuid LOGGER.info('[_dispatch_slice] event.record_id={:s}'.format(grpc_message_to_json_string(event.record_id))) - record = await clients.dlt_gateway_client.GetFromDlt(event.record_id) + record = clients.dlt_gateway_client.GetFromDlt(event.record_id) LOGGER.info('[_dispatch_slice] record={:s}'.format(grpc_message_to_json_string(record))) slice_ = Slice(**json.loads(record.data_json)) @@ -172,7 +185,7 @@ class DltEventDispatcher: topology_id = TopologyId(**json_topology_id(domain_uuid)) dlt_slice_id = DltSliceId() dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member - dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member + dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member clients.dlt_connector_client.RecordSlice(dlt_slice_id) elif event_type in {EventTypeEnum.EVENTTYPE_DELETE}: @@ -186,10 +199,11 @@ class DltEventDispatcher: local_slice.CopyFrom(slice_) # pylint: disable=no-member - del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally - del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally + del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally + del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally clients.context_client.SetSlice(local_slice) else: MSG = '[_dispatch_slice] Ignoring DLT event received (remote): {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + diff --git a/src/interdomain/service/__main__.py b/src/interdomain/service/__main__.py index 3f5ccd183..8c392821e 100644 --- a/src/interdomain/service/__main__.py +++ b/src/interdomain/service/__main__.py @@ -13,8 +13,6 @@ # limitations under the License. import logging, signal, sys, threading -import asyncio - from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( @@ -29,12 +27,6 @@ from .RemoteDomainClients import RemoteDomainClients terminate = threading.Event() LOGGER : logging.Logger = None - -async def run_dlt_recorder(dlt_recorder): - await dlt_recorder.start() - await terminate.wait() - await dlt_recorder.stop() - def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') terminate.set() @@ -84,9 +76,7 @@ def main(): if dlt_enabled: LOGGER.info('Starting DLT functionality...') dlt_recorder = DLTRecorder() - #dlt_recorder.start() - loop = asyncio.get_event_loop() - loop.run_until_complete(run_dlt_recorder(dlt_recorder)) + dlt_recorder.start() # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass @@ -95,9 +85,7 @@ def main(): # if topology_abstractor_enabled: # topology_abstractor.stop() if dlt_enabled: - #dlt_recorder.stop() - loop.run_until_complete(dlt_recorder.stop()) - loop.close() + dlt_recorder.stop() grpc_service.stop() remote_domain_clients.stop() diff --git a/src/interdomain/service/topology_abstractor/DltRecordSender.py b/src/interdomain/service/topology_abstractor/DltRecordSender.py index c17878e0e..cfa51c928 100644 --- a/src/interdomain/service/topology_abstractor/DltRecordSender.py +++ b/src/interdomain/service/topology_abstractor/DltRecordSender.py @@ -62,7 +62,7 @@ class DltRecordSender: record_uuid = '{:s}:slice:{:s}/{:s}'.format(topology_uuid, context_uuid, slice_uuid) self._add_record(record_uuid, (topology_id, slice_)) - async def commit(self) -> None: + def commit(self) -> None: for dlt_record_uuid in self.dlt_record_uuids: topology_id,dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid] if isinstance(dlt_record, Device): @@ -73,7 +73,7 @@ class DltRecordSender: dlt_device_id = DltDeviceId() dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member - await self.dlt_connector_client.RecordDevice(dlt_device_id) + self.dlt_connector_client.RecordDevice(dlt_device_id) elif isinstance(dlt_record, Link): #link_id = self.context_client.SetLink(dlt_record) link_id = dlt_record.link_id @@ -81,7 +81,7 @@ class DltRecordSender: dlt_link_id = DltLinkId() dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member - await self.dlt_connector_client.RecordLink(dlt_link_id) + self.dlt_connector_client.RecordLink(dlt_link_id) elif isinstance(dlt_record, Service): #service_id = self.context_client.SetService(dlt_record) service_id = dlt_record.service_id @@ -89,7 +89,7 @@ class DltRecordSender: dlt_service_id = DltServiceId() dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member - await self.dlt_connector_client.RecordService(dlt_service_id) + self.dlt_connector_client.RecordService(dlt_service_id) elif isinstance(dlt_record, Slice): #slice_id = self.context_client.SetSlice(dlt_record) slice_id = dlt_record.slice_id @@ -97,6 +97,6 @@ class DltRecordSender: dlt_slice_id = DltSliceId() dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member - await self.dlt_connector_client.RecordSlice(dlt_slice_id) + self.dlt_connector_client.RecordSlice(dlt_slice_id) else: LOGGER.error('Unsupported Record({:s})'.format(str(dlt_record))) diff --git a/src/interdomain/service/topology_abstractor/DltRecorder copy.py b/src/interdomain/service/topology_abstractor/DltRecorder copy.py deleted file mode 100644 index 0e94159c4..000000000 --- a/src/interdomain/service/topology_abstractor/DltRecorder copy.py +++ /dev/null @@ -1,166 +0,0 @@ -import logging -import threading -from typing import Dict, Optional - -from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum -from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name -from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent -from common.tools.context_queries.Context import create_context -from common.tools.context_queries.Device import get_uuids_of_devices_in_topology -from common.tools.context_queries.Topology import create_missing_topologies -from common.tools.grpc.Tools import grpc_message_to_json_string -from common.tools.object_factory.Context import json_context_id -from common.tools.object_factory.Device import json_device_id -from common.tools.object_factory.Topology import json_topology_id -from context.client.ContextClient import ContextClient -from context.client.EventsCollector import EventsCollector -from dlt.connector.client.DltConnectorClient import DltConnectorClient -from .DltRecordSender import DltRecordSender -from .Types import EventTypes - -LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.DEBUG) - -ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) -INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID)) - - -class DLTRecorder(threading.Thread): - def __init__(self) -> None: - super().__init__(daemon=True) - self.terminate = threading.Event() - self.context_client = ContextClient() - self.context_event_collector = EventsCollector(self.context_client) - self.topology_cache: Dict[str, TopologyId] = {} - - def stop(self): - self.terminate.set() - - def run(self) -> None: - self.context_client.connect() - create_context(self.context_client, DEFAULT_CONTEXT_NAME) - self.create_topologies() - self.context_event_collector.start() - - while not self.terminate.is_set(): - event = self.context_event_collector.get_event(timeout=0.1) - if event is None: - continue - LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) - self.update_record(event) - - self.context_event_collector.stop() - self.context_client.close() - - def create_topologies(self): - topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] - create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) - - def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: - # Always enable DLT for testing - dlt_connector_client = DltConnectorClient() - dlt_connector_client.connect() - return dlt_connector_client - # env_vars = find_environment_variables([ - # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), - # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - # ]) - # if len(env_vars) == 2: - # dlt_connector_client = DltConnectorClient() - # dlt_connector_client.connect() - # return dlt_connector_client - # return None - - def update_record(self, event: EventTypes) -> None: - dlt_connector_client = self.get_dlt_connector_client() - dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) - - if isinstance(event, ContextEvent): - LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event))) - LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event))) - - elif isinstance(event, TopologyEvent): - LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_topology_event(event, dlt_record_sender) - - elif isinstance(event, DeviceEvent): - LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_device_event(event, dlt_record_sender) - - elif isinstance(event, LinkEvent): - LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_link_event(event, dlt_record_sender) - - else: - LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) - - dlt_record_sender.commit() - if dlt_connector_client is not None: - dlt_connector_client.close() - - def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: - topology_id = event.topology_id - topology_uuid = topology_id.topology_uuid.uuid - context_id = topology_id.context_id - context_uuid = context_id.context_uuid.uuid - topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} - - context = self.context_client.GetContext(context_id) - context_name = context.name - - topology_details = self.context_client.GetTopologyDetails(topology_id) - topology_name = topology_details.name - - self.topology_cache[topology_uuid] = topology_id - - LOGGER.debug('TOPOLOGY Details({:s})'.format(grpc_message_to_json_string(topology_details))) - - if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ - (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): - for device in topology_details.devices: - LOGGER.debug('DEVICE_INFO_TOPO({:s})'.format(grpc_message_to_json_string(device))) - dlt_record_sender.add_device(topology_id, device) - - for link in topology_details.links: - dlt_record_sender.add_link(topology_id, link) - - else: - MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})' - args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) - LOGGER.warning(MSG.format(*args)) - - def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]: - for topology_uuid, topology_id in self.topology_cache.items(): - details = self.context_client.GetTopologyDetails(topology_id) - for device in details.devices: - if device.device_id == device_id: - return topology_id - return None - - def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]: - for topology_uuid, topology_id in self.topology_cache.items(): - details = self.context_client.GetTopologyDetails(topology_id) - for link in details.links: - if link.link_id == link_id: - return topology_id - return None - - def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: - device_id = event.device_id - device = self.context_client.GetDevice(device_id) - topology_id = self.find_topology_for_device(device_id) - if topology_id: - LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id))) - - dlt_record_sender.add_device(topology_id, device) - else: - LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}") - - def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: - link_id = event.link_id - link = self.context_client.GetLink(link_id) - topology_id = self.find_topology_for_link(link_id) - if topology_id: - dlt_record_sender.add_link(topology_id, link) - else: - LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}") diff --git a/src/interdomain/service/topology_abstractor/DltRecorder.py b/src/interdomain/service/topology_abstractor/DltRecorder.py index 9ed1aa750..0e94159c4 100644 --- a/src/interdomain/service/topology_abstractor/DltRecorder.py +++ b/src/interdomain/service/topology_abstractor/DltRecorder.py @@ -1,12 +1,10 @@ -# DltRecorder.py - import logging -import asyncio +import threading from typing import Dict, Optional from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name -from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceId, DeviceEvent, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent +from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent from common.tools.context_queries.Context import create_context from common.tools.context_queries.Device import get_uuids_of_devices_in_topology from common.tools.context_queries.Topology import create_missing_topologies @@ -27,47 +25,54 @@ ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID)) -class DLTRecorder: +class DLTRecorder(threading.Thread): def __init__(self) -> None: - self.terminate_event = asyncio.Event() + super().__init__(daemon=True) + self.terminate = threading.Event() self.context_client = ContextClient() self.context_event_collector = EventsCollector(self.context_client) self.topology_cache: Dict[str, TopologyId] = {} - async def stop(self): - self.terminate_event.set() - - async def start(self) -> None: - await self.run() + def stop(self): + self.terminate.set() - async def run(self) -> None: - await self.context_client.connect() + def run(self) -> None: + self.context_client.connect() create_context(self.context_client, DEFAULT_CONTEXT_NAME) self.create_topologies() - await self.context_event_collector.start() + self.context_event_collector.start() - while not self.terminate_event.is_set(): - event = await self.context_event_collector.get_event(timeout=0.1) + while not self.terminate.is_set(): + event = self.context_event_collector.get_event(timeout=0.1) if event is None: continue LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) - await self.update_record(event) + self.update_record(event) - await self.context_event_collector.stop() - await self.context_client.close() + self.context_event_collector.stop() + self.context_client.close() def create_topologies(self): topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) - async def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: + def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: # Always enable DLT for testing dlt_connector_client = DltConnectorClient() - await dlt_connector_client.connect() + dlt_connector_client.connect() return dlt_connector_client - - async def update_record(self, event: EventTypes) -> None: - dlt_connector_client = await self.get_dlt_connector_client() + # env_vars = find_environment_variables([ + # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), + # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + # ]) + # if len(env_vars) == 2: + # dlt_connector_client = DltConnectorClient() + # dlt_connector_client.connect() + # return dlt_connector_client + # return None + + def update_record(self, event: EventTypes) -> None: + dlt_connector_client = self.get_dlt_connector_client() dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) if isinstance(event, ContextEvent): @@ -76,34 +81,34 @@ class DLTRecorder: elif isinstance(event, TopologyEvent): LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) - await self.process_topology_event(event, dlt_record_sender) + self.process_topology_event(event, dlt_record_sender) elif isinstance(event, DeviceEvent): LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) - await self.process_device_event(event, dlt_record_sender) + self.process_device_event(event, dlt_record_sender) elif isinstance(event, LinkEvent): LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) - await self.process_link_event(event, dlt_record_sender) + self.process_link_event(event, dlt_record_sender) else: LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) - await dlt_record_sender.commit() + dlt_record_sender.commit() if dlt_connector_client is not None: - await dlt_connector_client.close() + dlt_connector_client.close() - async def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: + def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: topology_id = event.topology_id topology_uuid = topology_id.topology_uuid.uuid context_id = topology_id.context_id context_uuid = context_id.context_uuid.uuid topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} - context = await self.context_client.GetContext(context_id) + context = self.context_client.GetContext(context_id) context_name = context.name - topology_details = await self.context_client.GetTopologyDetails(topology_id) + topology_details = self.context_client.GetTopologyDetails(topology_id) topology_name = topology_details.name self.topology_cache[topology_uuid] = topology_id @@ -124,26 +129,26 @@ class DLTRecorder: args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) LOGGER.warning(MSG.format(*args)) - async def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]: + def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]: for topology_uuid, topology_id in self.topology_cache.items(): - details = await self.context_client.GetTopologyDetails(topology_id) + details = self.context_client.GetTopologyDetails(topology_id) for device in details.devices: if device.device_id == device_id: return topology_id return None - async def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]: + def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]: for topology_uuid, topology_id in self.topology_cache.items(): - details = await self.context_client.GetTopologyDetails(topology_id) + details = self.context_client.GetTopologyDetails(topology_id) for link in details.links: if link.link_id == link_id: return topology_id return None - async def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: + def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: device_id = event.device_id - device = await self.context_client.GetDevice(device_id) - topology_id = await self.find_topology_for_device(device_id) + device = self.context_client.GetDevice(device_id) + topology_id = self.find_topology_for_device(device_id) if topology_id: LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id))) @@ -151,10 +156,10 @@ class DLTRecorder: else: LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}") - async def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: + def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: link_id = event.link_id - link = await self.context_client.GetLink(link_id) - topology_id = await self.find_topology_for_link(link_id) + link = self.context_client.GetLink(link_id) + topology_id = self.find_topology_for_link(link_id) if topology_id: dlt_record_sender.add_link(topology_id, link) else: -- GitLab