# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools, json, logging, operator
from enum import Enum
from typing import Dict, List, Optional, Tuple, Type, Union
from common.orm.Database import Database
from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import ConfigActionEnum, ConfigRule
from common.tools.grpc.Tools import grpc_message_to_json_string
#from .EndPointModel import EndPointModel, get_endpoint
from .Tools import fast_hasher, grpc_to_enum, remove_dict_key

LOGGER = logging.getLogger(__name__)


class ORM_ConfigActionEnum(Enum):
    """ORM-side counterpart of the gRPC ``ConfigActionEnum``; members wrap the gRPC values."""
    UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED
    SET       = ConfigActionEnum.CONFIGACTION_SET
    DELETE    = ConfigActionEnum.CONFIGACTION_DELETE


# Converter from a gRPC config-action value to ORM_ConfigActionEnum, built by
# pre-binding both enum classes to the shared grpc_to_enum helper.
grpc_to_enum__config_action = functools.partial(
    grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum)


class ConfigModel(Model): # pylint: disable=abstract-method
    """Container object that ConfigRuleModel instances point to through ``config_fk``."""
    pk = PrimaryKeyField()

    def delete(self) -> None:
        """Delete every ConfigRuleModel referencing this config, then the config itself."""
        db_config_rule_pks = self.references(ConfigRuleModel)
        for pk,_ in db_config_rule_pks:
            ConfigRuleModel(self.database, pk).delete()
        super().delete()

    def dump(self) -> List[Dict]:
        """Return the dumps of the referencing config rules, ordered by their position.

        The position is included only to sort the rules and is removed from
        each returned dictionary.
        """
        db_config_rule_pks = self.references(ConfigRuleModel)
        config_rules = [
            ConfigRuleModel(self.database, pk).dump(include_position=True)
            for pk,_ in db_config_rule_pks
        ]
        config_rules = sorted(config_rules, key=operator.itemgetter('position'))
        return [remove_dict_key(config_rule, 'position') for config_rule in config_rules]


class ConfigRuleCustomModel(Model): # pylint: disable=abstract-method
    """Specific config rule of kind 'custom': a resource key with its value."""
    key = StringField(required=True, allow_empty=False)
    value = StringField(required=True, allow_empty=False)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the rule as ``{'custom': {'resource_key': ..., 'resource_value': ...}}``."""
        return {'custom': {'resource_key': self.key, 'resource_value': self.value}}


class ConfigRuleAclModel(Model): # pylint: disable=abstract-method
    """Specific config rule of kind 'acl': an endpoint id plus an ACL rule set, both stored as JSON strings."""
    # TODO: improve definition of fields in ConfigRuleAclModel
    # To simplify, endpoint encoded as JSON-string directly; otherwise causes circular dependencies
    #endpoint_fk = ForeignKeyField(EndPointModel)
    endpoint_id = StringField(required=True, allow_empty=False)
    # To simplify, ACL rule is encoded as a JSON-string directly
    acl_data = StringField(required=True, allow_empty=False)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the rule as ``{'acl': {'endpoint_id': ..., 'rule_set': ...}}`` with both JSON strings decoded."""
        #json_endpoint_id = EndPointModel(self.database, self.endpoint_fk).dump_id()
        json_endpoint_id = json.loads(self.endpoint_id)
        json_acl_rule_set = json.loads(self.acl_data)
        return {'acl': {'endpoint_id': json_endpoint_id, 'rule_set': json_acl_rule_set}}


# enum values should match name of field in ConfigRuleModel
class ConfigRuleKindEnum(Enum):
    """Kinds of specific config rule; each value selects the ``config_rule_<value>_fk`` field."""
    CUSTOM = 'custom'
    ACL    = 'acl'


Union_SpecificConfigRule = Union[
    ConfigRuleCustomModel, ConfigRuleAclModel
]


