Commit ef3d2182 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Changes:

Common:
- Implemented Message Brokering framework supporting multiple backends
- Implemented InMemory Message Brokering Backend for testing purposes
- Implemented Redis Backend
- Implemented Message Brokering unit tests
- Minor improvements in ORM Framework
- Added Message Broker framework unit testing into automated local tests
parent e4a74ad1
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -16,7 +16,8 @@ rm -f $COVERAGEFILE

coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
    common/metrics/tests/test_unitary.py \
    common/orm/tests/test_unitary.py
    common/orm/tests/test_unitary.py \
    common/message_broker/tests/test_unitary.py

#coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
#    centralizedcybersecurity/tests/test_unitary.py
+1 −0
Original line number Diff line number Diff line
CONSUME_TIMEOUT = 0.1 # seconds
+32 −0
Original line number Diff line number Diff line
import logging, os
from typing import Optional, Union
from .backend._Backend import _Backend
from .backend.BackendEnum import BackendEnum
from .backend.inmemory.InMemoryBackend import InMemoryBackend
from .backend.redis.RedisBackend import RedisBackend

LOGGER = logging.getLogger(__name__)

BACKENDS = {
    BackendEnum.INMEMORY.value: InMemoryBackend,
    BackendEnum.REDIS.value: RedisBackend,
    #BackendEnum.KAFKA.value: KafkaBackend,
    #BackendEnum.RABBITMQ.value: RabbitMQBackend,
    #BackendEnum.ZEROMQ.value: ZeroMQBackend,
}

DEFAULT_MB_BACKEND = BackendEnum.INMEMORY

def get_messagebroker_backend(backend : Optional[Union[str, BackendEnum]] = None, **settings) -> _Backend:
    # return an instance of MessageBroker initialized with selected backend.
    # The backend is selected using following criteria (first that is not None is selected):
    # 1. user selected by parameter (backend=...)
    # 2. environment variable MB_BACKEND
    # 3. default backend: INMEMORY
    if backend is None: backend = os.environ.get('MB_BACKEND', DEFAULT_MB_BACKEND)
    if backend is None: raise Exception('MessageBroker Backend not specified')
    if isinstance(backend, BackendEnum): backend = backend.value
    backend_class = BACKENDS.get(backend)
    if backend_class is None: raise Exception('Unsupported MessageBrokerBackend({:s})'.format(backend))
    LOGGER.info('Selected MessageBroker Backend: {:s}'.format(backend))
    return backend_class(**settings)
+5 −0
Original line number Diff line number Diff line
from typing import NamedTuple

class Message(NamedTuple):
    topic: str
    content: str
+18 −136
Original line number Diff line number Diff line
import logging, sys, threading, time
from queue import Queue, Empty
from typing import Dict, Iterator, NamedTuple, Optional, Set
import logging
from typing import Iterator, Set
from .backend._Backend import _Backend
from .Constants import CONSUME_TIMEOUT
from .Message import Message

LOGGER = logging.getLogger(__name__)

SUBSCRIBER_DEAD_TIMEOUT = 5.0 # seconds
CONSUME_TIMEOUT = 0.5 # seconds

class Watchdog:
    def __init__(self, dead_timeout) -> None:
        self._lock = threading.Lock()
        self._dead_timeout = dead_timeout
        self._timestamp = time.time()

    def refresh(self):
        with self._lock:
            self._timestamp = time.time()

    def is_alive(self):
        with self._lock:
            return (time.time() - self._timestamp) < self._dead_timeout

class Message(NamedTuple):
    topic: str
    content: str

