Factory.py 2.04 KB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import logging, os
from typing import Optional, Union
from .backend._Backend import _Backend
from .backend.BackendEnum import BackendEnum
from .backend.inmemory.InMemoryBackend import InMemoryBackend
from .backend.redis.RedisBackend import RedisBackend
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)

BACKENDS = {
    BackendEnum.INMEMORY.value: InMemoryBackend,
    BackendEnum.REDIS.value: RedisBackend,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #BackendEnum.KAFKA.value: KafkaBackend,
    #BackendEnum.RABBITMQ.value: RabbitMQBackend,
    #BackendEnum.ZEROMQ.value: ZeroMQBackend,
}

DEFAULT_MB_BACKEND = BackendEnum.INMEMORY

def get_messagebroker_backend(backend : Optional[Union[str, BackendEnum]] = None, **settings) -> _Backend:
    # return an instance of MessageBroker initialized with selected backend.
    # The backend is selected using following criteria (first that is not None is selected):
    # 1. user selected by parameter (backend=...)
    # 2. environment variable MB_BACKEND
    # 3. default backend: INMEMORY
    if backend is None: backend = os.environ.get('MB_BACKEND', DEFAULT_MB_BACKEND)
    if backend is None: raise Exception('MessageBroker Backend not specified')
    if isinstance(backend, BackendEnum): backend = backend.value
    backend_class = BACKENDS.get(backend)
    if backend_class is None: raise Exception('Unsupported MessageBrokerBackend({:s})'.format(backend))
    LOGGER.info('Selected MessageBroker Backend: {:s}'.format(backend))
    return backend_class(**settings)