MessageBroker.py 4.81 KB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import logging, sys, threading, time
from queue import Queue, Empty
from typing import Dict, Iterator, NamedTuple, Optional, Set

LOGGER = logging.getLogger(__name__)

SUBSCRIBER_DEAD_TIMEOUT = 5.0 # seconds
CONSUME_TIMEOUT = 0.5 # seconds

class Watchdog:
    def __init__(self, dead_timeout) -> None:
        self._lock = threading.Lock()
        self._dead_timeout = dead_timeout
        self._timestamp = time.time()

    def refresh(self):
        with self._lock:
            self._timestamp = time.time()

    def is_alive(self):
        with self._lock:
            return (time.time() - self._timestamp) < self._dead_timeout

class Message(NamedTuple):
    topic: str
    content: str

class Subscriber(Watchdog):
    def __init__(self, consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:
        super().__init__(dead_timeout)
        self._consume_timeout = consume_timeout
        self._queue = Queue()

    def publish(self, message : Message) -> None:
        self._queue.put_nowait(message)

    def consume(self) -> Optional[Message]:
        self.refresh()
        try:
            return self._queue.get(block=True, timeout=self._consume_timeout)
        except Empty:
            return None

class Topic:
    def __init__(self) -> None:
        self._subscribers : Set[Subscriber] = set()

    def add_subscriber(self, subscriber : Subscriber) -> None:
        self._subscribers.add(subscriber)

    def remove_subscriber(self, subscriber : Subscriber) -> None:
        self._subscribers.discard(subscriber)

    def publish(self, message : Message) -> None:
        dead_subscribers = set()
        for subscriber in self._subscribers:
            if subscriber.is_alive():
                subscriber.publish(message)
            else:
                dead_subscribers.add(subscriber)
        for dead_subscriber in dead_subscribers:
            self.remove_subscriber(dead_subscriber)

class MessageBroker:
    def __init__(self):
        self._terminate = threading.Event()
        self._topics : Dict[str, Topic] = {}

    def get_or_create_topic(self, topic_name) -> Topic:
        return self._topics.setdefault(topic_name, Topic())

    def terminate(self) -> None:
        self._terminate.set()

    def publish(self, message : Message) -> None:
        self.get_or_create_topic(message.topic).publish(message)

    def consume(
        self, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT
        ) -> Iterator[Message]:

        subscriber = Subscriber(consume_timeout=consume_timeout, dead_timeout=dead_timeout)
        for topic_name in topic_names:
            self.get_or_create_topic(topic_name).add_subscriber(subscriber)

        while not self._terminate.is_set():
            message = subscriber.consume()
            if message is None: continue
            yield message

        for topic_name in topic_names:
            self.get_or_create_topic(topic_name).remove_subscriber(subscriber)

class Consumer(threading.Thread):
    def __init__(
        self, message_broker : MessageBroker, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT,
        dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:

        super().__init__(daemon=True)
        self._message_broker = message_broker
        self._topic_names = topic_names
        self._consume_timeout = consume_timeout
        self._dead_timeout = dead_timeout

    def run(self) -> None:
        print(time.time(), self.name, 'subscribes to', self._topic_names)
        messages = self._message_broker.consume(
            self._topic_names, consume_timeout=self._consume_timeout, dead_timeout=self._dead_timeout)
        for message in messages:
            print(time.time(), self.name, 'receives', message)
        print(time.time(), self.name, 'terminates')

TOPIC_DEVICES  = 'devices'
TOPIC_LINKS    = 'links'
TOPIC_SERVICES = 'services'

def main():
    message_broker = MessageBroker()

    consumer1 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS})
    consumer1.start()

    consumer2 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES})
    consumer2.start()

    consumer3 = Consumer(message_broker, {TOPIC_SERVICES})
    consumer3.start()

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-02'))
    message_broker.publish(Message(topic=TOPIC_LINKS,   content='new-link-01-02'))

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES,  content='update-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES,  content='update-device-02'))
    message_broker.publish(Message(topic=TOPIC_SERVICES, content='new-service-01-02'))

    message_broker.terminate()
    return 0

if __name__ == '__main__':
    sys.exit(main())