Loading src/common/message_broker/backend/nats/NatsBackendThread.py +13 −10 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio, logging, nats, nats.errors, queue, threading import asyncio, logging, nats, nats.errors, queue, threading, uuid from typing import List from common.message_broker.Message import Message Loading Loading @@ -49,16 +49,17 @@ class NatsBackendThread(threading.Thread): self._publish_queue.put_nowait(Message(topic_name, message_content)) async def _run_subscriber( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, ready_event : threading.Event self, str_uuid : str, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, ready_event : threading.Event ) -> None: try: LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) LOGGER.info('[_run_subscriber][uuid={:s}] NATS URI: {:s}'.format(str_uuid, str(self._nats_uri))) client = await nats.connect(servers=[self._nats_uri]) server_version = client.connected_server_version LOGGER.info('[_run_subscriber] Connected! NATS Server version: {:s}'.format(str(repr(server_version)))) LOGGER.info('[_run_subscriber][uuid={:s}] Connected! NATS Server version: {:s}'.format( str_uuid, str(repr(server_version)))) subscription = await client.subscribe(topic_name) LOGGER.info('[_run_subscriber] Subscribed!') LOGGER.info('[_run_subscriber][uuid={:s}] Subscribed!'.format(str_uuid)) ready_event.set() while not self._terminate.is_set() and not unsubscribe.is_set(): try: Loading @@ -71,19 +72,21 @@ class NatsBackendThread(threading.Thread): await subscription.unsubscribe() await client.drain() except Exception: # pylint: disable=broad-exception-caught LOGGER.exception('[_run_subscriber] Unhandled Exception') LOGGER.exception('[_run_subscriber][uuid={:s}] Unhandled Exception'.format(str_uuid)) def subscribe( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event ) -> None: ready_event = threading.Event() str_uuid = str(uuid.uuid4()) LOGGER.info('[subscribe][uuid={:s}] Creating subscriber task...'.format(str_uuid)) task = self._event_loop.create_task( self._run_subscriber(topic_name, timeout, out_queue, unsubscribe, ready_event) self._run_subscriber(str_uuid, topic_name, timeout, out_queue, unsubscribe, ready_event) ) self._tasks.append(task) LOGGER.info('[subscribe] Waiting for subscriber to be ready...') LOGGER.info('[subscribe][uuid={:s}] Waiting for subscriber to be ready...'.format(str_uuid)) is_ready = ready_event.wait(timeout=120) LOGGER.info('[subscribe] Subscriber is Ready? {:s}'.format(str(is_ready))) LOGGER.info('[subscribe][uuid={:s}] Subscriber is Ready? {:s}'.format(str_uuid, str(is_ready))) def run(self) -> None: asyncio.set_event_loop(self._event_loop) Loading Loading
src/common/message_broker/backend/nats/NatsBackendThread.py +13 −10 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio, logging, nats, nats.errors, queue, threading import asyncio, logging, nats, nats.errors, queue, threading, uuid from typing import List from common.message_broker.Message import Message Loading Loading @@ -49,16 +49,17 @@ class NatsBackendThread(threading.Thread): self._publish_queue.put_nowait(Message(topic_name, message_content)) async def _run_subscriber( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, ready_event : threading.Event self, str_uuid : str, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, ready_event : threading.Event ) -> None: try: LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) LOGGER.info('[_run_subscriber][uuid={:s}] NATS URI: {:s}'.format(str_uuid, str(self._nats_uri))) client = await nats.connect(servers=[self._nats_uri]) server_version = client.connected_server_version LOGGER.info('[_run_subscriber] Connected! NATS Server version: {:s}'.format(str(repr(server_version)))) LOGGER.info('[_run_subscriber][uuid={:s}] Connected! NATS Server version: {:s}'.format( str_uuid, str(repr(server_version)))) subscription = await client.subscribe(topic_name) LOGGER.info('[_run_subscriber] Subscribed!') LOGGER.info('[_run_subscriber][uuid={:s}] Subscribed!'.format(str_uuid)) ready_event.set() while not self._terminate.is_set() and not unsubscribe.is_set(): try: Loading @@ -71,19 +72,21 @@ class NatsBackendThread(threading.Thread): await subscription.unsubscribe() await client.drain() except Exception: # pylint: disable=broad-exception-caught LOGGER.exception('[_run_subscriber] Unhandled Exception') LOGGER.exception('[_run_subscriber][uuid={:s}] Unhandled Exception'.format(str_uuid)) def subscribe( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event ) -> None: ready_event = threading.Event() str_uuid = str(uuid.uuid4()) LOGGER.info('[subscribe][uuid={:s}] Creating subscriber task...'.format(str_uuid)) task = self._event_loop.create_task( self._run_subscriber(topic_name, timeout, out_queue, unsubscribe, ready_event) self._run_subscriber(str_uuid, topic_name, timeout, out_queue, unsubscribe, ready_event) ) self._tasks.append(task) LOGGER.info('[subscribe] Waiting for subscriber to be ready...') LOGGER.info('[subscribe][uuid={:s}] Waiting for subscriber to be ready...'.format(str_uuid)) is_ready = ready_event.wait(timeout=120) LOGGER.info('[subscribe] Subscriber is Ready? {:s}'.format(str(is_ready))) LOGGER.info('[subscribe][uuid={:s}] Subscriber is Ready? {:s}'.format(str_uuid, str(is_ready))) def run(self) -> None: asyncio.set_event_loop(self._event_loop) Loading