Loading src/common/message_broker/backend/nats/NatsBackendThread.py +10 −2 Original line number Diff line number Diff line Loading @@ -49,13 +49,15 @@ class NatsBackendThread(threading.Thread): self._publish_queue.put_nowait(Message(topic_name, message_content)) async def _run_subscriber( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, ready_event : threading.Event ) -> None: LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) client = await nats.connect(servers=[self._nats_uri]) LOGGER.info('[_run_subscriber] Connected!') subscription = await client.subscribe(topic_name) LOGGER.info('[_run_subscriber] Subscribed!') ready_event.set() while not self._terminate.is_set() and not unsubscribe.is_set(): try: message = await subscription.next_msg(timeout) Loading @@ -70,8 +72,14 @@ class NatsBackendThread(threading.Thread): def subscribe( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event ) -> None: task = self._event_loop.create_task(self._run_subscriber(topic_name, timeout, out_queue, unsubscribe)) ready_event = threading.Event() task = self._event_loop.create_task( self._run_subscriber(topic_name, timeout, out_queue, unsubscribe, ready_event) ) self._tasks.append(task) LOGGER.info('[subscribe] Waiting for subscriber to be ready...') is_ready = ready_event.wait(timeout=120) LOGGER.info('[subscribe] Subscriber Ready: {:s}'.format(str(is_ready))) def run(self) -> None: asyncio.set_event_loop(self._event_loop) Loading Loading
src/common/message_broker/backend/nats/NatsBackendThread.py +10 −2 Original line number Diff line number Diff line Loading @@ -49,13 +49,15 @@ class NatsBackendThread(threading.Thread): self._publish_queue.put_nowait(Message(topic_name, message_content)) async def _run_subscriber( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, ready_event : threading.Event ) -> None: LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) client = await nats.connect(servers=[self._nats_uri]) LOGGER.info('[_run_subscriber] Connected!') subscription = await client.subscribe(topic_name) LOGGER.info('[_run_subscriber] Subscribed!') ready_event.set() while not self._terminate.is_set() and not unsubscribe.is_set(): try: message = await subscription.next_msg(timeout) Loading @@ -70,8 +72,14 @@ class NatsBackendThread(threading.Thread): def subscribe( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event ) -> None: task = self._event_loop.create_task(self._run_subscriber(topic_name, timeout, out_queue, unsubscribe)) ready_event = threading.Event() task = self._event_loop.create_task( self._run_subscriber(topic_name, timeout, out_queue, unsubscribe, ready_event) ) self._tasks.append(task) LOGGER.info('[subscribe] Waiting for subscriber to be ready...') is_ready = ready_event.wait(timeout=120) LOGGER.info('[subscribe] Subscriber Ready: {:s}'.format(str(is_ready))) def run(self) -> None: asyncio.set_event_loop(self._event_loop) Loading