Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
ConfigModel.py 12.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import enum
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import functools, logging, operator
    
    from typing import Dict, List, Optional, Tuple, Union
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.backend.Tools import key_to_str
    
    from common.proto.context_pb2 import ConfigActionEnum
    from common.tools.grpc.Tools import grpc_message_to_json_string
    
    from sqlalchemy import Column, ForeignKey, INTEGER, CheckConstraint, Enum, String
    from sqlalchemy.dialects.postgresql import UUID, ARRAY
    from context.service.database.Base import Base
    from sqlalchemy.orm import relationship
    from context.service.Database import Database
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import functools, json, logging, operator
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from enum import Enum
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from typing import Dict, List, Optional, Tuple, Type, Union
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.Database import Database
    
    from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.backend.Tools import key_to_str
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.fields.EnumeratedField import EnumeratedField
    from common.orm.fields.ForeignKeyField import ForeignKeyField
    from common.orm.fields.IntegerField import IntegerField
    from common.orm.fields.PrimaryKeyField import PrimaryKeyField
    from common.orm.fields.StringField import StringField
    from common.orm.model.Model import Model
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.proto.context_pb2 import ConfigActionEnum, ConfigRule
    
    from common.tools.grpc.Tools import grpc_message_to_json_string
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    #from .EndPointModel import EndPointModel, get_endpoint
    
    from .Tools import fast_hasher, grpc_to_enum, remove_dict_key
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    # Module-level logger, named after this module.
    LOGGER = logging.getLogger(__name__)
    
    
    class ORM_ConfigActionEnum(enum.Enum):
        """ORM-side mirror of the gRPC ConfigActionEnum.

        Each member's value is the corresponding gRPC enum constant.
        """

        UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED
        SET = ConfigActionEnum.CONFIGACTION_SET
        DELETE = ConfigActionEnum.CONFIGACTION_DELETE
    
    # grpc_to_enum with the gRPC ConfigActionEnum and ORM_ConfigActionEnum classes pre-bound;
    # callers pass only the gRPC action value (as update_config does).
    grpc_to_enum__config_action = functools.partial(
        grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum)
    
    
    class ConfigModel(Base): # pylint: disable=abstract-method
        """SQLAlchemy mapping of the 'Config' table; a Config owns a list of config rules."""
        __tablename__ = 'Config'
        config_uuid = Column(UUID(as_uuid=False), primary_key=True)

        # Relationships
        config_rule = relationship("ConfigRuleModel",  cascade="all,delete", back_populates="config", lazy='joined')

        def dump(self) -> List[Dict]:
            """Dump every related config rule, stripping the 'position' key from each entry.

            Returns:
                One dict per related rule, as returned by remove_dict_key(<rule dump>, 'position').
            """
            # Dump all rules first, then strip the key, so the two phases stay separate.
            dumped_rules = [rule.dump() for rule in self.config_rule]
            return [remove_dict_key(dumped_rule, 'position') for dumped_rule in dumped_rules]

        @staticmethod
        def main_pk_name():
            """Return the name of this model's primary-key column."""
            return 'config_uuid'
    
    class ConfigRuleModel(Base): # pylint: disable=abstract-method
        """SQLAlchemy mapping of the 'ConfigRule' table: one key/value rule belonging to a Config."""
        __tablename__ = 'ConfigRule'

        # Composite primary key: the rule's own UUID plus the UUID of the owning Config.
        config_rule_uuid = Column(UUID(as_uuid=False), primary_key=True)
        config_uuid = Column(
            UUID(as_uuid=False),
            ForeignKey("Config.config_uuid", ondelete='CASCADE'),
            primary_key=True)

        action = Column(
            Enum(ORM_ConfigActionEnum, create_constraint=True, native_enum=True),
            nullable=False)
        position = Column(INTEGER, nullable=False)
        key = Column(String, nullable=False)
        value = Column(String, nullable=False)

        # Table-level constraint: position must be non-negative.
        __table_args__ = (
            CheckConstraint(position >= 0, name='check_position_value'),
            {},
        )

        # Relationships
        config = relationship("ConfigModel", passive_deletes=True, back_populates="config_rule")
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class ConfigRuleCustomModel(Model): # pylint: disable=abstract-method
        """Custom config rule: a required, non-empty key/value pair of strings."""
        key = StringField(required=True, allow_empty=False)
        value = StringField(required=True, allow_empty=False)

        def dump(self) -> Dict: # pylint: disable=arguments-differ
            """Return the rule wrapped under the 'custom' key."""
            custom_body = {'resource_key': self.key, 'resource_value': self.value}
            return {'custom': custom_body}
    
    class ConfigRuleAclModel(Model): # pylint: disable=abstract-method
        """ACL config rule; both the endpoint id and the ACL rule set are stored as JSON strings."""
        # TODO: improve definition of fields in ConfigRuleAclModel
        # The endpoint is kept as a JSON string rather than a foreign key to EndPointModel,
        # because the foreign key causes circular dependencies.
        #endpoint_fk = ForeignKeyField(EndPointModel)
        endpoint_id = StringField(required=True, allow_empty=False)
        # The ACL rule set is likewise kept as a JSON string, for simplicity.
        acl_data = StringField(required=True, allow_empty=False)

        def dump(self) -> Dict: # pylint: disable=arguments-differ
            """Return the rule wrapped under the 'acl' key, with both JSON strings decoded."""
            #json_endpoint_id = EndPointModel(self.database, self.endpoint_fk).dump_id()
            return {
                'acl': {
                    'endpoint_id': json.loads(self.endpoint_id),
                    'rule_set': json.loads(self.acl_data),
                }
            }
    
    class ConfigRuleKindEnum(Enum):
        """Kinds of config rule.

        Each value must match the middle part of the matching ConfigRuleModel field name,
        'config_rule_<value>_fk', because that name is built from the value at runtime.
        """

        CUSTOM = 'custom'
        ACL = 'acl'
    
    # Type alias for the kind-specific rule models; used to annotate the object
    # that ConfigRuleModel.dump() fetches through its kind-specific foreign key.
    Union_SpecificConfigRule = Union[
        ConfigRuleCustomModel, ConfigRuleAclModel
    ]
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class ConfigRuleModel(Model): # pylint: disable=abstract-method
        """Generic config rule pointing at one kind-specific rule.

        The specific rule is referenced through the field named 'config_rule_<kind>_fk',
        where <kind> is the value of `kind`.
        """
        pk = PrimaryKeyField()
        config_fk = ForeignKeyField(ConfigModel)
        kind = EnumeratedField(ConfigRuleKindEnum)
        position = IntegerField(min_value=0, required=True)
        action = EnumeratedField(ORM_ConfigActionEnum, required=True)

        # One optional foreign key per rule kind.
        config_rule_custom_fk = ForeignKeyField(ConfigRuleCustomModel, required=False)
        config_rule_acl_fk    = ForeignKeyField(ConfigRuleAclModel, required=False)

        def _specific_rule_ref(self):
            """Return (specific model class, specific foreign-key value) for this rule's kind.

            Raises:
                Exception: if the kind-specific foreign key is not set on this instance.
            """
            fk_field_name = 'config_rule_{:s}_fk'.format(str(self.kind.value))
            fk_value : Optional[ForeignKeyField] = getattr(self, fk_field_name, None)
            if fk_value is None:
                raise Exception('Unable to find config_rule key for field_name({:s})'.format(fk_field_name))
            # The class-level attribute is the field definition, which names the foreign model.
            fk_field = getattr(ConfigRuleModel, fk_field_name, None)
            specific_model_class : Model = fk_field.foreign_model
            return specific_model_class, fk_value

        def delete(self) -> None:
            """Delete this generic rule, then the kind-specific rule it points at."""
            specific_model_class, specific_fk_value = self._specific_rule_ref()
            # The generic rule goes first; the specific rule is looked up and removed afterwards.
            super().delete()
            get_object(self.database, specific_model_class, str(specific_fk_value)).delete()

        def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
            """Dump the specific rule, adding 'action' and, optionally, 'position'.

            Args:
                include_position: when true, add this rule's position to the result.
            """
            specific_model_class, specific_fk_value = self._specific_rule_ref()
            specific_rule : Union_SpecificConfigRule = get_object(
                self.database, specific_model_class, str(specific_fk_value))
            dumped = specific_rule.dump()
            dumped['action'] = self.action.value
            if include_position:
                dumped['position'] = self.position
            return dumped

        @staticmethod
        def main_pk_name():
            """Return the primary-key name reported for this model."""
            return 'config_rule_uuid'
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    def set_config_rule(
        database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str,
    ): # -> Tuple[ConfigRuleModel, bool]:
        """Build a SET ConfigRuleModel for `db_config` and hand it to database.create_or_update().

        Returns:
            Whatever database.create_or_update() returns for the new rule.
        """
        # NOTE(review): this composite key is computed but never passed to the model.
        # Confirm whether it was meant to populate config_rule_uuid.
        resource_key_hash = fast_hasher(resource_key)
        composite_rule_key = key_to_str([db_config.config_uuid, resource_key_hash], separator=':') # pylint: disable=unused-variable

        rule_to_add = ConfigRuleModel(
            config_fk=db_config, position=position, action=ORM_ConfigActionEnum.SET,
            key=resource_key, value=resource_value)
        return database.create_or_update(rule_to_add)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Shape returned by each parse_config_rule_* function:
    # (specific model class, rule identifier string, field data for that model, rule kind).
    Tuple_ConfigRuleSpecs = Tuple[Type, str, Dict, ConfigRuleKindEnum]
    
    def parse_config_rule_custom(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
        """Extract the specs of a 'custom' gRPC config rule.

        The resource key serves as the rule identifier; `database` is not used.

        Returns:
            (ConfigRuleCustomModel, resource key, {'key', 'value'} field data, ConfigRuleKindEnum.CUSTOM).
        """
        grpc_custom = grpc_config_rule.custom
        rule_id = grpc_custom.resource_key
        rule_fields = {
            'key'  : grpc_custom.resource_key,
            'value': grpc_custom.resource_value,
        }
        return ConfigRuleCustomModel, rule_id, rule_fields, ConfigRuleKindEnum.CUSTOM
    
    def parse_config_rule_acl(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
        """Extract the specs of an 'acl' gRPC config rule.

        The rule identifier is '<device_uuid>/<endpoint_uuid>:<rule_set name>'. The endpoint id
        and the rule set are stored as JSON strings; `database` is not used.

        Returns:
            (ConfigRuleAclModel, rule identifier, {'endpoint_id', 'acl_data'} field data,
            ConfigRuleKindEnum.ACL).
        """
        grpc_acl = grpc_config_rule.acl
        endpoint_id_msg = grpc_acl.endpoint_id
        rule_set_msg = grpc_acl.rule_set

        endpoint_key = '/'.join([
            endpoint_id_msg.device_id.device_uuid.uuid,
            endpoint_id_msg.endpoint_uuid.uuid,
        ])
        #str_endpoint_key, db_endpoint = get_endpoint(database, grpc_endpoint_id)
        rule_id = ':'.join([endpoint_key, rule_set_msg.name])

        rule_fields = {
            #'endpoint_fk': db_endpoint,
            'endpoint_id': grpc_message_to_json_string(endpoint_id_msg),
            'acl_data': grpc_message_to_json_string(rule_set_msg),
        }
        return ConfigRuleAclModel, rule_id, rule_fields, ConfigRuleKindEnum.ACL
    
    # Dispatch table from the name returned by WhichOneof('config_rule') on a gRPC ConfigRule
    # to the function that parses that kind of rule.
    CONFIGRULE_PARSERS = {
        'custom': parse_config_rule_custom,
        'acl'   : parse_config_rule_acl,
    }
    
    # Type alias for the kind-specific rule models created by set_config_rule.
    Union_ConfigRuleModel = Union[
        ConfigRuleCustomModel, ConfigRuleAclModel,
    ]
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    def set_config_rule(
        database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule, position : int
    ) -> Tuple[Union_ConfigRuleModel, bool]:
        """Create or update the specific and the generic config rule for a gRPC ConfigRule.

        Both objects are stored under the same key, built from the config's pk and a hash of
        '<kind>:<rule id>'.

        Returns:
            (generic ConfigRuleModel, updated flag) from the second update_or_create_object call.

        Raises:
            NotImplementedError: if no parser is registered for the rule's kind.
        """
        kind_name = str(grpc_config_rule.WhichOneof('config_rule'))
        parse_rule = CONFIGRULE_PARSERS.get(kind_name)
        if parse_rule is None:
            raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
                kind_name, grpc_message_to_json_string(grpc_config_rule)))

        # Step 1: the kind-specific rule (custom / acl).
        specific_class, rule_id, specific_fields, rule_kind = parse_rule(database, grpc_config_rule)
        rule_key_hash = fast_hasher(':'.join([rule_kind.value, rule_id]))
        rule_key = key_to_str([db_config.pk, rule_key_hash], separator=':')
        specific_result : Tuple[Union_ConfigRuleModel, bool] = update_or_create_object(
            database, specific_class, rule_key, specific_fields)
        db_specific_rule = specific_result[0]

        # Step 2: the generic rule, which references the specific one through its per-kind foreign key.
        fk_field_name = 'config_rule_{:s}_fk'.format(rule_kind.value)
        generic_fields = {
            'config_fk': db_config,
            'kind': rule_kind,
            'position': position,
            'action': ORM_ConfigActionEnum.SET,
            fk_field_name: db_specific_rule,
        }
        generic_result : Tuple[ConfigRuleModel, bool] = update_or_create_object(
            database, ConfigRuleModel, rule_key, generic_fields)
        db_config_rule, updated = generic_result
        return db_config_rule, updated
    
    
    def delete_config_rule(
        database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule
    ) -> None:
        """Delete the config rule of `db_config` that matches a gRPC ConfigRule, if it exists.

        The rule is located with the same key derivation used when it was stored: the config's
        pk plus a hash of '<kind>:<rule id>'. A rule that is not found is silently ignored.

        Args:
            database: ORM database handle passed to the parser and to get_object.
            db_config: config owning the rule.
            grpc_config_rule: gRPC rule identifying what to delete.

        Raises:
            NotImplementedError: if no parser is registered for the rule's kind.
        """
        grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
        parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
        if parser is None:
            raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
                grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))

        # Only the generic rule is looked up here; ConfigRuleModel.delete() removes the
        # kind-specific rule it points at.
        _, str_config_rule_id, _, config_rule_kind = parser(database, grpc_config_rule)
        str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
        str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')

        db_config_rule : Optional[ConfigRuleModel] = get_object(
            database, ConfigRuleModel, str_config_rule_key, raise_if_not_found=False)
        if db_config_rule is None:
            return
        db_config_rule.delete()
    
    def update_config(
        database : Database, db_parent_pk : str, config_name : str, grpc_config_rules
    ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
        """Get or create the config of a parent object and apply a sequence of gRPC config rules.

        Rules are applied in order; each rule's index in `grpc_config_rules` is its position.

        Args:
            database: ORM database handle.
            db_parent_pk: primary key of the object owning the config.
            config_name: name of the config; joined with `db_parent_pk` to form the config key.
            grpc_config_rules: iterable of gRPC ConfigRule messages.

        Returns:
            A list that starts with (config, created) and then holds one (config rule, updated)
            entry per SET rule. Deleted rules add no entry.

        Raises:
            AttributeError: if a rule's action is neither SET nor DELETE.
        """
        str_config_key = key_to_str([config_name, db_parent_pk], separator=':')
        result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key)
        db_config, created = result

        db_objects = [(db_config, created)]

        for position, grpc_config_rule in enumerate(grpc_config_rules):
            action = grpc_to_enum__config_action(grpc_config_rule.action)

            if action == ORM_ConfigActionEnum.SET:
                result : Tuple[ConfigRuleModel, bool] = set_config_rule(
                    database, db_config, grpc_config_rule, position)
                db_config_rule, updated = result
                db_objects.append((db_config_rule, updated))
            elif action == ORM_ConfigActionEnum.DELETE:
                delete_config_rule(database, db_config, grpc_config_rule)
            else:
                # Without this branch the error below would be raised after every DELETE.
                msg = 'Unsupported Action({:s}) for ConfigRule({:s})'
                # Name() expects the gRPC enum number, so use the raw field value rather than
                # the ORM enum member held in `action`.
                str_action = str(ConfigActionEnum.Name(grpc_config_rule.action))
                str_config_rule = grpc_message_to_json_string(grpc_config_rule)
                raise AttributeError(msg.format(str_action, str_config_rule))

        return db_objects