Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import json
import logging
import operator
from enum import Enum
from typing import Dict, List, Optional, Tuple, Type, Union

from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import ConfigActionEnum, ConfigRule
from common.tools.grpc.Tools import grpc_message_to_json_string
#from .EndPointModel import EndPointModel, get_endpoint
from .Tools import fast_hasher, grpc_to_enum, remove_dict_key
# NOTE(review): `Database` and `key_to_str` are used further down but no import for them is
# visible here -- confirm they are imported from the project's ORM package.
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class ORM_ConfigActionEnum(Enum):
    """ORM-side enumeration of config actions.

    Each member's value is the corresponding constant of the gRPC ``ConfigActionEnum``.
    """

    UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED
    SET       = ConfigActionEnum.CONFIGACTION_SET
    DELETE    = ConfigActionEnum.CONFIGACTION_DELETE
# `grpc_to_enum` with its first two arguments pre-bound to the gRPC and ORM action enums;
# update_config() calls it with the `action` of each incoming ConfigRule and compares the
# result against ORM_ConfigActionEnum members.
grpc_to_enum__config_action = functools.partial(
    grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum)
class ConfigModel(Model): # pylint: disable=abstract-method
    """Configuration container; ConfigRuleModel instances point to it through `config_fk`."""

    def delete(self) -> None:
        """Delete every ConfigRuleModel referencing this config, then the config itself."""
        for rule_pk, _ in self.references(ConfigRuleModel):
            ConfigRuleModel(self.database, rule_pk).delete()
        super().delete()

    def dump(self) -> List[Dict]:
        """Return the dumps of the referencing config rules, ordered by their position.

        The 'position' entry is only used for sorting and is stripped from the returned items.
        """
        rule_dumps = []
        for rule_pk, _ in self.references(ConfigRuleModel):
            db_rule = ConfigRuleModel(self.database, rule_pk)
            rule_dumps.append(db_rule.dump(include_position=True))
        rule_dumps.sort(key=operator.itemgetter('position'))
        return [remove_dict_key(rule_dump, 'position') for rule_dump in rule_dumps]
