NatsBackendThread.py 3.51 KB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import asyncio, logging, nats, nats.errors, queue, threading
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import List
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.message_broker.Message import Message

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
class NatsBackendThread(threading.Thread):
    def __init__(self, nats_uri : str) -> None:
        self._nats_uri = nats_uri
        self._event_loop = asyncio.get_event_loop()
        self._terminate = asyncio.Event()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._tasks_terminated = asyncio.Event()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._publish_queue = asyncio.Queue[Message]()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._tasks : List[asyncio.Task] = list()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        super().__init__()

    def terminate(self) -> None:
        self._terminate.set()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for task in self._tasks: task.cancel()
        self._tasks_terminated.set()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    async def _run_publisher(self) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('[_run_publisher] NATS URI: {:s}'.format(str(self._nats_uri)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        client = await nats.connect(servers=[self._nats_uri])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('[_run_publisher] Connected!')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        while not self._terminate.is_set():
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            try:
                message : Message = await self._publish_queue.get()
            except asyncio.CancelledError:
                break
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            await client.publish(message.topic, message.content.encode('UTF-8'))
        await client.drain()

    def publish(self, topic_name : str, message_content : str) -> None:
        self._publish_queue.put_nowait(Message(topic_name, message_content))

    async def _run_subscriber(
        self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event
    ) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        client = await nats.connect(servers=[self._nats_uri])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('[_run_subscriber] Connected!')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        subscription = await client.subscribe(topic_name)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('[_run_subscriber] Subscribed!')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        while not self._terminate.is_set() and not unsubscribe.is_set():
            try:
                message = await subscription.next_msg(timeout)
            except nats.errors.TimeoutError:
                continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            except asyncio.CancelledError:
                break
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            out_queue.put(Message(message.subject, message.data.decode('UTF-8')))
        await subscription.unsubscribe()
        await client.drain()

    def subscribe(
        self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event
    ) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        task = self._event_loop.create_task(self._run_subscriber(topic_name, timeout, out_queue, unsubscribe))
        self._tasks.append(task)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    def run(self) -> None:
        asyncio.set_event_loop(self._event_loop)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        task = self._event_loop.create_task(self._run_publisher())
        self._tasks.append(task)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._event_loop.run_until_complete(self._terminate.wait())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._tasks.remove(task)
        self._event_loop.run_until_complete(self._tasks_terminated.wait())