Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
ConfigModel.py 5.11 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import functools, logging, operator
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from enum import Enum
    
    from typing import Dict, List, Optional, Tuple, Union
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.Database import Database
    
    from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.backend.Tools import key_to_str
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.fields.EnumeratedField import EnumeratedField
    from common.orm.fields.ForeignKeyField import ForeignKeyField
    from common.orm.fields.IntegerField import IntegerField
    from common.orm.fields.PrimaryKeyField import PrimaryKeyField
    from common.orm.fields.StringField import StringField
    from common.orm.model.Model import Model
    from context.proto.context_pb2 import ConfigActionEnum
    
    from .Tools import fast_hasher, grpc_to_enum, remove_dict_key
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    LOGGER = logging.getLogger(__name__)
    
    class ORM_ConfigActionEnum(Enum):
        UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED
        SET       = ConfigActionEnum.CONFIGACTION_SET
        DELETE    = ConfigActionEnum.CONFIGACTION_DELETE
    
    grpc_to_enum__config_action = functools.partial(
        grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class ConfigModel(Model): # pylint: disable=abstract-method
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        pk = PrimaryKeyField()
    
        def dump(self) -> List[Dict]:
            db_config_rule_pks = self.references(ConfigRuleModel)
            config_rules = [ConfigRuleModel(self.database, pk).dump(include_position=True) for pk,_ in db_config_rule_pks]
            config_rules = sorted(config_rules, key=operator.itemgetter('position'))
            return [remove_dict_key(config_rule, 'position') for config_rule in config_rules]
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class ConfigRuleModel(Model): # pylint: disable=abstract-method
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        pk = PrimaryKeyField()
        config_fk = ForeignKeyField(ConfigModel)
        position = IntegerField(min_value=0, required=True)
        action = EnumeratedField(ORM_ConfigActionEnum, required=True)
        key = StringField(required=True, allow_empty=False)
        value = StringField(required=True, allow_empty=False)
    
        def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
            result = {
                'action': self.action.value,
                'resource_key': self.key,
                'resource_value': self.value,
            }
            if include_position: result['position'] = self.position
            return result
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    def set_config_rule(
    
        database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        ) -> Tuple[ConfigRuleModel, bool]:
    
    
        str_rule_key_hash = fast_hasher(resource_key)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':')
        result : Tuple[ConfigRuleModel, bool] = update_or_create_object(database, ConfigRuleModel, str_config_rule_key, {
    
            'config_fk': db_config, 'position': position, 'action': ORM_ConfigActionEnum.SET,
            'key': resource_key, 'value': resource_value})
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        db_config_rule, updated = result
        return db_config_rule, updated
    
    
    def delete_config_rule(
        database : Database, db_config : ConfigModel, resource_key : str
        ) -> None:
    
        str_rule_key_hash = fast_hasher(resource_key)
        str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':')
        db_config_rule : Optional[ConfigRuleModel] = get_object(
            database, ConfigRuleModel, str_config_rule_key, raise_if_not_found=False)
        if db_config_rule is None: return
        db_config_rule.delete()
    
    def delete_all_config_rules(
        database : Database, db_config : ConfigModel
        ) -> None:
    
        db_config_rule_pks = db_config.references(ConfigRuleModel)
        for pk,_ in db_config_rule_pks: ConfigRuleModel(database, pk).delete()
    
    def grpc_config_rules_to_raw(grpc_config_rules) -> List[Tuple[ORM_ConfigActionEnum, str, str]]:
        def translate(grpc_config_rule):
            action = grpc_to_enum__config_action(grpc_config_rule.action)
            return action, grpc_config_rule.resource_key, grpc_config_rule.resource_value
        return [translate(grpc_config_rule) for grpc_config_rule in grpc_config_rules]
    
    def update_config(
        database : Database, db_parent_pk : str, config_name : str,
        raw_config_rules : List[Tuple[ORM_ConfigActionEnum, str, str]]
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
    
        str_config_key = key_to_str([db_parent_pk, config_name], separator=':')
        result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key)
        db_config, created = result
    
    
        db_objects : List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)]
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    
        for position,(action, resource_key, resource_value) in enumerate(raw_config_rules):
            if action == ORM_ConfigActionEnum.SET:
                result : Tuple[ConfigRuleModel, bool] = set_config_rule(
                    database, db_config, position, resource_key, resource_value)
                db_config_rule, updated = result
                db_objects.append((db_config_rule, updated))
            elif action == ORM_ConfigActionEnum.DELETE:
                delete_config_rule(database, db_config, resource_key)
            else:
                msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})'
                raise AttributeError(msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value)))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        return db_objects