Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import logging, sys, threading, time
from queue import Queue, Empty
from typing import Dict, Iterator, NamedTuple, Optional, Set
# Module-level logger named after this module (none of the code shown below writes to it).
LOGGER = logging.getLogger(__name__)
# Default liveness window for a Subscriber: if it has not refreshed its watchdog for this long,
# Topic.publish() stops delivering to it and drops it from the topic.
SUBSCRIBER_DEAD_TIMEOUT = 5.0 # seconds
# Default upper bound on how long a single Subscriber.consume() call blocks waiting for a message.
CONSUME_TIMEOUT = 0.5 # seconds
class Watchdog:
    """Liveness tracker that stays alive only while it is refreshed often enough.

    The owner calls refresh() periodically; observers call is_alive() to check
    whether the most recent refresh is less than ``dead_timeout`` seconds old.
    Both methods take an internal lock, so they may be called from different
    threads.
    """

    def __init__(self, dead_timeout) -> None:
        """Create the watchdog; the countdown starts at construction time.

        Args:
            dead_timeout: Number of seconds without a refresh() after which
                is_alive() returns False.
        """
        self._lock = threading.Lock()
        self._dead_timeout = dead_timeout
        # Use the monotonic clock: wall-clock time (time.time()) can jump when
        # the system clock is adjusted, which would spuriously kill or
        # resurrect the watchdog. Only differences of this value are meaningful.
        self._timestamp = time.monotonic()

    def refresh(self):
        """Record "now" as the last moment the owner was known to be alive."""
        with self._lock:
            self._timestamp = time.monotonic()

    def is_alive(self):
        """Return True if the last refresh is strictly newer than dead_timeout seconds."""
        with self._lock:
            return (time.monotonic() - self._timestamp) < self._dead_timeout
class Message(NamedTuple):
    """Immutable record passed from publishers to subscribers through the broker."""

    # Name of the topic the message is published on; MessageBroker.publish() routes by it.
    topic: str
    # Message payload, delivered to subscribers as-is.
    content: str
