# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from common.proto.context_pb2 import ConfigRule
from common.tools.grpc.Tools import grpc_message_to_json_string
from .models.enums.ConfigAction import ORM_ConfigActionEnum, grpc_to_enum__config_action
from .models.ConfigRuleModel import ConfigRuleKindEnum, ConfigRuleModel
from .uuids._Builder import get_uuid_from_string
from .uuids.EndPoint import endpoint_get_uuid
device_uuid : Optional[str] = None, service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> List[Dict]:
dict_config_rules : List[Dict] = list()
for position,config_rule in enumerate(config_rules):
str_kind = config_rule.WhichOneof('config_rule')
kind = ConfigRuleKindEnum._member_map_.get(str_kind.upper()) # pylint: disable=no-member
'position' : position,
'kind' : kind,
'action' : grpc_to_enum__config_action(config_rule.action),
'data' : grpc_message_to_json_string(getattr(config_rule, str_kind, {})),
'created_at': now,
'updated_at': now,
parent_uuid = None
if device_uuid is not None:
dict_config_rule['device_uuid'] = device_uuid
parent_uuid = device_uuid
elif service_uuid is not None:
dict_config_rule['service_uuid'] = service_uuid
parent_uuid = service_uuid
elif slice_uuid is not None:
dict_config_rule['slice_uuid'] = slice_uuid
parent_uuid = slice_uuid
else:
MSG = 'Parent for ConfigRule({:s}) cannot be identified '+\
'(device_uuid={:s}, service_uuid={:s}, slice_uuid={:s})'
str_config_rule = grpc_message_to_json_string(config_rule)
raise Exception(MSG.format(str_config_rule, str(device_uuid), str(service_uuid), str(slice_uuid)))
configrule_name = None
if kind == ConfigRuleKindEnum.CUSTOM:
configrule_name = config_rule.custom.resource_key
elif kind == ConfigRuleKindEnum.ACL:
endpoint_uuid = endpoint_get_uuid(config_rule.acl.endpoint_id, allow_random=False)
rule_set_name = config_rule.acl.rule_set.name
configrule_name = '{:s}/{:s}'.format(endpoint_uuid, rule_set_name)
else:
MSG = 'Name for ConfigRule({:s}) cannot be inferred '+\
'(device_uuid={:s}, service_uuid={:s}, slice_uuid={:s})'
str_config_rule = grpc_message_to_json_string(config_rule)
raise Exception(MSG.format(str_config_rule, str(device_uuid), str(service_uuid), str(slice_uuid)))
configrule_uuid = get_uuid_from_string(configrule_name, prefix_for_name=parent_uuid)
dict_config_rule['configrule_uuid'] = configrule_uuid
dict_config_rules.append(dict_config_rule)
return dict_config_rules
def upsert_config_rules(
    session : Session, config_rules : List[Dict],
    device_uuid : Optional[str] = None, service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None,
) -> bool:
    """Apply a batch of config-rule dicts to the database.

    Rules whose ``'action'`` is ``ORM_ConfigActionEnum.SET`` are inserted, or
    updated in place when a row with the same ``configrule_uuid`` already
    exists. Rules whose ``'action'`` is ``ORM_ConfigActionEnum.DELETE`` are
    removed. The DELETE statement is executed before the upsert. This function
    only executes statements on ``session``; it does not commit.

    Args:
        session: SQLAlchemy session used to execute the statements.
        config_rules: Rule dicts. Each must carry ``'action'`` and
            ``'configrule_uuid'``; for SET rules the whole dict is handed to
            the INSERT as row values.
        device_uuid: Optional parent device; when given, narrows the DELETE.
        service_uuid: Optional parent service; when given, narrows the DELETE.
        slice_uuid: Optional parent slice; when given, narrows the DELETE.

    Returns:
        True when the DELETE removed at least one row, or when at least one
        upserted row comes back with ``updated_at`` later than ``created_at``;
        False otherwise.

    Raises:
        Exception: If a rule carries an action other than SET or DELETE.
    """
    uuids_to_delete : Set[str] = set()
    uuids_to_upsert : Dict[str, int] = dict()   # configrule_uuid -> index in rules_to_upsert
    rules_to_upsert : List[Dict] = list()
    for config_rule in config_rules:
        action = config_rule['action']
        if action == ORM_ConfigActionEnum.SET:
            configrule_uuid = config_rule['configrule_uuid']
            position = uuids_to_upsert.get(configrule_uuid)
            if position is None:
                # First occurrence of this UUID: remember where it is stored.
                uuids_to_upsert[configrule_uuid] = len(rules_to_upsert)
                rules_to_upsert.append(config_rule)
            else:
                # Same UUID seen again: the later rule wins. A single INSERT must
                # not carry two rows hitting the same conflict target.
                rules_to_upsert[position] = config_rule
        elif action == ORM_ConfigActionEnum.DELETE:
            uuids_to_delete.add(config_rule['configrule_uuid'])
        else:
            MSG = 'Action for ConfigRule({:s}) is not supported '+\
                  '(device_uuid={:s}, service_uuid={:s}, slice_uuid={:s})'
            # default=str: rule dicts may hold values json cannot encode natively
            # (e.g. enum members or timestamps); without it a TypeError would
            # replace the intended error below.
            str_config_rule = json.dumps(config_rule, default=str)
            raise Exception(MSG.format(str_config_rule, str(device_uuid), str(service_uuid), str(slice_uuid)))

    # NOTE(review): these look like debugging diagnostics emitted at WARNING
    # level; kept as-is to preserve the log output -- confirm intended level.
    LOGGER.warning('device_uuid={:s}'.format(str(device_uuid)))
    LOGGER.warning('service_uuid={:s}'.format(str(service_uuid)))
    LOGGER.warning('slice_uuid={:s}'.format(str(slice_uuid)))
    LOGGER.warning('uuids_to_delete={:s}'.format(str(uuids_to_delete)))
    LOGGER.warning('rules_to_upsert={:s}'.format(str(rules_to_upsert)))

    delete_affected = False
    upsert_affected = False

    if uuids_to_delete:
        stmt = delete(ConfigRuleModel)
        # Restrict the delete to whichever parent identifiers were supplied.
        if device_uuid  is not None: stmt = stmt.where(ConfigRuleModel.device_uuid  == device_uuid )
        if service_uuid is not None: stmt = stmt.where(ConfigRuleModel.service_uuid == service_uuid)
        if slice_uuid   is not None: stmt = stmt.where(ConfigRuleModel.slice_uuid   == slice_uuid  )
        stmt = stmt.where(ConfigRuleModel.configrule_uuid.in_(uuids_to_delete))
        # Rendered with inlined literals purely for the log line below.
        str_stmt = stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})
        LOGGER.warning('raw delete stmt={:s}'.format(str(str_stmt)))
        configrule_deletes = session.execute(stmt)
        LOGGER.warning('configrule_deletes.rowcount={:s}'.format(str(configrule_deletes.rowcount)))
        delete_affected = int(configrule_deletes.rowcount) > 0

    if rules_to_upsert:
        stmt = insert(ConfigRuleModel).values(rules_to_upsert)
        # On a configrule_uuid conflict only these columns are overwritten;
        # created_at is not in the set, so the stored value is kept.
        stmt = stmt.on_conflict_do_update(
            index_elements=[ConfigRuleModel.configrule_uuid],
            set_=dict(
                position   = stmt.excluded.position,
                action     = stmt.excluded.action,
                data       = stmt.excluded.data,
                updated_at = stmt.excluded.updated_at,
            )
        )
        stmt = stmt.returning(ConfigRuleModel.created_at, ConfigRuleModel.updated_at)
        # Execute the upsert and collect the (created_at, updated_at) pairs.
        configrule_updates = session.execute(stmt).fetchall()
        # A row counts as affected when its updated_at is strictly later than
        # its created_at. NOTE(review): a freshly inserted row whose two
        # timestamps are equal is therefore not counted -- confirm intended.
        upsert_affected = any(updated_at > created_at for created_at, updated_at in configrule_updates)

    return delete_affected or upsert_affected