class ConfigRuleModel(Model): # pylint: disable=abstract-method
    """Generic config rule: links a ConfigModel to one specific (custom or ACL) rule.

    The ``kind`` field determines which of the ``config_rule_*_fk`` fields
    holds the key of the specific rule.
    """
    pk = PrimaryKeyField()
    config_fk = ForeignKeyField(ConfigModel)
    kind = EnumeratedField(ConfigRuleKindEnum)
    position = IntegerField(min_value=0, required=True)
    action = EnumeratedField(ORM_ConfigActionEnum, required=True)
    config_rule_custom_fk = ForeignKeyField(ConfigRuleCustomModel, required=False)
    config_rule_acl_fk = ForeignKeyField(ConfigRuleAclModel, required=False)

    def _get_specific_config_rule_ref(self) -> Tuple[Type[Model], ForeignKeyField]:
        """Resolve the model class and stored key of the specific rule for this rule's kind.

        Returns:
            Tuple of (foreign model class of the kind-specific field, value
            stored in that field). Callers convert the value with ``str()``.

        Raises:
            Exception: if the kind-specific foreign-key field holds no value.
        """
        field_name = 'config_rule_{:s}_fk'.format(str(self.kind.value))
        specific_fk_value : Optional[ForeignKeyField] = getattr(self, field_name, None)
        if specific_fk_value is None:
            raise Exception('Unable to find config_rule key for field_name({:s})'.format(field_name))
        # The field declaration on the class carries the target model class.
        specific_fk_class = getattr(ConfigRuleModel, field_name, None)
        foreign_model_class : Type[Model] = specific_fk_class.foreign_model
        return foreign_model_class, specific_fk_value

    def delete(self) -> None:
        """Delete this generic rule and then the specific rule it points to.

        Raises:
            Exception: if the kind-specific foreign-key field holds no value.
        """
        # Resolve the reference first: it is read from this object's fields,
        # and this object is deleted before the specific rule.
        foreign_model_class, specific_fk_value = self._get_specific_config_rule_ref()
        super().delete()
        get_object(self.database, foreign_model_class, str(specific_fk_value)).delete()

    def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
        """Return the specific rule's dump extended with 'action' and, optionally, 'position'.

        Args:
            include_position: when true, add this rule's position under 'position'.

        Raises:
            Exception: if the kind-specific foreign-key field holds no value.
        """
        foreign_model_class, specific_fk_value = self._get_specific_config_rule_ref()
        config_rule : Union_SpecificConfigRule = get_object(
            self.database, foreign_model_class, str(specific_fk_value))
        result = config_rule.dump()
        result['action'] = self.action.value
        if include_position:
            result['position'] = self.position
        return result


# (specific model class, rule identifier string, field data for the model, rule kind)
Tuple_ConfigRuleSpecs = Tuple[Type, str, Dict, ConfigRuleKindEnum]


def parse_config_rule_custom(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
    """Extract the specs of a 'custom' config rule from its gRPC message.

    The resource key doubles as the rule identifier. ``database`` is unused
    here; it is accepted so all parsers share one signature.
    """
    config_rule_class = ConfigRuleCustomModel
    str_config_rule_id = grpc_config_rule.custom.resource_key
    config_rule_data = {
        'key'  : grpc_config_rule.custom.resource_key,
        'value': grpc_config_rule.custom.resource_value,
    }
    return config_rule_class, str_config_rule_id, config_rule_data, ConfigRuleKindEnum.CUSTOM


def parse_config_rule_acl(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
    """Extract the specs of an 'acl' config rule from its gRPC message.

    The rule identifier is ``<device_uuid>/<endpoint_uuid>:<rule_set name>``.
    The endpoint id and rule set are stored as JSON strings. ``database`` is
    unused here; it is accepted so all parsers share one signature.
    """
    config_rule_class = ConfigRuleAclModel
    grpc_endpoint_id = grpc_config_rule.acl.endpoint_id
    grpc_rule_set = grpc_config_rule.acl.rule_set
    device_uuid = grpc_endpoint_id.device_id.device_uuid.uuid
    endpoint_uuid = grpc_endpoint_id.endpoint_uuid.uuid
    str_endpoint_key = '/'.join([device_uuid, endpoint_uuid])
    #str_endpoint_key, db_endpoint = get_endpoint(database, grpc_endpoint_id)
    str_config_rule_id = ':'.join([str_endpoint_key, grpc_rule_set.name])
    config_rule_data = {
        #'endpoint_fk': db_endpoint,
        'endpoint_id': grpc_message_to_json_string(grpc_endpoint_id),
        'acl_data': grpc_message_to_json_string(grpc_rule_set),
    }
    return config_rule_class, str_config_rule_id, config_rule_data, ConfigRuleKindEnum.ACL


# Dispatch table keyed by the name of the populated 'config_rule' oneof field.
CONFIGRULE_PARSERS = {
    'custom': parse_config_rule_custom,
    'acl'   : parse_config_rule_acl,
}

Union_ConfigRuleModel = Union[
    ConfigRuleCustomModel, ConfigRuleAclModel,
]


def set_config_rule(
    database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule, position : int
) -> Tuple[ConfigRuleModel, bool]:
    """Create or update the specific and generic ORM objects for one config rule.

    Args:
        database: database holding the ORM objects.
        db_config: config the rule belongs to.
        grpc_config_rule: gRPC rule message to store.
        position: position stored on the generic rule.

    Returns:
        Tuple of (generic ConfigRuleModel, flag returned by
        ``update_or_create_object`` for the generic rule).

    Raises:
        NotImplementedError: if no parser is registered for the rule's kind.
    """
    grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
    parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
    if parser is None:
        raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
            grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))

    # create specific ConfigRule
    config_rule_class, str_config_rule_id, config_rule_data, config_rule_kind = parser(database, grpc_config_rule)
    str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
    str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')
    specific_result : Tuple[Union_ConfigRuleModel, bool] = update_or_create_object(
        database, config_rule_class, str_config_rule_key, config_rule_data)
    db_specific_config_rule, _ = specific_result

    # create generic ConfigRule; it shares the key of the specific rule
    config_rule_fk_field_name = 'config_rule_{:s}_fk'.format(config_rule_kind.value)
    config_rule_data = {
        'config_fk': db_config, 'kind': config_rule_kind, 'position': position,
        'action': ORM_ConfigActionEnum.SET,
        config_rule_fk_field_name: db_specific_config_rule
    }
    generic_result : Tuple[ConfigRuleModel, bool] = update_or_create_object(
        database, ConfigRuleModel, str_config_rule_key, config_rule_data)
    db_config_rule, updated = generic_result

    return db_config_rule, updated