class ConfigRuleCustomModel(Model): # pylint: disable=abstract-method
    """Specific config rule of kind 'custom': a resource key and its value, both non-empty strings."""

    key   = StringField(required=True, allow_empty=False)
    value = StringField(required=True, allow_empty=False)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the rule wrapped under the 'custom' key."""
        custom_body = {'resource_key': self.key, 'resource_value': self.value}
        return {'custom': custom_body}
class ConfigRuleAclModel(Model): # pylint: disable=abstract-method
    """Specific config rule of kind 'acl'; endpoint and rule set are stored as JSON strings."""

    # TODO: improve definition of fields in ConfigRuleAclModel
    # The endpoint is stored as a JSON string rather than as a foreign key to EndPointModel,
    # because the foreign key causes circular dependencies.
    endpoint_id = StringField(required=True, allow_empty=False)

    # The ACL rule set is stored as a JSON string as well, to keep the model simple.
    acl_data = StringField(required=True, allow_empty=False)

    def dump(self) -> Dict: # pylint: disable=arguments-differ
        """Return the rule wrapped under the 'acl' key, with both JSON strings decoded."""
        acl_body = {
            'endpoint_id': json.loads(self.endpoint_id),
            'rule_set'   : json.loads(self.acl_data),
        }
        return {'acl': acl_body}
class ConfigRuleKindEnum(Enum):
    """Kinds of specific config rules.

    Each value is interpolated into 'config_rule_{kind}_fk' to locate the matching
    foreign-key field of ConfigRuleModel, so values must match those field names.
    """

    CUSTOM = 'custom'
    ACL    = 'acl'
# Type alias covering every specific (kind-dependent) config rule model.
Union_SpecificConfigRule = Union[
    ConfigRuleCustomModel, ConfigRuleAclModel
]
class ConfigRuleModel(Model): # pylint: disable=abstract-method
    """Generic config rule: belongs to a ConfigModel and points to one specific rule model.

    The specific rule is referenced through the 'config_rule_<kind>_fk' field selected by `kind`.
    """

    pk = PrimaryKeyField()
    config_fk = ForeignKeyField(ConfigModel)
    # `kind` is read by delete()/dump() and written by set_config_rule(); it selects which of
    # the 'config_rule_<kind>_fk' fields below holds the specific rule.
    kind = EnumeratedField(ConfigRuleKindEnum, required=True)
    position = IntegerField(min_value=0, required=True)
    action = EnumeratedField(ORM_ConfigActionEnum, required=True)
    config_rule_custom_fk = ForeignKeyField(ConfigRuleCustomModel, required=False)
    config_rule_acl_fk = ForeignKeyField(ConfigRuleAclModel, required=False)

    def _resolve_specific_rule(self) -> Tuple[Type[Model], str]:
        """Return (model class, key string) of the specific rule selected by `kind`.

        Raises:
            Exception: if the foreign-key field for this rule's kind holds no value.
        """
        field_name = 'config_rule_{:s}_fk'.format(str(self.kind.value))
        specific_fk_value : Optional[ForeignKeyField] = getattr(self, field_name, None)
        if specific_fk_value is None:
            raise Exception('Unable to find config_rule key for field_name({:s})'.format(field_name))
        # The field declared on the class knows which model the foreign key points to.
        specific_fk_class = getattr(ConfigRuleModel, field_name, None)
        foreign_model_class : Type[Model] = specific_fk_class.foreign_model
        return foreign_model_class, str(specific_fk_value)

    def delete(self) -> None:
        """Delete this generic rule first, then the specific rule it pointed to."""
        foreign_model_class, str_specific_key = self._resolve_specific_rule()
        super().delete()
        get_object(self.database, foreign_model_class, str_specific_key).delete()

    def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
        """Return the specific rule's dump extended with 'action' and, optionally, 'position'."""
        foreign_model_class, str_specific_key = self._resolve_specific_rule()
        config_rule : Union_SpecificConfigRule = get_object(
            self.database, foreign_model_class, str_specific_key)
        result = config_rule.dump()
        result['action'] = self.action.value
        if include_position: result['position'] = self.position
        return result
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# Result of a config-rule parser:
# (specific model class, rule identifier string, field values for that model, rule kind).
Tuple_ConfigRuleSpecs = Tuple[Type, str, Dict, ConfigRuleKindEnum]
def parse_config_rule_custom(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
    """Extract the specs of a 'custom' config rule from its gRPC message.

    The resource key doubles as the rule identifier. `database` is not used here; it is part
    of the signature shared by the parsers registered in CONFIGRULE_PARSERS.
    """
    grpc_custom = grpc_config_rule.custom
    resource_key = grpc_custom.resource_key
    custom_fields = {'key': resource_key, 'value': grpc_custom.resource_value}
    return ConfigRuleCustomModel, resource_key, custom_fields, ConfigRuleKindEnum.CUSTOM
def parse_config_rule_acl(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
    """Extract the specs of an 'acl' config rule from its gRPC message.

    The rule identifier is '<device_uuid>/<endpoint_uuid>:<rule_set_name>'. The endpoint id
    and the rule set are stored as JSON strings. `database` is not used here.
    """
    grpc_acl = grpc_config_rule.acl
    grpc_endpoint_id = grpc_acl.endpoint_id
    grpc_rule_set = grpc_acl.rule_set

    endpoint_key_parts = (
        grpc_endpoint_id.device_id.device_uuid.uuid,
        grpc_endpoint_id.endpoint_uuid.uuid,
    )
    str_endpoint_key = '/'.join(endpoint_key_parts)
    str_rule_id = ':'.join((str_endpoint_key, grpc_rule_set.name))

    acl_fields = {
        'endpoint_id': grpc_message_to_json_string(grpc_endpoint_id),
        'acl_data'   : grpc_message_to_json_string(grpc_rule_set),
    }
    return ConfigRuleAclModel, str_rule_id, acl_fields, ConfigRuleKindEnum.ACL
# Parser lookup table, keyed by the name returned by WhichOneof('config_rule') on a ConfigRule.
CONFIGRULE_PARSERS = {
    'custom': parse_config_rule_custom,
    'acl' : parse_config_rule_acl,
}
# Type alias used in the return annotation of set_config_rule().
Union_ConfigRuleModel = Union[
    ConfigRuleCustomModel, ConfigRuleAclModel,
]
def set_config_rule(
    database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule, position : int
) -> Tuple[Union_ConfigRuleModel, bool]:
    """Create or update the specific and generic records of one config rule.

    Args:
        database: database handle passed to the ORM helpers.
        db_config: config the rule belongs to.
        grpc_config_rule: gRPC ConfigRule message to store.
        position: position of the rule within the config.

    Returns:
        The result of update_or_create_object() for the generic ConfigRuleModel, unpacked as
        (db_config_rule, updated).

    Raises:
        NotImplementedError: if no parser is registered for the rule's kind.
    """
    grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
    parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
    if parser is None:
        raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
            grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))

    # create specific ConfigRule
    config_rule_class, str_config_rule_id, config_rule_data, config_rule_kind = parser(database, grpc_config_rule)
    str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
    str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')
    specific_result : Tuple[Union_ConfigRuleModel, bool] = update_or_create_object(
        database, config_rule_class, str_config_rule_key, config_rule_data)
    db_specific_config_rule, _ = specific_result

    # create generic ConfigRule; it is stored under the same key as the specific one
    config_rule_fk_field_name = 'config_rule_{:s}_fk'.format(config_rule_kind.value)
    config_rule_data = {
        'config_fk': db_config, 'kind': config_rule_kind, 'position': position,
        'action': ORM_ConfigActionEnum.SET,
        config_rule_fk_field_name: db_specific_config_rule
    }
    result : Tuple[ConfigRuleModel, bool] = update_or_create_object(
        database, ConfigRuleModel, str_config_rule_key, config_rule_data)
    db_config_rule, updated = result
    return db_config_rule, updated
