Skip to content
Snippets Groups Projects
Commit 978d52c6 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common - Message Broker - NATS:

- Improved subscriber error handling and logging
parent 907df502
No related branches found
No related tags found
2 merge requests!235Release TeraFlowSDN 3.0,!147Resolve "Notificatoins from Context are incomplete"
...@@ -52,22 +52,26 @@ class NatsBackendThread(threading.Thread): ...@@ -52,22 +52,26 @@ class NatsBackendThread(threading.Thread):
self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event,
ready_event : threading.Event ready_event : threading.Event
) -> None: ) -> None:
LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) try:
client = await nats.connect(servers=[self._nats_uri]) LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri)))
LOGGER.info('[_run_subscriber] Connected!') client = await nats.connect(servers=[self._nats_uri])
subscription = await client.subscribe(topic_name) server_version = client.connected_server_version
LOGGER.info('[_run_subscriber] Subscribed!') LOGGER.info('[_run_subscriber] Connected! NATS Server version: {:s}'.format(str(repr(server_version))))
ready_event.set() subscription = await client.subscribe(topic_name)
while not self._terminate.is_set() and not unsubscribe.is_set(): LOGGER.info('[_run_subscriber] Subscribed!')
try: ready_event.set()
message = await subscription.next_msg(timeout) while not self._terminate.is_set() and not unsubscribe.is_set():
except nats.errors.TimeoutError: try:
continue message = await subscription.next_msg(timeout)
except asyncio.CancelledError: except nats.errors.TimeoutError:
break continue
out_queue.put(Message(message.subject, message.data.decode('UTF-8'))) except asyncio.CancelledError:
await subscription.unsubscribe() break
await client.drain() out_queue.put(Message(message.subject, message.data.decode('UTF-8')))
await subscription.unsubscribe()
await client.drain()
except Exception as e:
LOGGER.exception('[_run_subscriber] Unhandled Exception')
def subscribe( def subscribe(
self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event
...@@ -79,7 +83,7 @@ class NatsBackendThread(threading.Thread): ...@@ -79,7 +83,7 @@ class NatsBackendThread(threading.Thread):
self._tasks.append(task) self._tasks.append(task)
LOGGER.info('[subscribe] Waiting for subscriber to be ready...') LOGGER.info('[subscribe] Waiting for subscriber to be ready...')
is_ready = ready_event.wait(timeout=120) is_ready = ready_event.wait(timeout=120)
LOGGER.info('[subscribe] Subscriber Ready: {:s}'.format(str(is_ready))) LOGGER.info('[subscribe] Subscriber is Ready? {:s}'.format(str(is_ready)))
def run(self) -> None: def run(self) -> None:
asyncio.set_event_loop(self._event_loop) asyncio.set_event_loop(self._event_loop)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment