Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import ConfigActionEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
from sqlalchemy import Column, ForeignKey, INTEGER, CheckConstraint, Enum, String
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from sqlalchemy.orm import relationship
from context.service.Database import Database
from typing import Dict, List, Optional, Tuple, Type, Union
from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import ConfigActionEnum, ConfigRule
from common.tools.grpc.Tools import grpc_message_to_json_string
#from .EndPointModel import EndPointModel, get_endpoint
from .Tools import fast_hasher, grpc_to_enum, remove_dict_key
UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED
SET = ConfigActionEnum.CONFIGACTION_SET
DELETE = ConfigActionEnum.CONFIGACTION_DELETE
grpc_to_enum__config_action = functools.partial(
grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum)
class ConfigModel(Base): # pylint: disable=abstract-method
__tablename__ = 'Config'
config_uuid = Column(UUID(as_uuid=False), primary_key=True)
# Relationships
config_rule = relationship("ConfigRuleModel", cascade="all,delete", back_populates="config", lazy='joined')
config_rules = []
for a in self.config_rule:
asdf = a.dump()
config_rules.append(asdf)
return [remove_dict_key(config_rule, 'position') for config_rule in config_rules]
@staticmethod
def main_pk_name():
return 'config_uuid'
class ConfigRuleModel(Base): # pylint: disable=abstract-method
__tablename__ = 'ConfigRule'
config_rule_uuid = Column(UUID(as_uuid=False), primary_key=True)
config_uuid = Column(UUID(as_uuid=False), ForeignKey("Config.config_uuid", ondelete='CASCADE'), primary_key=True)
action = Column(Enum(ORM_ConfigActionEnum, create_constraint=True, native_enum=True), nullable=False)
position = Column(INTEGER, nullable=False)
key = Column(String, nullable=False)
value = Column(String, nullable=False)
__table_args__ = (
CheckConstraint(position >= 0, name='check_position_value'),
{}
)
# Relationships
config = relationship("ConfigModel", passive_deletes=True, back_populates="config_rule")
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class ConfigRuleCustomModel(Model): # pylint: disable=abstract-method
key = StringField(required=True, allow_empty=False)
value = StringField(required=True, allow_empty=False)
def dump(self) -> Dict: # pylint: disable=arguments-differ
return {'custom': {'resource_key': self.key, 'resource_value': self.value}}
class ConfigRuleAclModel(Model): # pylint: disable=abstract-method
# TODO: improve definition of fields in ConfigRuleAclModel
# To simplify, endpoint encoded as JSON-string directly; otherwise causes circular dependencies
#endpoint_fk = ForeignKeyField(EndPointModel)
endpoint_id = StringField(required=True, allow_empty=False)
# To simplify, ACL rule is encoded as a JSON-string directly
acl_data = StringField(required=True, allow_empty=False)
def dump(self) -> Dict: # pylint: disable=arguments-differ
#json_endpoint_id = EndPointModel(self.database, self.endpoint_fk).dump_id()
json_endpoint_id = json.loads(self.endpoint_id)
json_acl_rule_set = json.loads(self.acl_data)
return {'acl': {'endpoint_id': json_endpoint_id, 'rule_set': json_acl_rule_set}}
# enum values should match name of field in ConfigRuleModel
class ConfigRuleKindEnum(Enum):
CUSTOM = 'custom'
ACL = 'acl'
Union_SpecificConfigRule = Union[
ConfigRuleCustomModel, ConfigRuleAclModel
]
class ConfigRuleModel(Model): # pylint: disable=abstract-method
pk = PrimaryKeyField()
config_fk = ForeignKeyField(ConfigModel)
position = IntegerField(min_value=0, required=True)
action = EnumeratedField(ORM_ConfigActionEnum, required=True)
config_rule_custom_fk = ForeignKeyField(ConfigRuleCustomModel, required=False)
config_rule_acl_fk = ForeignKeyField(ConfigRuleAclModel, required=False)
def delete(self) -> None:
field_name = 'config_rule_{:s}_fk'.format(str(self.kind.value))
specific_fk_value : Optional[ForeignKeyField] = getattr(self, field_name, None)
if specific_fk_value is None:
raise Exception('Unable to find config_rule key for field_name({:s})'.format(field_name))
specific_fk_class = getattr(ConfigRuleModel, field_name, None)
foreign_model_class : Model = specific_fk_class.foreign_model
super().delete()
get_object(self.database, foreign_model_class, str(specific_fk_value)).delete()
def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
field_name = 'config_rule_{:s}_fk'.format(str(self.kind.value))
specific_fk_value : Optional[ForeignKeyField] = getattr(self, field_name, None)
if specific_fk_value is None:
raise Exception('Unable to find config_rule key for field_name({:s})'.format(field_name))
specific_fk_class = getattr(ConfigRuleModel, field_name, None)
foreign_model_class : Model = specific_fk_class.foreign_model
config_rule : Union_SpecificConfigRule = get_object(self.database, foreign_model_class, str(specific_fk_value))
result = config_rule.dump()
result['action'] = self.action.value
if include_position: result['position'] = self.position
return result
@staticmethod
def main_pk_name():
return 'config_rule_uuid'
database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str,
): # -> Tuple[ConfigRuleModel, bool]:
str_rule_key_hash = fast_hasher(resource_key)
str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':')
data = {'config_fk': db_config, 'position': position, 'action': ORM_ConfigActionEnum.SET, 'key': resource_key,
'value': resource_value}
to_add = ConfigRuleModel(**data)
result = database.create_or_update(to_add)
return result
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
Tuple_ConfigRuleSpecs = Tuple[Type, str, Dict, ConfigRuleKindEnum]
def parse_config_rule_custom(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
config_rule_class = ConfigRuleCustomModel
str_config_rule_id = grpc_config_rule.custom.resource_key
config_rule_data = {
'key' : grpc_config_rule.custom.resource_key,
'value': grpc_config_rule.custom.resource_value,
}
return config_rule_class, str_config_rule_id, config_rule_data, ConfigRuleKindEnum.CUSTOM
def parse_config_rule_acl(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
config_rule_class = ConfigRuleAclModel
grpc_endpoint_id = grpc_config_rule.acl.endpoint_id
grpc_rule_set = grpc_config_rule.acl.rule_set
device_uuid = grpc_endpoint_id.device_id.device_uuid.uuid
endpoint_uuid = grpc_endpoint_id.endpoint_uuid.uuid
str_endpoint_key = '/'.join([device_uuid, endpoint_uuid])
#str_endpoint_key, db_endpoint = get_endpoint(database, grpc_endpoint_id)
str_config_rule_id = ':'.join([str_endpoint_key, grpc_rule_set.name])
config_rule_data = {
#'endpoint_fk': db_endpoint,
'endpoint_id': grpc_message_to_json_string(grpc_endpoint_id),
'acl_data': grpc_message_to_json_string(grpc_rule_set),
}
return config_rule_class, str_config_rule_id, config_rule_data, ConfigRuleKindEnum.ACL
CONFIGRULE_PARSERS = {
'custom': parse_config_rule_custom,
'acl' : parse_config_rule_acl,
}
Union_ConfigRuleModel = Union[
ConfigRuleCustomModel, ConfigRuleAclModel,
]
database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule, position : int
) -> Tuple[Union_ConfigRuleModel, bool]:
grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
if parser is None:
raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))
# create specific ConfigRule
config_rule_class, str_config_rule_id, config_rule_data, config_rule_kind = parser(database, grpc_config_rule)
str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')
result : Tuple[Union_ConfigRuleModel, bool] = update_or_create_object(
database, config_rule_class, str_config_rule_key, config_rule_data)
db_specific_config_rule, updated = result
# create generic ConfigRule
config_rule_fk_field_name = 'config_rule_{:s}_fk'.format(config_rule_kind.value)
config_rule_data = {
'config_fk': db_config, 'kind': config_rule_kind, 'position': position,
'action': ORM_ConfigActionEnum.SET,
config_rule_fk_field_name: db_specific_config_rule
}
result : Tuple[ConfigRuleModel, bool] = update_or_create_object(
database, ConfigRuleModel, str_config_rule_key, config_rule_data)
def delete_config_rule(
database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule
grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
if parser is None:
raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))
# delete generic config rules; self deletes specific config rule
_, str_config_rule_id, _, config_rule_kind = parser(database, grpc_config_rule)
str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')
db_config_rule : Optional[ConfigRuleModel] = get_object(
database, ConfigRuleModel, str_config_rule_key, raise_if_not_found=False)
if db_config_rule is None: return
db_config_rule.delete()
def update_config(
database : Database, db_parent_pk : str, config_name : str, grpc_config_rules
) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
str_config_key = key_to_str([config_name, db_parent_pk], separator=':')
result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key)
db_config, created = result
db_objects = [(db_config, created)]
for position,grpc_config_rule in enumerate(grpc_config_rules):
action = grpc_to_enum__config_action(grpc_config_rule.action)
if action == ORM_ConfigActionEnum.SET:
result : Tuple[ConfigRuleModel, bool] = set_config_rule(
db_config_rule, updated = result
db_objects.append((db_config_rule, updated))
elif action == ORM_ConfigActionEnum.DELETE:
delete_config_rule(database, db_config, grpc_config_rule)
msg = 'Unsupported Action({:s}) for ConfigRule({:s})'
str_action = str(ConfigActionEnum.Name(action))
str_config_rule = grpc_message_to_json_string(grpc_config_rule)
raise AttributeError(msg.format(str_action, str_config_rule))