import logging, sys, threading, time from queue import Queue, Empty from typing import Dict, Iterator, NamedTuple, Optional, Set LOGGER = logging.getLogger(__name__) SUBSCRIBER_DEAD_TIMEOUT = 5.0 # seconds CONSUME_TIMEOUT = 0.5 # seconds class Watchdog: def __init__(self, dead_timeout) -> None: self._lock = threading.Lock() self._dead_timeout = dead_timeout self._timestamp = time.time() def refresh(self): with self._lock: self._timestamp = time.time() def is_alive(self): with self._lock: return (time.time() - self._timestamp) < self._dead_timeout class Message(NamedTuple): topic: str content: str class Subscriber(Watchdog): def __init__(self, consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None: super().__init__(dead_timeout) self._consume_timeout = consume_timeout self._queue = Queue() def publish(self, message : Message) -> None: self._queue.put_nowait(message) def consume(self) -> Optional[Message]: self.refresh() try: return self._queue.get(block=True, timeout=self._consume_timeout) except Empty: return None class Topic: def __init__(self) -> None: self._subscribers : Set[Subscriber] = set() def add_subscriber(self, subscriber : Subscriber) -> None: self._subscribers.add(subscriber) def remove_subscriber(self, subscriber : Subscriber) -> None: self._subscribers.discard(subscriber) def publish(self, message : Message) -> None: dead_subscribers = set() for subscriber in self._subscribers: if subscriber.is_alive(): subscriber.publish(message) else: dead_subscribers.add(subscriber) for dead_subscriber in dead_subscribers: self.remove_subscriber(dead_subscriber) class MessageBroker: def __init__(self): self._terminate = threading.Event() self._topics : Dict[str, Topic] = {} def get_or_create_topic(self, topic_name) -> Topic: return self._topics.setdefault(topic_name, Topic()) def terminate(self) -> None: self._terminate.set() def publish(self, message : Message) -> None: self.get_or_create_topic(message.topic).publish(message) def consume( self, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT ) -> Iterator[Message]: subscriber = Subscriber(consume_timeout=consume_timeout, dead_timeout=dead_timeout) for topic_name in topic_names: self.get_or_create_topic(topic_name).add_subscriber(subscriber) while not self._terminate.is_set(): message = subscriber.consume() if message is None: continue yield message for topic_name in topic_names: self.get_or_create_topic(topic_name).remove_subscriber(subscriber) class Consumer(threading.Thread): def __init__( self, message_broker : MessageBroker, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None: super().__init__(daemon=True) self._message_broker = message_broker self._topic_names = topic_names self._consume_timeout = consume_timeout self._dead_timeout = dead_timeout def run(self) -> None: print(time.time(), self.name, 'subscribes to', self._topic_names) messages = self._message_broker.consume( self._topic_names, consume_timeout=self._consume_timeout, dead_timeout=self._dead_timeout) for message in messages: print(time.time(), self.name, 'receives', message) print(time.time(), self.name, 'terminates') TOPIC_DEVICES = 'devices' TOPIC_LINKS = 'links' TOPIC_SERVICES = 'services' def main(): message_broker = MessageBroker() consumer1 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS}) consumer1.start() consumer2 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES}) consumer2.start() consumer3 = Consumer(message_broker, {TOPIC_SERVICES}) consumer3.start() time.sleep(SUBSCRIBER_DEAD_TIMEOUT) message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-01')) message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-02')) message_broker.publish(Message(topic=TOPIC_LINKS, content='new-link-01-02')) time.sleep(SUBSCRIBER_DEAD_TIMEOUT) message_broker.publish(Message(topic=TOPIC_DEVICES, content='update-device-01')) message_broker.publish(Message(topic=TOPIC_DEVICES, content='update-device-02')) message_broker.publish(Message(topic=TOPIC_SERVICES, content='new-service-01-02')) message_broker.terminate() return 0 if __name__ == '__main__': sys.exit(main())