Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
RedisBackend.py 4.83 KiB
Newer Older
  • Learn to ignore specific revisions
  • import os, uuid
    
    from typing import Any, Dict, List, Optional, Set, Tuple
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from redis.client import Redis
    
    from .._Backend import _Backend
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from ..Tools import key_to_str
    
    from .Mutex import Mutex
    
    # Fallback connection parameters: RedisBackend.__init__ passes them to get_setting()
    # as the value to use when a setting is found neither in its keyword arguments
    # nor in the process environment.
    DEFAULT_SERVICE_HOST = '127.0.0.1'
    DEFAULT_SERVICE_PORT = 6379
    DEFAULT_DATABASE_ID  = 0
    
    def get_setting(settings : Dict[str, Any], name : str, default : Any) -> Any:
        """Resolve a configuration value by name.

        The lookup order is: the ``settings`` mapping, then the process
        environment, then ``default``. An entry explicitly present in
        ``settings`` takes precedence over the environment even when it is None,
        in which case ``default`` is returned.

        :param settings: mapping of explicitly provided settings.
        :param name: name of the setting (also used as environment variable name).
        :param default: value returned when no non-None value is found.
        :return: the resolved value, or ``default``.
        """
        if name in settings:
            candidate = settings[name]
        else:
            candidate = os.environ.get(name)

        if candidate is None:
            return default
        return candidate
    
    class RedisBackend(_Backend):
        """Backend implementation that keeps its data in a Redis database.

        Composite keys (lists of strings) are flattened with ``key_to_str`` before
        being sent to Redis. Values read back from Redis are decoded from UTF-8
        bytes into ``str``.
        """

        # Redis type names (as returned by the TYPE command) mapped to the labels
        # reported by dump(). Types not listed here are reported as unsupported.
        _DUMP_TYPE_NAMES = {
            'hash'  : 'dict',
            'list'  : 'list',
            'set'   : 'set',
            'string': 'str',
        }

        def __init__(self, **settings) -> None:
            """Create the Redis client and the mutex built on top of it.

            Recognized settings (each falls back to the environment variable of
            the same name, then to the module default): ``REDIS_SERVICE_HOST``,
            ``REDIS_SERVICE_PORT`` and ``REDIS_DATABASE_ID``.
            """
            host = get_setting(settings, 'REDIS_SERVICE_HOST', DEFAULT_SERVICE_HOST)
            port = get_setting(settings, 'REDIS_SERVICE_PORT', DEFAULT_SERVICE_PORT)
            dbid = get_setting(settings, 'REDIS_DATABASE_ID',  DEFAULT_DATABASE_ID )
            self._client = Redis.from_url('redis://{host}:{port}/{dbid}'.format(host=host, port=port, dbid=dbid))
            self._mutex = Mutex(self._client)

        def lock(self, keys : List[List[str]], owner_key : Optional[str] = None) -> Tuple[bool, str]:
            """Acquire the mutex over ``keys``, blocking until it is obtained.

            :param keys: composite keys to lock.
            :param owner_key: identifier of the lock owner; a random UUID4 string
                is generated when omitted.
            :return: the result of ``Mutex.acquire`` (annotated as a tuple of
                acquisition flag and owner key).
            """
            str_keys = {key_to_str(key) for key in keys}
            if owner_key is None:
                owner_key = str(uuid.uuid4())
            return self._mutex.acquire(str_keys, owner_key=owner_key, blocking=True)

        def unlock(self, keys : List[List[str]], owner_key : str) -> bool:
            """Release the mutex over ``keys`` on behalf of ``owner_key``.

            :return: the result of ``Mutex.release``.
            """
            str_keys = {key_to_str(key) for key in keys}
            return self._mutex.release(str_keys, owner_key)

        def keys(self) -> list:
            """Return the names of all keys in the database, decoded as UTF-8."""
            return [k.decode('UTF-8') for k in self._client.keys()]

        def exists(self, key : List[str]) -> bool:
            """Return True when ``key`` exists in the database."""
            str_key = key_to_str(key)
            return self._client.exists(str_key) == 1

        def delete(self, key : List[str]) -> bool:
            """Delete ``key``; return True when exactly one key was removed."""
            str_key = key_to_str(key)
            return self._client.delete(str_key) == 1

        def dict_get(self, key : List[str], fields : Optional[List[str]] = None) -> Dict[str, str]:
            """Read fields of the hash stored at ``key``.

            :param key: composite key of the hash.
            :param fields: names of the fields to retrieve; when omitted or
                empty, every field of the hash is returned.
            :return: mapping of field name to value. Bytes names and values are
                decoded as UTF-8; anything that is not bytes (such as the
                placeholder HMGET yields for a field it did not find) is
                returned unchanged.
            """
            str_key = key_to_str(key)
            fields = [] if fields is None else list(fields)

            if len(fields) == 0:
                fields_values = self._client.hgetall(str_key).items()
            else:
                fields_values = zip(fields, self._client.hmget(str_key, fields))

            attributes = {}
            for field, value in fields_values:
                str_field = field.decode('UTF-8') if isinstance(field, bytes) else field
                attributes[str_field] = value.decode('UTF-8') if isinstance(value, bytes) else value
            return attributes

        def dict_update(self, key : List[str], fields : Optional[Dict[str, str]] = None) -> None:
            """Set the given ``fields`` on the hash stored at ``key``.

            Nothing is sent to Redis when ``fields`` is omitted or empty.
            """
            str_key = key_to_str(key)
            if fields:
                self._client.hset(str_key, mapping=fields)

        def dict_delete(self, key : List[str], fields : Optional[List[str]] = None) -> None:
            """Delete fields of the hash stored at ``key``.

            :param key: composite key of the hash.
            :param fields: names of the fields to remove; when omitted or empty,
                the whole key is deleted.
            """
            str_key = key_to_str(key)
            unique_fields = set() if fields is None else set(fields)

            if len(unique_fields) == 0:
                self._client.delete(str_key)
            else:
                # hdel() takes the field names as separate positional arguments;
                # passing the set itself would be treated as a single field name.
                self._client.hdel(str_key, *unique_fields)

        def list_get_all(self, key : List[str]) -> List[str]:
            """Return every item of the list stored at ``key``, decoded as UTF-8."""
            str_key = key_to_str(key)
            return [m.decode('UTF-8') for m in self._client.lrange(str_key, 0, -1)]

        def list_push_last(self, key : List[str], item : str) -> None:
            """Append ``item`` at the tail of the list stored at ``key``."""
            str_key = key_to_str(key)
            self._client.rpush(str_key, item)

        def list_remove_first_occurrence(self, key : List[str], item: str) -> None:
            """Remove the first occurrence of ``item`` from the list stored at ``key``."""
            str_key = key_to_str(key)
            self._client.lrem(str_key, 1, item)

        def set_add(self, key : List[str], item : str) -> None:
            """Add ``item`` to the set stored at ``key``."""
            str_key = key_to_str(key)
            self._client.sadd(str_key, item)

        def set_has(self, key : List[str], item : str) -> bool:
            """Return True when ``item`` is a member of the set stored at ``key``."""
            str_key = key_to_str(key)
            return self._client.sismember(str_key, item) == 1

        def set_get_all(self, key : List[str]) -> Set[str]:
            """Return every member of the set stored at ``key``, decoded as UTF-8."""
            str_key = key_to_str(key)
            return {m.decode('UTF-8') for m in self._client.smembers(str_key)}

        def set_remove(self, key : List[str], item : str) -> None:
            """Remove ``item`` from the set stored at ``key``."""
            str_key = key_to_str(key)
            self._client.srem(str_key, item)

        def _dump_content(self, str_key : str, key_type : Optional[str]) -> Any:
            """Read and decode the content of ``str_key`` according to its dump type label."""
            if key_type == 'dict':
                return {k.decode('UTF-8'):v.decode('UTF-8') for k,v in self._client.hgetall(str_key).items()}
            if key_type == 'list':
                return [m.decode('UTF-8') for m in self._client.lrange(str_key, 0, -1)]
            if key_type == 'set':
                return {m.decode('UTF-8') for m in self._client.smembers(str_key)}
            if key_type == 'str':
                return self._client.get(str_key).decode('UTF-8')
            return 'UNSUPPORTED_TYPE'

        def dump(self) -> List[Tuple[str, Optional[str], Any]]:
            """Return the whole content of the database.

            :return: one ``(key, type, content)`` tuple per key, where ``type`` is
                one of ``'dict'``, ``'list'``, ``'set'``, ``'str'`` (or None for a
                Redis type that is not handled) and ``content`` is the decoded
                value (``'UNSUPPORTED_TYPE'`` for types that are not handled).
            """
            entries = []
            for raw_key in self._client.keys():
                str_key = raw_key.decode('UTF-8')
                redis_type = self._client.type(str_key)
                if redis_type is not None: redis_type = redis_type.decode('UTF-8')
                key_type = self._DUMP_TYPE_NAMES.get(redis_type)
                entries.append((str_key, key_type, self._dump_content(str_key, key_type)))
            return entries