Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy import delete
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from typing import Dict, List, Optional
from common.proto.context_pb2 import Constraint
from common.tools.grpc.Tools import grpc_message_to_json_string
from .models.ConstraintModel import ConstraintKindEnum, ConstraintModel
from .uuids._Builder import get_uuid_random
service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> List[Dict]:
dict_constraints : List[Dict] = list()
for position,constraint in enumerate(constraints):
str_kind = constraint.WhichOneof('constraint')
dict_constraint = {
'constraint_uuid': get_uuid_random(),
'position' : position,
'kind' : ConstraintKindEnum._member_map_.get(str_kind.upper()), # pylint: disable=no-member
'data' : grpc_message_to_json_string(getattr(constraint, str_kind, {})),
}
if service_uuid is not None: dict_constraint['service_uuid'] = service_uuid
if slice_uuid is not None: dict_constraint['slice_uuid' ] = slice_uuid
dict_constraints.append(dict_constraint)
return dict_constraints
def upsert_constraints(
    session : Session, constraints : List[Dict],
    service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> List[bool]:
    """Replace the stored constraints selected by service and/or slice UUID.

    The existing ConstraintModel rows matching the given filters are deleted
    first, then the rows described by `constraints` are inserted in a single
    multi-row INSERT. Nothing is committed here; the statements are only
    executed on `session`.

    Args:
        session: SQLAlchemy session the statements are executed on.
        constraints: one dict of column values per row to insert. May be
            empty, in which case only the delete is performed.
        service_uuid: when not None, restricts the delete to rows with this
            service_uuid.
        slice_uuid: when not None, restricts the delete to rows with this
            slice_uuid.

    Returns:
        The rows returned by the INSERT, each holding (created_at, updated_at)
        of an inserted constraint; an empty list when `constraints` is empty.
        NOTE(review): the `List[bool]` annotation does not match these rows;
        it is kept unchanged for interface compatibility.
    """
    # TODO: do not delete all constraints; just add-remove as needed
    # NOTE(review): when both service_uuid and slice_uuid are None no WHERE
    # clause is added, so this DELETE targets every row of ConstraintModel.
    # Presumably callers always pass at least one of them -- confirm.
    stmt = delete(ConstraintModel)
    if service_uuid is not None:
        stmt = stmt.where(ConstraintModel.service_uuid == service_uuid)
    if slice_uuid is not None:
        stmt = stmt.where(ConstraintModel.slice_uuid == slice_uuid)
    session.execute(stmt)

    # Nothing to insert: stop here. Passing an empty list to `.values()` would
    # not yield a no-op but an INSERT without explicit values, which either
    # fails on required columns or stores a meaningless row.
    if len(constraints) == 0:
        return []

    # No ON CONFLICT clause is attached. A previously drafted (disabled)
    # `on_conflict_do_update` keyed on `configrule_uuid`, which looks copied
    # from the config-rule code -- TODO confirm the proper conflict target
    # before enabling real upsert semantics.
    stmt = insert(ConstraintModel).values(constraints)
    stmt = stmt.returning(ConstraintModel.created_at, ConstraintModel.updated_at)
    constraint_updates = session.execute(stmt).fetchall()
    return constraint_updates
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# def set_constraint(self, db_constraints: ConstraintsModel, grpc_constraint: Constraint, position: int
# ) -> Tuple[Union_ConstraintModel, bool]:
# with self.session() as session:
#
# grpc_constraint_kind = str(grpc_constraint.WhichOneof('constraint'))
#
# parser = CONSTRAINT_PARSERS.get(grpc_constraint_kind)
# if parser is None:
# raise NotImplementedError('Constraint of kind {:s} is not implemented: {:s}'.format(
# grpc_constraint_kind, grpc_message_to_json_string(grpc_constraint)))
#
# # create specific constraint
# constraint_class, str_constraint_id, constraint_data, constraint_kind = parser(grpc_constraint)
# str_constraint_id = str(uuid.uuid4())
# LOGGER.info('str_constraint_id: {}'.format(str_constraint_id))
# # str_constraint_key_hash = fast_hasher(':'.join([constraint_kind.value, str_constraint_id]))
# # str_constraint_key = key_to_str([db_constraints.pk, str_constraint_key_hash], separator=':')
#
# # result : Tuple[Union_ConstraintModel, bool] = update_or_create_object(
# # database, constraint_class, str_constraint_key, constraint_data)
# constraint_data[constraint_class.main_pk_name()] = str_constraint_id
# db_new_constraint = constraint_class(**constraint_data)
# result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
# db_specific_constraint, updated = result
#
# # create generic constraint
# # constraint_fk_field_name = 'constraint_uuid'.format(constraint_kind.value)
# constraint_data = {
# 'constraints_uuid': db_constraints.constraints_uuid, 'position': position, 'kind': constraint_kind
# }
#
# db_new_constraint = ConstraintModel(**constraint_data)
# result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
# db_constraint, updated = result
#
# return db_constraint, updated
#
# def set_constraints(self, service_uuid: str, constraints_name : str, grpc_constraints
# ) -> List[Tuple[Union[ConstraintsModel, ConstraintModel], bool]]:
# with self.session() as session:
# # str_constraints_key = key_to_str([db_parent_pk, constraints_name], separator=':')
# # result : Tuple[ConstraintsModel, bool] = get_or_create_object(database, ConstraintsModel, str_constraints_key)
# result = session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
# created = None
# if result:
# created = True
# session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
# db_constraints = ConstraintsModel(constraints_uuid=service_uuid)
# session.add(db_constraints)
#
# db_objects = [(db_constraints, created)]
#
# for position,grpc_constraint in enumerate(grpc_constraints):
# result : Tuple[ConstraintModel, bool] = self.set_constraint(
# db_constraints, grpc_constraint, position)
# db_constraint, updated = result
# db_objects.append((db_constraint, updated))
#
# return db_objects