class Subscriber(Watchdog):
    def __init__(self, consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:
        super().__init__(dead_timeout)
        self._consume_timeout = consume_timeout
        self._queue = Queue()

    def publish(self, message : Message) -> None:
        self._queue.put_nowait(message)

    def consume(self) -> Optional[Message]:
        self.refresh()
        try:
            return self._queue.get(block=True, timeout=self._consume_timeout)
        except Empty:
            return None

class Topic:
    def __init__(self) -> None:
        self._subscribers : Set[Subscriber] = set()

    def add_subscriber(self, subscriber : Subscriber) -> None:
        self._subscribers.add(subscriber)

    def remove_subscriber(self, subscriber : Subscriber) -> None:
        self._subscribers.discard(subscriber)

    def publish(self, message : Message) -> None:
        dead_subscribers = set()
        for subscriber in self._subscribers:
            if subscriber.is_alive():
                subscriber.publish(message)
            else:
                dead_subscribers.add(subscriber)
        for dead_subscriber in dead_subscribers:
            self.remove_subscriber(dead_subscriber)

class MessageBroker:
    def __init__(self):
        self._terminate = threading.Event()
        self._topics : Dict[str, Topic] = {}
    def __init__(self, backend : _Backend):
        if not isinstance(backend, _Backend):
            str_class_path = '{}.{}'.format(_Backend.__module__, _Backend.__name__)
            raise AttributeError('backend must inherit from {}'.format(str_class_path))
        self._backend = backend

    def get_or_create_topic(self, topic_name) -> Topic:
        return self._topics.setdefault(topic_name, Topic())

    def terminate(self) -> None:
        self._terminate.set()
    @property
    def backend(self) -> _Backend: return self._backend

    def publish(self, message : Message) -> None:
        self.get_or_create_topic(message.topic).publish(message)

    def consume(
        self, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT
        ) -> Iterator[Message]:

        subscriber = Subscriber(consume_timeout=consume_timeout, dead_timeout=dead_timeout)
        for topic_name in topic_names:
            self.get_or_create_topic(topic_name).add_subscriber(subscriber)

        while not self._terminate.is_set():
            message = subscriber.consume()
            if message is None: continue
            yield message

        for topic_name in topic_names:
            self.get_or_create_topic(topic_name).remove_subscriber(subscriber)

class Consumer(threading.Thread):
    def __init__(
        self, message_broker : MessageBroker, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT,
        dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None:

        super().__init__(daemon=True)
        self._message_broker = message_broker
        self._topic_names = topic_names
        self._consume_timeout = consume_timeout
        self._dead_timeout = dead_timeout

    def run(self) -> None:
        print(time.time(), self.name, 'subscribes to', self._topic_names)
        messages = self._message_broker.consume(
            self._topic_names, consume_timeout=self._consume_timeout, dead_timeout=self._dead_timeout)
        for message in messages:
            print(time.time(), self.name, 'receives', message)
        print(time.time(), self.name, 'terminates')

TOPIC_DEVICES  = 'devices'
TOPIC_LINKS    = 'links'
TOPIC_SERVICES = 'services'

def main():
    message_broker = MessageBroker()

    consumer1 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS})
    consumer1.start()

    consumer2 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES})
    consumer2.start()

    consumer3 = Consumer(message_broker, {TOPIC_SERVICES})
    consumer3.start()

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-02'))
    message_broker.publish(Message(topic=TOPIC_LINKS,   content='new-link-01-02'))

    time.sleep(SUBSCRIBER_DEAD_TIMEOUT)

    message_broker.publish(Message(topic=TOPIC_DEVICES,  content='update-device-01'))
    message_broker.publish(Message(topic=TOPIC_DEVICES,  content='update-device-02'))
    message_broker.publish(Message(topic=TOPIC_SERVICES, content='new-service-01-02'))
        self._backend.publish(message.topic, message.content)

    message_broker.terminate()
    return 0
    def consume(self, topic_names : Set[str], consume_timeout : float = CONSUME_TIMEOUT) -> Iterator[Message]:
        for pair in self._backend.consume(topic_names, consume_timeout=consume_timeout):
            yield Message(*pair)

if __name__ == '__main__':
    sys.exit(main())
    def terminate(self):
        self._backend.terminate()
Loading