diff --git a/run_local_tests.sh b/run_local_tests.sh index 04177cd009ffa574488ba74007a2c6d95e7220c1..eee3f6b0913da2bc579bcfc5f5ccafa744371d72 100755 --- a/run_local_tests.sh +++ b/run_local_tests.sh @@ -16,7 +16,8 @@ rm -f $COVERAGEFILE coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ common/metrics/tests/test_unitary.py \ - common/orm/tests/test_unitary.py + common/orm/tests/test_unitary.py \ + common/message_broker/tests/test_unitary.py #coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ # centralizedcybersecurity/tests/test_unitary.py diff --git a/src/common/message_broker/Constants.py b/src/common/message_broker/Constants.py new file mode 100644 index 0000000000000000000000000000000000000000..671f11add119bf5103258af0c07df60e0e3afd0d --- /dev/null +++ b/src/common/message_broker/Constants.py @@ -0,0 +1 @@ +CONSUME_TIMEOUT = 0.1 # seconds diff --git a/src/common/message_broker/Factory.py b/src/common/message_broker/Factory.py new file mode 100644 index 0000000000000000000000000000000000000000..a2ea36435c717835bf4b7c89c2522878e67074c9 --- /dev/null +++ b/src/common/message_broker/Factory.py @@ -0,0 +1,32 @@ +import logging, os +from typing import Optional, Union +from .backend._Backend import _Backend +from .backend.BackendEnum import BackendEnum +from .backend.inmemory.InMemoryBackend import InMemoryBackend +from .backend.redis.RedisBackend import RedisBackend + +LOGGER = logging.getLogger(__name__) + +BACKENDS = { + BackendEnum.INMEMORY.value: InMemoryBackend, + BackendEnum.REDIS.value: RedisBackend, + #BackendEnum.KAFKA.value: KafkaBackend, + #BackendEnum.RABBITMQ.value: RabbitMQBackend, + #BackendEnum.ZEROMQ.value: ZeroMQBackend, +} + +DEFAULT_MB_BACKEND = BackendEnum.INMEMORY + +def get_messagebroker_backend(backend : Optional[Union[str, BackendEnum]] = None, **settings) -> _Backend: + # return an instance of MessageBroker initialized with selected backend. + # The backend is selected using following criteria (first that is not None is selected): + # 1. user selected by parameter (backend=...) + # 2. environment variable MB_BACKEND + # 3. default backend: INMEMORY + if backend is None: backend = os.environ.get('MB_BACKEND', DEFAULT_MB_BACKEND) + if backend is None: raise Exception('MessageBroker Backend not specified') + if isinstance(backend, BackendEnum): backend = backend.value + backend_class = BACKENDS.get(backend) + if backend_class is None: raise Exception('Unsupported MessageBrokerBackend({:s})'.format(backend)) + LOGGER.info('Selected MessageBroker Backend: {:s}'.format(backend)) + return backend_class(**settings) diff --git a/src/common/message_broker/Message.py b/src/common/message_broker/Message.py new file mode 100644 index 0000000000000000000000000000000000000000..ee527dc9f46855b3d806f9188a7d5640c34f416b --- /dev/null +++ b/src/common/message_broker/Message.py @@ -0,0 +1,5 @@ +from typing import NamedTuple + +class Message(NamedTuple): + topic: str + content: str diff --git a/src/common/message_broker/MessageBroker.py b/src/common/message_broker/MessageBroker.py index 4213cfe9f3c2e5b91271554a91f558ea27a4b39a..53697db2d0062a8290be44991990ee7f217e2c25 100644 --- a/src/common/message_broker/MessageBroker.py +++ b/src/common/message_broker/MessageBroker.py @@ -1,145 +1,27 @@ -import logging, sys, threading, time -from queue import Queue, Empty -from typing import Dict, Iterator, NamedTuple, Optional, Set +import logging +from typing import Iterator, Set +from .backend._Backend import _Backend +from .Constants import CONSUME_TIMEOUT +from .Message import Message LOGGER = logging.getLogger(__name__) -SUBSCRIBER_DEAD_TIMEOUT = 5.0 # seconds -CONSUME_TIMEOUT = 0.5 # seconds - -class Watchdog: - def __init__(self, dead_timeout) -> None: - self._lock = threading.Lock() - self._dead_timeout = dead_timeout - self._timestamp = time.time() - - def refresh(self): - with self._lock: - self._timestamp = time.time() - - def is_alive(self): - with self._lock: - return (time.time() - self._timestamp) < self._dead_timeout - -class Message(NamedTuple): - topic: str - content: str - -class Subscriber(Watchdog): - def __init__(self, consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None: - super().__init__(dead_timeout) - self._consume_timeout = consume_timeout - self._queue = Queue() - - def publish(self, message : Message) -> None: - self._queue.put_nowait(message) - - def consume(self) -> Optional[Message]: - self.refresh() - try: - return self._queue.get(block=True, timeout=self._consume_timeout) - except Empty: - return None - -class Topic: - def __init__(self) -> None: - self._subscribers : Set[Subscriber] = set() - - def add_subscriber(self, subscriber : Subscriber) -> None: - self._subscribers.add(subscriber) - - def remove_subscriber(self, subscriber : Subscriber) -> None: - self._subscribers.discard(subscriber) - - def publish(self, message : Message) -> None: - dead_subscribers = set() - for subscriber in self._subscribers: - if subscriber.is_alive(): - subscriber.publish(message) - else: - dead_subscribers.add(subscriber) - for dead_subscriber in dead_subscribers: - self.remove_subscriber(dead_subscriber) - class MessageBroker: - def __init__(self): - self._terminate = threading.Event() - self._topics : Dict[str, Topic] = {} + def __init__(self, backend : _Backend): + if not isinstance(backend, _Backend): + str_class_path = '{}.{}'.format(_Backend.__module__, _Backend.__name__) + raise AttributeError('backend must inherit from {}'.format(str_class_path)) + self._backend = backend - def get_or_create_topic(self, topic_name) -> Topic: - return self._topics.setdefault(topic_name, Topic()) - - def terminate(self) -> None: - self._terminate.set() + @property + def backend(self) -> _Backend: return self._backend def publish(self, message : Message) -> None: - self.get_or_create_topic(message.topic).publish(message) - - def consume( - self, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, dead_timeout=SUBSCRIBER_DEAD_TIMEOUT - ) -> Iterator[Message]: - - subscriber = Subscriber(consume_timeout=consume_timeout, dead_timeout=dead_timeout) - for topic_name in topic_names: - self.get_or_create_topic(topic_name).add_subscriber(subscriber) - - while not self._terminate.is_set(): - message = subscriber.consume() - if message is None: continue - yield message - - for topic_name in topic_names: - self.get_or_create_topic(topic_name).remove_subscriber(subscriber) - -class Consumer(threading.Thread): - def __init__( - self, message_broker : MessageBroker, topic_names : Set[str], consume_timeout=CONSUME_TIMEOUT, - dead_timeout=SUBSCRIBER_DEAD_TIMEOUT) -> None: - - super().__init__(daemon=True) - self._message_broker = message_broker - self._topic_names = topic_names - self._consume_timeout = consume_timeout - self._dead_timeout = dead_timeout - - def run(self) -> None: - print(time.time(), self.name, 'subscribes to', self._topic_names) - messages = self._message_broker.consume( - self._topic_names, consume_timeout=self._consume_timeout, dead_timeout=self._dead_timeout) - for message in messages: - print(time.time(), self.name, 'receives', message) - print(time.time(), self.name, 'terminates') - -TOPIC_DEVICES = 'devices' -TOPIC_LINKS = 'links' -TOPIC_SERVICES = 'services' - -def main(): - message_broker = MessageBroker() - - consumer1 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS}) - consumer1.start() - - consumer2 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES}) - consumer2.start() - - consumer3 = Consumer(message_broker, {TOPIC_SERVICES}) - consumer3.start() - - time.sleep(SUBSCRIBER_DEAD_TIMEOUT) - - message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-01')) - message_broker.publish(Message(topic=TOPIC_DEVICES, content='new-device-02')) - message_broker.publish(Message(topic=TOPIC_LINKS, content='new-link-01-02')) - - time.sleep(SUBSCRIBER_DEAD_TIMEOUT) - - message_broker.publish(Message(topic=TOPIC_DEVICES, content='update-device-01')) - message_broker.publish(Message(topic=TOPIC_DEVICES, content='update-device-02')) - message_broker.publish(Message(topic=TOPIC_SERVICES, content='new-service-01-02')) + self._backend.publish(message.topic, message.content) - message_broker.terminate() - return 0 + def consume(self, topic_names : Set[str], consume_timeout : float = CONSUME_TIMEOUT) -> Iterator[Message]: + for pair in self._backend.consume(topic_names, consume_timeout=consume_timeout): + yield Message(*pair) -if __name__ == '__main__': - sys.exit(main()) + def terminate(self): + self._backend.terminate() diff --git a/src/common/message_broker/backend/BackendEnum.py b/src/common/message_broker/backend/BackendEnum.py new file mode 100644 index 0000000000000000000000000000000000000000..9ee482477e489fff25ed0538330e33d378cbd251 --- /dev/null +++ b/src/common/message_broker/backend/BackendEnum.py @@ -0,0 +1,8 @@ +from enum import Enum + +class BackendEnum(Enum): + INMEMORY = 'inmemory' + REDIS = 'redis' + #KAFKA = 'kafka' + #RABBITMQ = 'rabbitmq' + #ZEROMQ = 'zeromq' diff --git a/src/common/message_broker/backend/_Backend.py b/src/common/message_broker/backend/_Backend.py new file mode 100644 index 0000000000000000000000000000000000000000..a6461d2538a1794e6bdec597ac0ad611eec6d484 --- /dev/null +++ b/src/common/message_broker/backend/_Backend.py @@ -0,0 +1,14 @@ +from typing import Iterator, Set, Tuple + +class _Backend: + def __init__(self, **settings) -> None: + raise NotImplementedError() + + def terminate(self) -> None: + raise NotImplementedError() + + def publish(self, topic_name : str, message_content : str) -> None: + raise NotImplementedError() + + def consume(self, topic_names : Set[str], consume_timeout : float) -> Iterator[Tuple[str, str]]: + raise NotImplementedError() diff --git a/src/common/message_broker/backend/__init__.py b/src/common/message_broker/backend/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/common/message_broker/backend/inmemory/InMemoryBackend.py b/src/common/message_broker/backend/inmemory/InMemoryBackend.py new file mode 100644 index 0000000000000000000000000000000000000000..9b0e758724deb280f9cc8a1ba3eb20e6f4c57356 --- /dev/null +++ b/src/common/message_broker/backend/inmemory/InMemoryBackend.py @@ -0,0 +1,41 @@ +# InMemeory MessageBroker Backend +# ------------------------------- +# - WARNING: DESIGNED AND BUILT FOR UNIT TESTING AND INTEGRATION TESTING PURPOSES ONLY !!! +# USE ANOTHER BACKEND IN PRODUCTION ENVIRONMENTS. + +import logging, threading +from queue import Queue, Empty +from typing import Dict, Iterator, Set, Tuple +from .._Backend import _Backend + +LOGGER = logging.getLogger(__name__) + +class InMemoryBackend(_Backend): + def __init__(self, **settings) -> None: # pylint: disable=super-init-not-called + self._lock = threading.Lock() + self._terminate = threading.Event() + self._topic__to__queues : Dict[str, Set[Queue]] = {} + + def terminate(self) -> None: + self._terminate.set() + + def publish(self, topic_name : str, message_content : str) -> None: + queues = self._topic__to__queues.get(topic_name, None) + if queues is None: return + for queue in queues: queue.put_nowait((topic_name, message_content)) + + def consume(self, topic_names : Set[str], consume_timeout : float) -> Iterator[Tuple[str, str]]: + queue = Queue() + for topic_name in topic_names: + self._topic__to__queues.setdefault(topic_name, set()).add(queue) + + while not self._terminate.is_set(): + try: + message = queue.get(block=True, timeout=consume_timeout) + except Empty: + continue + if message is None: continue + yield message + + for topic_name in topic_names: + self._topic__to__queues.get(topic_name, set()).discard(queue) diff --git a/src/common/message_broker/backend/inmemory/__init__.py b/src/common/message_broker/backend/inmemory/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/common/message_broker/backend/redis/RedisBackend.py b/src/common/message_broker/backend/redis/RedisBackend.py new file mode 100644 index 0000000000000000000000000000000000000000..0e8be0f30d77dcf6db5dbebcacba06591d1dbdff --- /dev/null +++ b/src/common/message_broker/backend/redis/RedisBackend.py @@ -0,0 +1,44 @@ +import os, threading +from typing import Any, Dict, Iterator, Set, Tuple +from redis.client import Redis + +from common.message_broker.Message import Message +from .._Backend import _Backend + +DEFAULT_SERVICE_HOST = '127.0.0.1' +DEFAULT_SERVICE_PORT = 6379 +DEFAULT_DATABASE_ID = 0 + +def get_setting(settings : Dict[str, Any], name : str, default : Any) -> Any: + value = settings.get(name, os.environ.get(name)) + return default if value is None else value + +class RedisBackend(_Backend): + def __init__(self, **settings) -> None: # pylint: disable=super-init-not-called + host = get_setting(settings, 'REDIS_SERVICE_HOST', DEFAULT_SERVICE_HOST) + port = get_setting(settings, 'REDIS_SERVICE_PORT', DEFAULT_SERVICE_PORT) + dbid = get_setting(settings, 'REDIS_DATABASE_ID', DEFAULT_DATABASE_ID ) + self._client = Redis.from_url('redis://{host}:{port}/{dbid}'.format(host=host, port=port, dbid=dbid)) + self._terminate = threading.Event() + + def terminate(self) -> None: + self._terminate.set() + + def publish(self, topic_name : str, message_content : str) -> None: + self._client.publish(topic_name, message_content) + + def consume(self, topic_names : Set[str], consume_timeout : float) -> Iterator[Tuple[str, str]]: + pubsub = self._client.pubsub(ignore_subscribe_messages=True) + for topic_name in topic_names: pubsub.subscribe(topic_name) + + while not self._terminate.is_set(): + message = pubsub.get_message(ignore_subscribe_messages=True, timeout=consume_timeout) + if message is None: continue + if message['type'] not in {'message', 'pmessage'}: continue + topic = message['channel'].decode('UTF-8') + content = message['data'].decode('UTF-8') + yield Message(topic, content) + + pubsub.unsubscribe() + while pubsub.get_message() is not None: pass + pubsub.close() diff --git a/src/common/message_broker/backend/redis/__init__.py b/src/common/message_broker/backend/redis/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/common/message_broker/tests/__init__.py b/src/common/message_broker/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/common/message_broker/tests/test_unitary.py b/src/common/message_broker/tests/test_unitary.py new file mode 100644 index 0000000000000000000000000000000000000000..7ef27ae68eff96b8b5c3932a453b663820fe8abf --- /dev/null +++ b/src/common/message_broker/tests/test_unitary.py @@ -0,0 +1,144 @@ +import logging, pytest, threading, time +from typing import List, Set +from common.message_broker.Factory import get_messagebroker_backend +from common.message_broker.Message import Message +from common.message_broker.MessageBroker import MessageBroker +from common.message_broker.backend.BackendEnum import BackendEnum +from common.message_broker.backend._Backend import _Backend + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + +SCENARIOS = [ + (BackendEnum.INMEMORY, {}), + (BackendEnum.REDIS, { + 'REDIS_SERVICE_HOST': '10.1.7.194', + 'REDIS_SERVICE_PORT': 30283, + 'REDIS_DATABASE_ID': 0, + }), +] + +CONSUME_TIMEOUT = 0.1 # seconds + +TOPIC_DEVICES = 'devices' +TOPIC_LINKS = 'links' +TOPIC_SERVICES = 'services' + +class Consumer(threading.Thread): + def __init__( + self, message_broker : MessageBroker, # pylint: disable=redefined-outer-name + topic_names : Set[str], output_list : List[Message], + consume_timeout=CONSUME_TIMEOUT) -> None: + + super().__init__(daemon=True) + self._message_broker = message_broker + self._topic_names = topic_names + self._output_list = output_list + self._consume_timeout = consume_timeout + + def run(self) -> None: + LOGGER.info('{:s} subscribes to topics {:s}'.format(self.name, str(self._topic_names))) + for message in self._message_broker.consume(self._topic_names, consume_timeout=self._consume_timeout): + LOGGER.info('{:s} receives {:s}'.format(self.name, str(message))) + self._output_list.append(message) + LOGGER.info('{:s} terminates') + +@pytest.fixture(scope='session', ids=[str(scenario[0].value) for scenario in SCENARIOS], params=SCENARIOS) +def message_broker(request): + backend,settings = request.param + LOGGER.info('Running fixture with backend={:s}, settings={:s}...'.format(str(backend), str(settings))) + return MessageBroker(get_messagebroker_backend(backend=backend, **settings)) + +def test_messagebroker_instantiation(): + with pytest.raises(AttributeError) as e: + MessageBroker(None) + str_class_path = '{}.{}'.format(_Backend.__module__, _Backend.__name__) + assert str(e.value) == 'backend must inherit from {}'.format(str_class_path) + + assert MessageBroker(get_messagebroker_backend(BackendEnum.INMEMORY)) is not None + +def test_messagebroker(message_broker : MessageBroker): # pylint: disable=redefined-outer-name + output_list1 : List[Message] = [] + consumer1 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_LINKS}, output_list1) + consumer1.start() + + output_list2 : List[Message] = [] + consumer2 = Consumer(message_broker, {TOPIC_DEVICES, TOPIC_SERVICES}, output_list2) + consumer2.start() + + output_list3 : List[Message] = [] + consumer3 = Consumer(message_broker, {TOPIC_SERVICES}, output_list3) + consumer3.start() + + LOGGER.info('delay') + time.sleep(0.5) + + message = Message(topic=TOPIC_DEVICES, content='new-device-01') + LOGGER.info('publish message={:s}'.format(str(message))) + message_broker.publish(message) + + message = Message(topic=TOPIC_DEVICES, content='new-device-02') + LOGGER.info('publish message={:s}'.format(str(message))) + message_broker.publish(message) + + message = Message(topic=TOPIC_LINKS, content='new-link-01-02') + LOGGER.info('publish message={:s}'.format(str(message))) + message_broker.publish(message) + + LOGGER.info('delay') + time.sleep(0.1) + + message = Message(topic=TOPIC_DEVICES, content='update-device-01') + LOGGER.info('publish message={:s}'.format(str(message))) + message_broker.publish(message) + + message = Message(topic=TOPIC_DEVICES, content='update-device-02') + LOGGER.info('publish message={:s}'.format(str(message))) + message_broker.publish(message) + + message = Message(topic=TOPIC_SERVICES, content='new-service-01-02') + LOGGER.info('publish message={:s}'.format(str(message))) + message_broker.publish(message) + + LOGGER.info('delay') + time.sleep(0.5) + + LOGGER.info('terminate') + message_broker.terminate() + + LOGGER.info('join') + consumer1.join() + consumer2.join() + consumer3.join() + + LOGGER.info('output_list1={:s}'.format(str(output_list1))) + LOGGER.info('output_list2={:s}'.format(str(output_list2))) + LOGGER.info('output_list3={:s}'.format(str(output_list3))) + + assert len(output_list1) == 5 + assert output_list1[0].topic == TOPIC_DEVICES + assert output_list1[0].content == 'new-device-01' + assert output_list1[1].topic == TOPIC_DEVICES + assert output_list1[1].content == 'new-device-02' + assert output_list1[2].topic == TOPIC_LINKS + assert output_list1[2].content == 'new-link-01-02' + assert output_list1[3].topic == TOPIC_DEVICES + assert output_list1[3].content == 'update-device-01' + assert output_list1[4].topic == TOPIC_DEVICES + assert output_list1[4].content == 'update-device-02' + + assert len(output_list2) == 5 + assert output_list2[0].topic == TOPIC_DEVICES + assert output_list2[0].content == 'new-device-01' + assert output_list2[1].topic == TOPIC_DEVICES + assert output_list2[1].content == 'new-device-02' + assert output_list2[2].topic == TOPIC_DEVICES + assert output_list2[2].content == 'update-device-01' + assert output_list2[3].topic == TOPIC_DEVICES + assert output_list2[3].content == 'update-device-02' + assert output_list2[4].topic == TOPIC_SERVICES + assert output_list2[4].content == 'new-service-01-02' + + assert len(output_list3) == 1 + assert output_list3[0].topic == TOPIC_SERVICES + assert output_list3[0].content == 'new-service-01-02' diff --git a/src/common/orm/Factory.py b/src/common/orm/Factory.py index ad208090df9d6dad04e379e0fcc1db337b928307..920d3974a66dbf09f4ccaa69b0f281cc35f9b4a7 100644 --- a/src/common/orm/Factory.py +++ b/src/common/orm/Factory.py @@ -10,8 +10,8 @@ LOGGER = logging.getLogger(__name__) BACKENDS = { BackendEnum.INMEMORY.value: InMemoryBackend, BackendEnum.REDIS.value: RedisBackend, - #BackendEnum.MONGO.value: MongoBackend, - #BackendEnum.RETHINK.value: RethinkBackend, + #BackendEnum.MONGODB.value: MongoDBBackend, + #BackendEnum.RETHINKDB.value: RethinkDBBackend, #BackendEnum.ETCD.value: EtcdBackend, } diff --git a/src/common/orm/backend/BackendEnum.py b/src/common/orm/backend/BackendEnum.py index 6b269419c0812b00ea961304c01bf816500bd015..7699262f468b63044b160b9f0649432668002023 100644 --- a/src/common/orm/backend/BackendEnum.py +++ b/src/common/orm/backend/BackendEnum.py @@ -3,6 +3,6 @@ from enum import Enum class BackendEnum(Enum): INMEMORY = 'inmemory' REDIS = 'redis' - #MONGO = 'mongo' - #RETHINK = 'rethink' + #MONGODB = 'mongodb' + #RETHINKDB = 'rethinkdb' #ETCD = 'etcd' diff --git a/src/common/orm/backend/inmemory/InMemoryBackend.py b/src/common/orm/backend/inmemory/InMemoryBackend.py index ddd25c1720cc2e581e7b205113b14167b03d85aa..aaa9fc8a960e79fbdbcb0df02de60ca2ea35a545 100644 --- a/src/common/orm/backend/inmemory/InMemoryBackend.py +++ b/src/common/orm/backend/inmemory/InMemoryBackend.py @@ -14,7 +14,7 @@ from .Tools import get_dict, get_list, get_or_create_dict, get_or_create_list, g LOGGER = logging.getLogger(__name__) class InMemoryBackend(_Backend): - def __init__(self, **settings): + def __init__(self, **settings): # pylint: disable=super-init-not-called self._lock = threading.Lock() self._keys : Dict[str, Union[Set[str], List[str], Dict[str, str], str]]= {} # key => set/list/dict/string diff --git a/src/common/orm/backend/redis/RedisBackend.py b/src/common/orm/backend/redis/RedisBackend.py index 5dd9a31afef58d1c9f432fff247e29084109dd33..edd73917315322803a102142c69f59dc305955a5 100644 --- a/src/common/orm/backend/redis/RedisBackend.py +++ b/src/common/orm/backend/redis/RedisBackend.py @@ -14,7 +14,7 @@ def get_setting(settings : Dict[str, Any], name : str, default : Any) -> Any: return default if value is None else value class RedisBackend(_Backend): - def __init__(self, **settings) -> None: + def __init__(self, **settings) -> None: # pylint: disable=super-init-not-called host = get_setting(settings, 'REDIS_SERVICE_HOST', DEFAULT_SERVICE_HOST) port = get_setting(settings, 'REDIS_SERVICE_PORT', DEFAULT_SERVICE_PORT) dbid = get_setting(settings, 'REDIS_DATABASE_ID', DEFAULT_DATABASE_ID )