def delete_config_rule(
    database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule
) -> None:
    """Delete the stored config rule matching the given gRPC rule, if it exists.

    Nothing happens when no matching generic rule is found.

    Raises:
        NotImplementedError: if no parser is registered for the rule's kind.
    """
    grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
    parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
    if parser is None:
        raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
            grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))

    # delete generic config rules; self deletes specific config rule
    _, str_config_rule_id, _, config_rule_kind = parser(database, grpc_config_rule)
    str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
    str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')

    db_config_rule : Optional[ConfigRuleModel] = get_object(
        database, ConfigRuleModel, str_config_rule_key, raise_if_not_found=False)
    if db_config_rule is None:
        return
    db_config_rule.delete()


def update_config(
    database : Database, db_parent_pk : str, config_name : str, grpc_config_rules
) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
    """Get or create a config and apply a sequence of gRPC config rules to it.

    Rules with action SET are created or updated, using their index in
    ``grpc_config_rules`` as position; rules with action DELETE are removed.

    Args:
        database: database holding the ORM objects.
        db_parent_pk: key of the parent object; combined with ``config_name``
            to build the config key.
        config_name: name of the config.
        grpc_config_rules: iterable of gRPC config rule messages.

    Returns:
        List of (object, flag) tuples: first the config with the flag returned
        by ``get_or_create_object``, then one entry per rule that was set.
        Deleted rules add no entry.

    Raises:
        AttributeError: if a rule carries an action other than SET or DELETE.
    """
    str_config_key = key_to_str([config_name, db_parent_pk], separator=':')
    config_result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key)
    db_config, created = config_result

    db_objects : List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)]

    for position,grpc_config_rule in enumerate(grpc_config_rules):
        action = grpc_to_enum__config_action(grpc_config_rule.action)

        if action == ORM_ConfigActionEnum.SET:
            rule_result : Tuple[ConfigRuleModel, bool] = set_config_rule(
                database, db_config, grpc_config_rule, position)
            db_config_rule, updated = rule_result
            db_objects.append((db_config_rule, updated))
        elif action == ORM_ConfigActionEnum.DELETE:
            delete_config_rule(database, db_config, grpc_config_rule)
        else:
            msg = 'Unsupported Action({:s}) for ConfigRule({:s})'
            # Name the raw gRPC enum value: `action` is the converted ORM-side
            # value, which the gRPC enum's Name() lookup does not accept.
            str_action = str(ConfigActionEnum.Name(grpc_config_rule.action))
            str_config_rule = grpc_message_to_json_string(grpc_config_rule)
            raise AttributeError(msg.format(str_action, str_config_rule))

    return db_objects