Skip to content
Snippets Groups Projects
Model.py 13.6 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from __future__ import annotations
    
    import logging, re
    
    from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union
    
    from common.orm.Database import Database
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.backend.Tools import key_to_str
    from common.orm.fields.ForeignKeyField import ForeignKeyField
    
    from ..Exceptions import ConstraintException, MutexException
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from ..fields.Field import Field
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from ..fields.PrimaryKeyField import PrimaryKeyField
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .Tools import NoDupOrderedDict
    
    
    LOGGER = logging.getLogger(__name__)
    DEFAULT_PRIMARY_KEY_NAME = 'pk_auto'
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class MetaModel(type):
        @classmethod
    
        def __prepare__(cls, name : str, bases : Tuple[type, ...], **attrs : Any) -> Mapping[str, Any]:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            return NoDupOrderedDict()
    
    
        def __new__(cls, name : str, bases : Tuple[type, ...], attrs : NoDupOrderedDict[str, Any]):
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            field_names = list()
    
            pk_field_name = None
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            for key, value in attrs.items():
                if not isinstance(value, Field): continue
    
                value.name = key
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                field_names.append(key)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                if not isinstance(value, PrimaryKeyField): continue
    
                if pk_field_name is None:
                    pk_field_name = key
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                    continue
    
                raise AttributeError('PrimaryKeyField for Model({:s}) already set to attribute({:s})'.format(
                    str(name), str(pk_field_name)))
            if pk_field_name is None:
                if DEFAULT_PRIMARY_KEY_NAME in attrs.keys():
                    msg = 'PrimaryKeyField for Model({:s}) not defined and attribute "{:s}" already used. '\
                          'Leave attribute name "{:s}" for automatic PrimaryKeyField, or set a PrimaryKeyField.'
                    raise AttributeError(msg.format(str(name), DEFAULT_PRIMARY_KEY_NAME, DEFAULT_PRIMARY_KEY_NAME))
                pk_field_name = DEFAULT_PRIMARY_KEY_NAME
                attrs[pk_field_name] = PrimaryKeyField(name=pk_field_name)
                field_names.append(pk_field_name)
            cls_obj = super().__new__(cls, name, bases, dict(attrs))
            setattr(cls_obj, '_pk_field_name', pk_field_name)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            setattr(cls_obj, '_field_names_list', field_names)
            setattr(cls_obj, '_field_names_set', set(field_names))
            return cls_obj
    
    
    KEYWORD_INSTANCES  = 'instances'
    KEYWORD_LOCK       = 'lock'
    KEYWORD_REFERENCES = 'references'
    KEYWORD_STORED     = '_stored'
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class Model(metaclass=MetaModel):
    
        @classmethod
        def get_backend_key_instances(cls) -> str:
            return key_to_str(['{:s}'.format(cls.__name__), KEYWORD_INSTANCES])
    
        @classmethod
        def get_backend_key_instance(cls, primary_key : str) -> str:
            return '{:s}[{:s}]'.format(cls.__name__, primary_key)
    
        @classmethod
        def get_backend_key_references(cls, primary_key : str) -> str:
            match = re.match(r'^[a-zA-Z0-9\_]+\[([^\]]*)\]', primary_key)
            if not match: primary_key = cls.get_backend_key_instance(primary_key)
            return key_to_str([primary_key, KEYWORD_REFERENCES])
    
        @staticmethod
        def get_backend_key_lock(backend_key : str) -> str:
            if backend_key.endswith(KEYWORD_LOCK): return backend_key
            return key_to_str([backend_key, KEYWORD_LOCK])
    
        @staticmethod
        def get_backend_key_locks(backend_keys : List[str]) -> List[str]:
            return [Model.get_backend_key_lock(backend_key) for backend_key in backend_keys]
    
        @classmethod
        def backend_key__to__instance_key(cls, backend_key : str) -> str:
            class_name = cls.__name__
            if backend_key.startswith(class_name):
                match = re.match(r'^{:s}\[([^\]]*)\]'.format(class_name), backend_key)
                if match: return match.group(1)
            return backend_key
    
    
        def __init__(self, database : Database, primary_key : str, auto_load : bool = True) -> None:
            if not isinstance(database, Database):
                str_class_path = '{}.{}'.format(Database.__module__, Database.__name__)
                raise AttributeError('database must inherit from {}'.format(str_class_path))
            self._model_class = type(self)
            self._class_name = self._model_class.__name__
            pk_field_name = self._pk_field_name # pylint: disable=no-member
            pk_field_instance : 'PrimaryKeyField' = getattr(self._model_class, pk_field_name)
            primary_key = pk_field_instance.validate(primary_key)
    
            primary_key = self.backend_key__to__instance_key(primary_key)
    
            setattr(self, pk_field_name, primary_key)
            self._database = database
            self._backend = database.backend
    
            self._instance_key : str = self.get_backend_key_instance(primary_key)
            self._instances_key : str = self.get_backend_key_instances()
            self._references_key : str = self.get_backend_key_references(primary_key)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self._owner_key : Optional[str] = None
    
            if auto_load: self.load()
    
        @property
        def database(self) -> Database: return self._database
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        @property
    
        def instance_key(self) -> str: return self._instance_key
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def lock(self, extra_keys : List[List[str]] = []):
    
            lock_keys = Model.get_backend_key_locks(
                [self._instance_key, self._instances_key, self._references_key] + extra_keys)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            acquired,self._owner_key = self._backend.lock(lock_keys, owner_key=self._owner_key)
            if acquired: return
            raise MutexException('Unable to lock keys {:s} using owner_key {:s}'.format(
                str(lock_keys), str(self._owner_key)))
    
        def unlock(self, extra_keys : List[List[str]] = []):
    
            lock_keys = Model.get_backend_key_locks(
                [self._instance_key, self._instances_key, self._references_key] + extra_keys)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            released = self._backend.unlock(lock_keys, self._owner_key)
            if released: return
    
            raise MutexException('Unable to unlock keys {:s} using owner_key {:s}'.format(
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                str(lock_keys), str(self._owner_key)))
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def load(self) -> bool:
    
            pk_field_name = self._pk_field_name # pylint: disable=no-member
    
            try:
                self.lock()
    
                attributes = self._backend.dict_get(self._instance_key)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                if attributes is None or len(attributes) == 0: return False
    
                for field_name in self._field_names_list: # pylint: disable=no-member
                    if field_name == pk_field_name: continue
                    if field_name not in attributes: continue
                    raw_field_value = attributes[field_name]
                    field_instance : 'Field' = getattr(self._model_class, field_name)
                    field_value = field_instance.deserialize(raw_field_value)
                    if isinstance(field_instance, ForeignKeyField):
    
                        setattr(self, field_name + KEYWORD_STORED, field_value)
    
                        field_value = field_instance.foreign_model(self._database, field_value, auto_load=True)
                    setattr(self, field_name, field_value)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                return True
    
            finally:
                self.unlock()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        def save(self) -> None:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            attributes : Dict[str, Any] = dict()
            required_keys : Set[str] = set()
            foreign_additions : Dict[str, str] = dict()
            foreign_removals : Dict[str, str] = dict()
            for field_name in self._field_names_list: # pylint: disable=no-member
    
                field_value = getattr(self, field_name)
                field_instance : 'Field' = getattr(self._model_class, field_name)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                serialized_field_value = field_instance.serialize(field_value)
    
                if (serialized_field_value is None) and (not field_instance.required): continue
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                if isinstance(field_instance, ForeignKeyField):
    
                    foreign_reference = '{:s}:{:s}'.format(self._instance_key, field_name)
    
                    field_value_stored = getattr(self, field_name + KEYWORD_STORED, None)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                    if field_value_stored is not None:
    
                        foreign_removals[self.get_backend_key_references(field_value_stored)] = foreign_reference
                    foreign_additions[self.get_backend_key_references(serialized_field_value)] = foreign_reference
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                    required_keys.add(serialized_field_value)
                attributes[field_name] = serialized_field_value
    
            extra_keys = []
            extra_keys.extend(list(foreign_removals.keys()))
            extra_keys.extend(list(foreign_additions.keys()))
    
            try:
                self.lock(extra_keys=extra_keys)
    
    
                not_exists = [
                    str(required_key)
                    for required_key in required_keys
                    if not self._backend.exists(required_key)]
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                if len(not_exists) > 0:
    
                    raise ConstraintException('Required Keys ({:s}) does not exist'.format(', '.join(sorted(not_exists))))
    
                self._backend.dict_update(self._instance_key, attributes)
    
                self._backend.set_add(self._instances_key, self._instance_key)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                for serialized_field_value,foreign_reference in foreign_removals.items():
                    self._backend.set_remove(serialized_field_value, foreign_reference)
    
                for serialized_field_value,foreign_reference in foreign_additions.items():
                    self._backend.set_add(serialized_field_value, foreign_reference)
            finally:
                self.unlock(extra_keys=extra_keys)
    
            for serialized_field_value,foreign_reference in foreign_additions.items():
    
                setattr(self, (foreign_reference.rsplit(':', 1)[-1]) + KEYWORD_STORED, field_value_stored)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        def delete(self) -> None:
    
            foreign_removals : Dict[str, str] = {}
            for field_name in self._field_names_list: # pylint: disable=no-member
                field_instance : 'Field' = getattr(self._model_class, field_name)
                if not isinstance(field_instance, ForeignKeyField): continue
                foreign_reference = '{:s}:{:s}'.format(self._instance_key, field_name)
    
                field_value_stored = getattr(self, field_name + KEYWORD_STORED, None)
    
                if field_value_stored is None: continue
    
                foreign_removals[self.get_backend_key_references(field_value_stored)] = foreign_reference
    
    
            extra_keys = []
            extra_keys.extend(list(foreign_removals.keys()))
    
            try:
                self.lock(extra_keys=extra_keys)
    
                if self._backend.exists(self._references_key):
                    references = self._backend.set_get_all(self._references_key)
                    raise ConstraintException('Instance is used by Keys ({:s})'.format(', '.join(sorted(references))))
    
                self._backend.delete(self._instance_key)
    
                self._backend.set_remove(self._instances_key, self._instance_key)
    
    
                for serialized_field_value,foreign_reference in foreign_removals.items():
                    self._backend.set_remove(serialized_field_value, foreign_reference)
            finally:
                self.unlock(extra_keys=extra_keys)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        @staticmethod
        def get_model_name(model_or_str) -> str:
            if isinstance(model_or_str, str):
                return model_or_str
            if (type(model_or_str).__name__ == 'MetaModel') and issubclass(model_or_str, Model):
                return model_or_str.__name__
            raise Exception()
    
    
        def references(
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self, filter_by_models : Optional[Union[type, List[type], Set[type], Tuple[type]]] = None
            ) -> Set[Tuple[str, str]]:
    
    
                if not self._backend.exists(self._references_key): return {}
                references = self._backend.set_get_all(self._references_key)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                try:
                    if filter_by_models is None:
                        pass
                    elif isinstance(filter_by_models, str):
                        filter_by_models = {filter_by_models}
                    elif isinstance(filter_by_models, (list, set, tuple)):
                        filter_by_models = {Model.get_model_name(model_or_str) for model_or_str in filter_by_models}
                    elif (type(filter_by_models).__name__ == 'MetaModel') and issubclass(filter_by_models, Model):
                        filter_by_models = {Model.get_model_name(filter_by_models)}
                    else:
                        raise Exception()
                except Exception as e:
    
                    msg = 'filter_by_models({:s}) unsupported. Expected a type or a list/set of types. Optionally, keep '\
                          'it as None to retrieve all the references pointing to this instance.'
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                    raise AttributeError(msg.format(str(filter_by_models))) from e
    
                if filter_by_models:
                    references = filter(lambda instance_key: instance_key.split('[', 1)[0] in filter_by_models, references)
                return {tuple(reference.rsplit(':', 1)) for reference in references}
    
        @classmethod
        def get_primary_keys(cls, database : Database):
            backend = database.backend
            key_model_instances = cls.get_backend_key_instances()
            key_model_instances_lock = cls.get_backend_key_lock(key_model_instances)
    
            acquired,owner_key = backend.lock(key_model_instances_lock)
            if not acquired:
                raise MutexException('Unable to lock keys {:s}'.format(
                    str(key_model_instances_lock)))
    
            instance_keys = backend.set_get_all(key_model_instances)
    
            released = backend.unlock(key_model_instances_lock, owner_key)
            if not released:
                raise MutexException('Unable to unlock keys {:s} using owner_key {:s}'.format(
                    str(key_model_instances_lock), str(owner_key)))
    
            return instance_keys
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def dump_id(self) -> Dict:
            raise NotImplementedError()
    
        def dump(self) -> Dict:
            raise NotImplementedError()
    
        def __repr__(self) -> str:
    
            pk_field_name = self._pk_field_name # pylint: disable=no-member
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            arguments = ', '.join(
                '{:s}={:s}{:s}'.format(
                    name, repr(getattr(self, name)), '(PK)' if name == pk_field_name else '')
                for name in self._field_names_list # pylint: disable=no-member
            )
            return '{:s}({:s})'.format(self._class_name, arguments)