def delete_config_rule(
    database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule
) -> None:
    """Delete the stored config rule matching `grpc_config_rule`, if there is one.

    Args:
        database: database handle passed to the ORM helpers.
        db_config: config the rule belongs to.
        grpc_config_rule: gRPC ConfigRule message identifying the rule to delete.

    Raises:
        NotImplementedError: if no parser is registered for the rule's kind.
    """
    grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
    parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
    if parser is None:
        raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
            grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))

    # delete generic config rules; self deletes specific config rule
    _, str_config_rule_id, _, config_rule_kind = parser(database, grpc_config_rule)
    str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
    str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')

    db_config_rule : Optional[ConfigRuleModel] = get_object(
        database, ConfigRuleModel, str_config_rule_key, raise_if_not_found=False)
    # A rule that is not stored needs no deletion.
    if db_config_rule is not None:
        db_config_rule.delete()
def update_config(
    database : Database, db_parent_pk : str, config_name : str, grpc_config_rules
) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
    """Apply a sequence of gRPC config rules to the config of a parent object.

    The config is looked up (or created) under the key built from `config_name` and
    `db_parent_pk`. Rules with action SET are stored, using their index in
    `grpc_config_rules` as position; rules with action DELETE are removed.

    Returns:
        A list of (object, flag) pairs: first the config with its `created` flag, then one
        entry per SET rule with its `updated` flag. Deleted rules add no entry.

    Raises:
        AttributeError: if a rule carries an action other than SET or DELETE.
    """
    str_config_key = key_to_str([config_name, db_parent_pk], separator=':')
    result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key)
    db_config, created = result

    db_objects = [(db_config, created)]

    for position,grpc_config_rule in enumerate(grpc_config_rules):
        action = grpc_to_enum__config_action(grpc_config_rule.action)

        if action == ORM_ConfigActionEnum.SET:
            result : Tuple[ConfigRuleModel, bool] = set_config_rule(
                database, db_config, grpc_config_rule, position)
            db_config_rule, updated = result
            db_objects.append((db_config_rule, updated))
        elif action == ORM_ConfigActionEnum.DELETE:
            delete_config_rule(database, db_config, grpc_config_rule)
        else:
            msg = 'Unsupported Action({:s}) for ConfigRule({:s})'
            # Name() expects the gRPC integer value, not the ORM enum member.
            try:
                str_action = str(ConfigActionEnum.Name(grpc_config_rule.action))
            except ValueError:
                # Value unknown to the gRPC enum: report the raw number instead.
                str_action = str(grpc_config_rule.action)
            str_config_rule = grpc_message_to_json_string(grpc_config_rule)
            raise AttributeError(msg.format(str_action, str_config_rule))

    return db_objects