diff --git a/src/common/message_broker/backend/nats/NatsBackendThread.py b/src/common/message_broker/backend/nats/NatsBackendThread.py index b8a4fcd7bb6c40b1c7ff837b6b1a19523c115500..4085eeaac1ade954c37cb2acd74d95271b30adf4 100644 --- a/src/common/message_broker/backend/nats/NatsBackendThread.py +++ b/src/common/message_broker/backend/nats/NatsBackendThread.py @@ -52,22 +52,26 @@ class NatsBackendThread(threading.Thread): self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, ready_event : threading.Event ) -> None: - LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) - client = await nats.connect(servers=[self._nats_uri]) - LOGGER.info('[_run_subscriber] Connected!') - subscription = await client.subscribe(topic_name) - LOGGER.info('[_run_subscriber] Subscribed!') - ready_event.set() - while not self._terminate.is_set() and not unsubscribe.is_set(): - try: - message = await subscription.next_msg(timeout) - except nats.errors.TimeoutError: - continue - except asyncio.CancelledError: - break - out_queue.put(Message(message.subject, message.data.decode('UTF-8'))) - await subscription.unsubscribe() - await client.drain() + try: + LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) + client = await nats.connect(servers=[self._nats_uri]) + server_version = client.connected_server_version + LOGGER.info('[_run_subscriber] Connected! NATS Server version: {:s}'.format(str(repr(server_version)))) + subscription = await client.subscribe(topic_name) + LOGGER.info('[_run_subscriber] Subscribed!') + ready_event.set() + while not self._terminate.is_set() and not unsubscribe.is_set(): + try: + message = await subscription.next_msg(timeout) + except nats.errors.TimeoutError: + continue + except asyncio.CancelledError: + break + out_queue.put(Message(message.subject, message.data.decode('UTF-8'))) + await subscription.unsubscribe() + await client.drain() + except Exception as e: + LOGGER.exception('[_run_subscriber] Unhandled Exception') def subscribe( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event @@ -79,7 +83,7 @@ class NatsBackendThread(threading.Thread): self._tasks.append(task) LOGGER.info('[subscribe] Waiting for subscriber to be ready...') is_ready = ready_event.wait(timeout=120) - LOGGER.info('[subscribe] Subscriber Ready: {:s}'.format(str(is_ready))) + LOGGER.info('[subscribe] Subscriber is Ready? {:s}'.format(str(is_ready))) def run(self) -> None: asyncio.set_event_loop(self._event_loop)