import logging, pytest, threading, time from typing import List, Set from common.message_broker.Factory import get_messagebroker_backend from common.message_broker.Message import Message from common.message_broker.MessageBroker import MessageBroker from common.message_broker.backend.BackendEnum import BackendEnum from common.message_broker.backend._Backend import _Backend logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) SCENARIOS = [ (BackendEnum.INMEMORY, {}), (BackendEnum.REDIS, { 'REDIS_SERVICE_HOST': '10.1.7.194', 'REDIS_SERVICE_PORT': 30283, 'REDIS_DATABASE_ID': 0, }), ] CONSUME_TIMEOUT = 0.1 # seconds TOPIC_DEVICES = 'devices' TOPIC_LINKS = 'links' TOPIC_SERVICES = 'services' class Consumer(threading.Thread): def __init__( self, message_broker : MessageBroker, # pylint: disable=redefined-outer-name topic_names : Set[str], output_list : List[Message], consume_timeout=CONSUME_TIMEOUT) -> None: super().__init__(daemon=True) self._message_broker = message_broker self._topic_names = topic_names self._output_list = output_list self._consume_timeout = consume_timeout def run(self) -> None: LOGGER.info('{:s} subscribes to topics {:s}'.format(self.name, str(self._topic_names))) for message in self._message_broker.consume(self._topic_names, consume_timeout=self._consume_timeout): LOGGER.info('{:s} receives {:s}'.format(self.name, str(message))) self._output_list.append(message) LOGGER.info('{:s} terminates') @pytest.fixture(scope='session', ids=[str(scenario[0].value) for scenario in SCENARIOS], params=SCENARIOS) def message_broker(request): backend,settings = request.param LOGGER.info('Running fixture with backend={:s}, settings={:s}...'.format(str(backend), str(settings))) return MessageBroker(get_messagebroker_backend(backend=backend, **settings)) def test_messagebroker_instantiation(): with pytest.raises(AttributeError) as e: MessageBroker(None) str_class_path = '{}.{}'.format(_Backend.__module__, _Backend.__name__) assert str(e.value) == 'backend must inherit from {}'.format(str_class_path) assert MessageBroker(get_messagebroker_backend(BackendEnum.INMEMORY)) is not None def test_messagebroker(message_broker : MessageBroker): # pylint: disable=redefined-outer-name output_list1 : List[Message] = [] consumer1 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS}, output_list1) consumer1.start() output_list2 : List[Message] = [] consumer2 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES}, output_list2) consumer2.start() output_list3 : List[Message] = [] consumer3 = Consumer(message_broker, {TOPIC_SERVICES}, output_list3) consumer3.start() LOGGER.info('delay') time.sleep(0.5) message = Message(topic=TOPIC_DEVICES, content='new-device-01') LOGGER.info('publish message={:s}'.format(str(message))) message_broker.publish(message) message = Message(topic=TOPIC_DEVICES, content='new-device-02') LOGGER.info('publish message={:s}'.format(str(message))) message_broker.publish(message) message = Message(topic=TOPIC_LINKS, content='new-link-01-02') LOGGER.info('publish message={:s}'.format(str(message))) message_broker.publish(message) LOGGER.info('delay') time.sleep(0.1) message = Message(topic=TOPIC_DEVICES, content='update-device-01') LOGGER.info('publish message={:s}'.format(str(message))) message_broker.publish(message) message = Message(topic=TOPIC_DEVICES, content='update-device-02') LOGGER.info('publish message={:s}'.format(str(message))) message_broker.publish(message) message = Message(topic=TOPIC_SERVICES, content='new-service-01-02') LOGGER.info('publish message={:s}'.format(str(message))) message_broker.publish(message) LOGGER.info('delay') time.sleep(0.5) LOGGER.info('terminate') message_broker.terminate() LOGGER.info('join') consumer1.join() consumer2.join() consumer3.join() LOGGER.info('output_list1={:s}'.format(str(output_list1))) LOGGER.info('output_list2={:s}'.format(str(output_list2))) LOGGER.info('output_list3={:s}'.format(str(output_list3))) assert len(output_list1) == 5 assert output_list1[0].topic == TOPIC_DEVICES assert output_list1[0].content == 'new-device-01' assert output_list1[1].topic == TOPIC_DEVICES assert output_list1[1].content == 'new-device-02' assert output_list1[2].topic == TOPIC_LINKS assert output_list1[2].content == 'new-link-01-02' assert output_list1[3].topic == TOPIC_DEVICES assert output_list1[3].content == 'update-device-01' assert output_list1[4].topic == TOPIC_DEVICES assert output_list1[4].content == 'update-device-02' assert len(output_list2) == 5 assert output_list2[0].topic == TOPIC_DEVICES assert output_list2[0].content == 'new-device-01' assert output_list2[1].topic == TOPIC_DEVICES assert output_list2[1].content == 'new-device-02' assert output_list2[2].topic == TOPIC_DEVICES assert output_list2[2].content == 'update-device-01' assert output_list2[3].topic == TOPIC_DEVICES assert output_list2[3].content == 'update-device-02' assert output_list2[4].topic == TOPIC_SERVICES assert output_list2[4].content == 'new-service-01-02' assert len(output_list3) == 1 assert output_list3[0].topic == TOPIC_SERVICES assert output_list3[0].content == 'new-service-01-02'