diff --git a/src/dlt/connector/client/DltConnectorClient.py b/src/dlt/connector/client/DltConnectorClient.py index e71cf0bd7e7abe699fd6bcb81fcae015527de834..7cfb6b59443c751ffddb627cd3aba8a25584e191 100644 --- a/src/dlt/connector/client/DltConnectorClient.py +++ b/src/dlt/connector/client/DltConnectorClient.py @@ -39,7 +39,7 @@ class DltConnectorClient: LOGGER.debug('Channel created') def connect(self): - self.channel = grpc.insecure_channel(self.endpoint) + self.channel = grpc.aio.insecure_channel(self.endpoint) self.stub = DltConnectorServiceStub(self.channel) def close(self): diff --git a/src/dlt/connector/client/DltEventsCollector copy.py b/src/dlt/connector/client/DltEventsCollector copy.py new file mode 100644 index 0000000000000000000000000000000000000000..9fac60b7cfb39c2594655a5ce35c879872005051 --- /dev/null +++ b/src/dlt/connector/client/DltEventsCollector copy.py @@ -0,0 +1,95 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable, Optional +import asyncio +import grpc, logging, queue, threading, time +from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription +from common.tools.grpc.Tools import grpc_message_to_json_string +from dlt.connector.client.DltGatewayClient import DltGatewayClient + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +# This class accepts an event_handler method as attribute that can be used to pre-process and +# filter events before they reach the events_queue. Depending on the handler, the supported +# behaviors are: +# - If the handler is not set, the events are transparently added to the events_queue. +# - If returns None for an event, the event is not stored in the events_queue. +# - If returns a DltRecordEvent object for an event, the returned event is stored in the events_queue. +# - Other combinations are not supported. + +class DltEventsCollector(threading.Thread): + def __init__( + self, dltgateway_client : DltGatewayClient, + log_events_received : bool = False, + event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, + ) -> None: + super().__init__(name='DltEventsCollector', daemon=True) + self._dltgateway_client = dltgateway_client + self._log_events_received = log_events_received + self._event_handler = event_handler + self._events_queue = queue.Queue() + self._terminate = threading.Event() + self._dltgateway_stream = None + + def run(self) -> None: + event_handler = self._event_handler + if event_handler is None: event_handler = lambda e: e + self._terminate.clear() + while not self._terminate.is_set(): + try: + subscription = DltRecordSubscription() # bu default subscribe to all + self._dltgateway_stream = self._dltgateway_client.SubscribeToDlt(subscription) + for event in self._dltgateway_stream: + if self._log_events_received: + LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event))) + event = event_handler(event) + if event is None: continue + if not isinstance(event, DltRecordEvent): + # pylint: disable=broad-exception-raised + raise Exception('Unsupported return type: {:s}'.format(str(event))) + self._events_queue.put_nowait(event) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member + time.sleep(0.5) + continue + elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member + break + else: + raise # pragma: no cover + + def get_event(self, block : bool = True, timeout : float = 0.1): + try: + return self._events_queue.get(block=block, timeout=timeout) + except queue.Empty: # pylint: disable=catching-non-exception + return None + + def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None): + events = [] + if count is None: + while True: + event = self.get_event(block=block, timeout=timeout) + if event is None: break + events.append(event) + else: + for _ in range(count): + event = self.get_event(block=block, timeout=timeout) + if event is None: continue + events.append(event) + return sorted(events, key=lambda e: e.event.timestamp.timestamp) + + def stop(self): + self._terminate.set() + if self._dltgateway_stream is not None: self._dltgateway_stream.cancel() diff --git a/src/dlt/connector/client/DltEventsCollector.py b/src/dlt/connector/client/DltEventsCollector.py index e59784a4d2902459d7bc88925e5b83a698770012..39cf993f3e7fac22fa04d607ceb54f378121cc9d 100644 --- a/src/dlt/connector/client/DltEventsCollector.py +++ b/src/dlt/connector/client/DltEventsCollector.py @@ -13,7 +13,9 @@ # limitations under the License. from typing import Callable, Optional -import grpc, logging, queue, threading, time +import asyncio +import grpc +import logging from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription from common.tools.grpc.Tools import grpc_message_to_json_string from dlt.connector.client.DltGatewayClient import DltGatewayClient @@ -21,74 +23,78 @@ from dlt.connector.client.DltGatewayClient import DltGatewayClient LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) -# This class accepts an event_handler method as attribute that can be used to pre-process and -# filter events before they reach the events_queue. Depending on the handler, the supported -# behaviors are: -# - If the handler is not set, the events are transparently added to the events_queue. -# - If returns None for an event, the event is not stored in the events_queue. -# - If returns a DltRecordEvent object for an event, the returned event is stored in the events_queue. -# - Other combinations are not supported. - -class DltEventsCollector(threading.Thread): +class DltEventsCollector: def __init__( - self, dltgateway_client : DltGatewayClient, - log_events_received : bool = False, - event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, + self, dltgateway_client: DltGatewayClient, + log_events_received: bool = False, + event_handler: Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, ) -> None: - super().__init__(name='DltEventsCollector', daemon=True) self._dltgateway_client = dltgateway_client self._log_events_received = log_events_received self._event_handler = event_handler - self._events_queue = queue.Queue() - self._terminate = threading.Event() + self._events_queue = asyncio.Queue() + self._terminate = asyncio.Event() self._dltgateway_stream = None - def run(self) -> None: + async def start(self) -> None: event_handler = self._event_handler - if event_handler is None: event_handler = lambda e: e + if event_handler is None: + event_handler = lambda e: e self._terminate.clear() while not self._terminate.is_set(): try: - subscription = DltRecordSubscription() # bu default subscribe to all - self._dltgateway_stream = self._dltgateway_client.SubscribeToDlt(subscription) - for event in self._dltgateway_stream: + subscription = DltRecordSubscription() # by default subscribe to all + self._dltgateway_stream = await self._dltgateway_client.SubscribeToDlt(subscription) + async for event in self._dltgateway_stream: if self._log_events_received: LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event))) event = event_handler(event) - if event is None: continue + if event is None: + continue if not isinstance(event, DltRecordEvent): - # pylint: disable=broad-exception-raised raise Exception('Unsupported return type: {:s}'.format(str(event))) - self._events_queue.put_nowait(event) + await self._events_queue.put(event) except grpc.RpcError as e: - if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member - time.sleep(0.5) + if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member + await asyncio.sleep(0.5) continue - elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member + elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member break else: - raise # pragma: no cover + raise # pragma: no cover - def get_event(self, block : bool = True, timeout : float = 0.1): + async def get_event(self, block: bool = True, timeout: float = 0.1): try: - return self._events_queue.get(block=block, timeout=timeout) - except queue.Empty: # pylint: disable=catching-non-exception + return await asyncio.wait_for(self._events_queue.get(), timeout) + except asyncio.TimeoutError: return None - def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None): + async def get_events(self, block: bool = True, timeout: float = 0.1, count: int = None): events = [] if count is None: while True: - event = self.get_event(block=block, timeout=timeout) - if event is None: break + event = await self.get_event(block=block, timeout=timeout) + if event is None: + break events.append(event) else: for _ in range(count): - event = self.get_event(block=block, timeout=timeout) - if event is None: continue + event = await self.get_event(block=block, timeout=timeout) + if event is None: + continue events.append(event) return sorted(events, key=lambda e: e.event.timestamp.timestamp) - def stop(self): + async def stop(self): self._terminate.set() - if self._dltgateway_stream is not None: self._dltgateway_stream.cancel() + if self._dltgateway_stream is not None: + await self._dltgateway_stream.cancel() + +# Usage example +async def main(): + gateway_client = DltGatewayClient() + collector = DltEventsCollector(gateway_client) + await collector.start() + +# Start the event loop +asyncio.run(main()) diff --git a/src/dlt/connector/client/DltGatewayClient.py b/src/dlt/connector/client/DltGatewayClient.py index b04e08d7fd5f0b83a41f36a976376be83b603551..f9b37a2db848ba0c5a41f887d819ccf0489909e6 100644 --- a/src/dlt/connector/client/DltGatewayClient.py +++ b/src/dlt/connector/client/DltGatewayClient.py @@ -40,7 +40,7 @@ class DltGatewayClient: LOGGER.debug('Channel created') def connect(self): - self.channel = grpc.insecure_channel(self.endpoint) + self.channel = grpc.aio.insecure_channel(self.endpoint) self.stub = DltGatewayServiceStub(self.channel) def close(self): diff --git a/src/dlt/connector/client/async.py b/src/dlt/connector/client/async.py new file mode 100644 index 0000000000000000000000000000000000000000..e38f124c2bd2b7a1e6435c4fa15cf56c823eb7d2 --- /dev/null +++ b/src/dlt/connector/client/async.py @@ -0,0 +1,74 @@ +# DltGatewayClient.py + +import grpc +import logging +from typing import Iterator +from common.proto.context_pb2 import Empty, TeraFlowController +from common.proto.dlt_gateway_pb2 import ( + DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription +) +from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string +from dlt.connector.Config import DLT_GATEWAY_HOST, DLT_GATEWAY_PORT + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class DltGatewayClient: + def __init__(self, host=None, port=None): + if not host: host = DLT_GATEWAY_HOST + if not port: port = DLT_GATEWAY_PORT + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.aio.insecure_channel(self.endpoint) + self.stub = DltGatewayServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + async def RecordToDlt(self, request: DltRecord) -> DltRecordStatus: + LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = await self.stub.RecordToDlt(request) + LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + async def GetFromDlt(self, request: DltRecordId) -> DltRecord: + LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = await self.stub.GetFromDlt(request) + LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + async def SubscribeToDlt(self, request: DltRecordSubscription) -> Iterator[DltRecordEvent]: + LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = await self.stub.SubscribeToDlt(request) + LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + async def GetDltStatus(self, request: TeraFlowController) -> DltPeerStatus: + LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) + response = await self.stub.GetDltStatus(request) + LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + async def GetDltPeers(self, request: Empty) -> DltPeerStatusList: + LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) + response = await self.stub.GetDltPeers(request) + LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) + return response