Skip to content
Snippets Groups Projects
MessageBroker.py 4.81 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import logging, sys, threading, time
from queue import Queue, Empty
from typing import Dict, Iterator, NamedTuple, Optional, Set

LOGGER = logging.getLogger(__name__)

SUBSCRIBER_DEAD_TIMEOUT = 5.0 # seconds
CONSUME_TIMEOUT = 0.5 # seconds

class Watchdog:
    def __init__(self, dead_timeout) -> None:
        self._lock = threading.Lock()
        self._dead_timeout = dead_timeout
        self._timestamp = time.time()

    def refresh(self):
        with self._lock:
            self._timestamp = time.time()

    def is_alive(self):
        with self._lock:
            return (time.time() - self._timestamp) < self._dead_timeout

class Message(NamedTuple):
    topic: str
    content: str

class Subscriber(Watchdog):
    def __init__(self, consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:
        super().__init__(dead_timeout)
        self._consume_timeout = consume_timeout
        self._queue = Queue()

    def publish(self, message : Message) -> None:
        self._queue.put_nowait(message)

    def consume(self) -> Optional[Message]:
        self.refresh()
        try:
            return self._queue.get(block=True, timeout=self._consume_timeout)
        except Empty:
            return None

class Topic:
    def __init__(self) -> None:
        self._subscribers : Set[Subscriber] = set()

    def add_subscriber(self, subscriber : Subscriber) -> None:
        self._subscribers.add(subscriber)

    def remove_subscriber(self, subscriber : Subscriber) -> None:
        self._subscribers.discard(subscriber)

    def publish(self, message : Message) -> None:
        dead_subscribers = set()
        for subscriber in self._subscribers:
            if subscriber.is_alive():
                subscriber.publish(message)
            else:
                dead_subscribers.add(subscriber)
        for dead_subscriber in dead_subscribers:
            self.remove_subscriber(dead_subscriber)

class MessageBroker:
    def __init__(self):
        self._terminate = threading.Event()
        self._topics : Dict[str, Topic] = {}

    def get_or_create_topic(self, topic_name) -> Topic:
        return self._topics.setdefault(topic_name, Topic())

    def terminate(self) -> None:
        self._terminate.set()

    def publish(self, message : Message) -> None:
        self.get_or_create_topic(message.topic).publish(message)

    def consume(
        self, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT
        ) -> Iterator[Message]:

        subscriber = Subscriber(consume_timeout=consume_timeout, dead_timeout=dead_timeout)
        for topic_name in topic_names:
            self.get_or_create_topic(topic_name).add_subscriber(subscriber)

        while not self._terminate.is_set():
            message = subscriber.consume()
            if message is None: continue
            yield message

        for topic_name in topic_names:
            self.get_or_create_topic(topic_name).remove_subscriber(subscriber)

class Consumer(threading.Thread):
    def __init__(
        self, message_broker : MessageBroker, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT,
        dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:

        super().__init__(daemon=True)
        self._message_broker = message_broker
        self._topic_names = topic_names
        self._consume_timeout = consume_timeout
        self._dead_timeout = dead_timeout

    def run(self) -> None:
        print(time.time(), self.name, 'subscribes to', self._topic_names)
        messages = self._message_broker.consume(
            self._topic_names, consume_timeout=self._consume_timeout, dead_timeout=self._dead_timeout)
        for message in messages:
            print(time.time(), self.name, 'receives', message)
        print(time.time(), self.name, 'terminates')

TOPIC_DEVICES  = 'devices'
TOPIC_LINKS    = 'links'
TOPIC_SERVICES = 'services'

def main():
    message_broker = MessageBroker()

    consumer1 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS})
    consumer1.start()

    consumer2 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES})
    consumer2.start()

    consumer3 = Consumer(message_broker, {TOPIC_SERVICES})
    consumer3.start()

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-02'))
    message_broker.publish(Message(topic=TOPIC_LINKS,   content='new-link-01-02'))

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES,  content='update-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES,  content='update-device-02'))
    message_broker.publish(Message(topic=TOPIC_SERVICES, content='new-service-01-02'))

    message_broker.terminate()
    return 0

if __name__ == '__main__':
    sys.exit(main())