class Subscriber(Watchdog):
    """Mailbox of one consumer, combined with a liveness watchdog.

    Publishers drop messages into the mailbox with publish(); the owning
    consumer pulls them out with consume(), which also refreshes the watchdog
    so the subscriber keeps being reported as alive.
    """

    def __init__(self, consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:
        """Create an empty mailbox.

        Args:
            consume_timeout: Maximum number of seconds a consume() call blocks.
            dead_timeout: Seconds without a consume() call after which the
                subscriber is reported as dead.
        """
        super().__init__(dead_timeout)
        self._queue = Queue()
        self._consume_timeout = consume_timeout

    def publish(self, message : Message) -> None:
        """Enqueue a message for this subscriber without blocking."""
        self._queue.put_nowait(message)

    def consume(self) -> Optional[Message]:
        """Wait for the next message.

        Returns:
            The next queued message, or None when nothing arrived within
            the configured consume timeout.
        """
        # Signal liveness before blocking, so a consumer idling on an empty
        # queue still counts as alive.
        self.refresh()
        try:
            received = self._queue.get(timeout=self._consume_timeout)
        except Empty:
            received = None
        return received
class Topic:
    """Set of subscribers that receive every message published on one topic.

    Subscribers are added and removed from consumer threads while messages are
    published from another thread, so all access to the subscriber set is
    guarded by a lock.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers : Set[Subscriber] = set()

    def add_subscriber(self, subscriber : Subscriber) -> None:
        """Register a subscriber; adding it twice has no further effect."""
        with self._lock:
            self._subscribers.add(subscriber)

    def remove_subscriber(self, subscriber : Subscriber) -> None:
        """Unregister a subscriber; unknown subscribers are ignored."""
        with self._lock:
            self._subscribers.discard(subscriber)

    def publish(self, message : Message) -> None:
        """Deliver a message to every live subscriber and drop the dead ones.

        A subscriber whose is_alive() returns False does not receive the
        message and is removed from the topic.
        """
        # Iterate over a snapshot: a concurrent add/remove on the live set
        # would otherwise raise "Set changed size during iteration". The lock
        # is released before calling into subscribers.
        with self._lock:
            snapshot = tuple(self._subscribers)

        dead_subscribers = set()
        for subscriber in snapshot:
            if subscriber.is_alive():
                subscriber.publish(message)
            else:
                dead_subscribers.add(subscriber)

        if dead_subscribers:
            with self._lock:
                self._subscribers.difference_update(dead_subscribers)
class MessageBroker:
    """In-memory publish/subscribe hub that routes messages to topics by name."""

    def __init__(self):
        # Set once by terminate(); every consume() loop polls it.
        self._terminate = threading.Event()
        self._topics : Dict[str, Topic] = {}

    def get_or_create_topic(self, topic_name) -> Topic:
        """Return the topic registered under topic_name, creating it on first use."""
        topic = self._topics.get(topic_name)
        if topic is None:
            # Only build a Topic when the name is actually missing. setdefault()
            # (a single dict operation, atomic in CPython for str keys) makes
            # racing creators end up sharing the same Topic instance.
            topic = self._topics.setdefault(topic_name, Topic())
        return topic

    def terminate(self) -> None:
        """Ask every running consume() iterator to stop."""
        self._terminate.set()

    def publish(self, message : Message) -> None:
        """Deliver message to the subscribers of the topic named by message.topic."""
        self.get_or_create_topic(message.topic).publish(message)

    def consume(
        self, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT
    ) -> Iterator[Message]:
        """Yield the messages published on any of the given topics.

        This is a generator: the subscription is only created when iteration
        starts. Iteration ends once terminate() has been called; because the
        flag is checked before each wait, messages still queued at that point
        may not be yielded.

        Args:
            topic_names: Names of the topics to subscribe to.
            consume_timeout: Maximum seconds to block per wait; also bounds how
                long the iterator takes to notice terminate().
            dead_timeout: Seconds the caller may stay away from the iterator
                before the topics consider the subscription dead and drop it.

        Yields:
            Each Message delivered to the subscription, in arrival order.
        """
        subscriber = Subscriber(consume_timeout=consume_timeout, dead_timeout=dead_timeout)
        # Resolve the topics once so subscribe and unsubscribe act on exactly
        # the same objects, even if topic_names is a one-shot iterable.
        topics = [self.get_or_create_topic(topic_name) for topic_name in topic_names]
        for topic in topics:
            topic.add_subscriber(subscriber)
        try:
            while not self._terminate.is_set():
                message = subscriber.consume()
                if message is None:
                    continue
                yield message
        finally:
            # Runs on normal termination and also when the caller abandons or
            # closes the generator early, so the subscriber never lingers.
            for topic in topics:
                topic.remove_subscriber(subscriber)
class Consumer(threading.Thread):
    """Daemon thread that subscribes to a set of topics and prints what it receives."""

    def __init__(
        self, message_broker : MessageBroker, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT,
        dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:
        """Remember the subscription settings; nothing is subscribed until the thread runs.

        Args:
            message_broker: Broker to consume from.
            topic_names: Names of the topics to subscribe to.
            consume_timeout: Forwarded to MessageBroker.consume().
            dead_timeout: Forwarded to MessageBroker.consume().
        """
        super().__init__(daemon=True)
        self._message_broker = message_broker
        self._topic_names = topic_names
        self._consume_timeout = consume_timeout
        self._dead_timeout = dead_timeout

    def run(self) -> None:
        """Thread body: print each received message until the broker's stream ends."""
        print(time.time(), self.name, 'subscribes to', self._topic_names)
        timeouts = {
            'consume_timeout': self._consume_timeout,
            'dead_timeout': self._dead_timeout,
        }
        for received in self._message_broker.consume(self._topic_names, **timeouts):
            print(time.time(), self.name, 'receives', received)
        print(time.time(), self.name, 'terminates')
# Topic names used by the demo in main().
TOPIC_DEVICES = 'devices'
TOPIC_LINKS = 'links'
TOPIC_SERVICES = 'services'
def main():
    """Run the demo: three consumers, two bursts of messages, then shutdown.

    Returns:
        0, used as the process exit status.
    """
    message_broker = MessageBroker()

    consumers = [
        Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS}),
        Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES}),
        Consumer(message_broker, {TOPIC_SERVICES}),
    ]
    for consumer in consumers:
        consumer.start()

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-02'))
    message_broker.publish(Message(topic=TOPIC_LINKS, content='new-link-01-02'))

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES, content='update-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES, content='update-device-02'))
    message_broker.publish(Message(topic=TOPIC_SERVICES, content='new-service-01-02'))

    # NOTE: consumers check the terminate flag before each wait, so messages
    # still queued at this point may not be printed.
    message_broker.terminate()

    # The consumers are daemon threads: without joining them the interpreter
    # would kill them at exit, possibly mid-print and before they report
    # 'terminates'. Each one notices the flag within one consume timeout, so
    # the join is bounded to avoid hanging on a stuck thread.
    for consumer in consumers:
        consumer.join(timeout=2 * CONSUME_TIMEOUT)

    return 0
# Run the demo only when executed as a script; main()'s return value becomes the exit status.
if __name__ == '__main__':
    sys.exit(main())