Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy import delete
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from typing import Dict, List, Optional
from common.proto.context_pb2 import Constraint
from common.tools.grpc.Tools import grpc_message_to_json_string
from .models.ConstraintModel import ConstraintKindEnum, ConstraintModel
from .uuids._Builder import get_uuid_random
def compose_constraints_data(
    constraints : List[Constraint],
    service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> List[Dict]:
    """Turn gRPC Constraint messages into row dictionaries for ConstraintModel.

    One dictionary is produced per constraint, in input order, holding:
      * 'constraint_uuid': a fresh value from get_uuid_random();
      * 'position'       : the index of the constraint in `constraints`;
      * 'kind'           : the ConstraintKindEnum member whose name equals the
                           upper-cased name returned by WhichOneof('constraint'),
                           or None when the enum has no such member;
      * 'data'           : grpc_message_to_json_string() applied to the attribute
                           of the constraint carrying that name.

    'service_uuid' and 'slice_uuid' keys are appended only for the arguments
    that are not None.

    NOTE(review): WhichOneof() presumably returns None when no field of the
    oneof is set, in which case `.upper()` raises AttributeError -- confirm
    callers never pass such a constraint.
    """
    # Owner columns are identical for every row, so compute them only once.
    owner_fields : Dict = dict()
    if service_uuid is not None:
        owner_fields['service_uuid'] = service_uuid
    if slice_uuid is not None:
        owner_fields['slice_uuid'] = slice_uuid

    rows : List[Dict] = []
    for index, grpc_constraint in enumerate(constraints):
        field_name = grpc_constraint.WhichOneof('constraint')
        row_uuid = get_uuid_random()
        kind = ConstraintKindEnum._member_map_.get(field_name.upper()) # pylint: disable=no-member
        payload = grpc_message_to_json_string(getattr(grpc_constraint, field_name, {}))
        rows.append({
            'constraint_uuid': row_uuid,
            'position'       : index,
            'kind'           : kind,
            'data'           : payload,
            **owner_fields,
        })
    return rows
def upsert_constraints(
    session : Session, constraints : List[Dict],
    service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> None:
    """Replace the constraints stored for a service and/or slice.

    Rows of ConstraintModel matching the given owner are deleted first, then
    `constraints` (row dictionaries, e.g. as built by compose_constraints_data)
    are inserted with a single INSERT statement.

    Parameters:
      session      : SQLAlchemy session used to execute both statements.
      constraints  : row dictionaries to insert; nothing is inserted when empty.
      service_uuid : when not None, restricts the delete to this service.
      slice_uuid   : when not None, restricts the delete to this slice.

    When both identifiers are given, the two filters are combined, so only
    rows matching both are removed. When neither is given, no delete is issued.
    """
    has_owner_filter = (service_uuid is not None) or (slice_uuid is not None)

    # Without any owner filter the DELETE would carry no WHERE clause and wipe
    # the constraints of every service and slice; skip it in that case.
    if has_owner_filter:
        stmt = delete(ConstraintModel)
        if service_uuid is not None:
            stmt = stmt.where(ConstraintModel.service_uuid == service_uuid)
        if slice_uuid is not None:
            stmt = stmt.where(ConstraintModel.slice_uuid == slice_uuid)
        session.execute(stmt)

    # An INSERT with an empty values list is not valid, so skip it.
    if constraints:
        session.execute(insert(ConstraintModel).values(constraints))
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# def set_constraint(self, db_constraints: ConstraintsModel, grpc_constraint: Constraint, position: int
# ) -> Tuple[Union_ConstraintModel, bool]:
# with self.session() as session:
#
# grpc_constraint_kind = str(grpc_constraint.WhichOneof('constraint'))
#
# parser = CONSTRAINT_PARSERS.get(grpc_constraint_kind)
# if parser is None:
# raise NotImplementedError('Constraint of kind {:s} is not implemented: {:s}'.format(
# grpc_constraint_kind, grpc_message_to_json_string(grpc_constraint)))
#
# # create specific constraint
# constraint_class, str_constraint_id, constraint_data, constraint_kind = parser(grpc_constraint)
# str_constraint_id = str(uuid.uuid4())
# LOGGER.info('str_constraint_id: {}'.format(str_constraint_id))
# # str_constraint_key_hash = fast_hasher(':'.join([constraint_kind.value, str_constraint_id]))
# # str_constraint_key = key_to_str([db_constraints.pk, str_constraint_key_hash], separator=':')
#
# # result : Tuple[Union_ConstraintModel, bool] = update_or_create_object(
# # database, constraint_class, str_constraint_key, constraint_data)
# constraint_data[constraint_class.main_pk_name()] = str_constraint_id
# db_new_constraint = constraint_class(**constraint_data)
# result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
# db_specific_constraint, updated = result
#
# # create generic constraint
# # constraint_fk_field_name = 'constraint_uuid'.format(constraint_kind.value)
# constraint_data = {
# 'constraints_uuid': db_constraints.constraints_uuid, 'position': position, 'kind': constraint_kind
# }
#
# db_new_constraint = ConstraintModel(**constraint_data)
# result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
# db_constraint, updated = result
#
# return db_constraint, updated
#
# def set_constraints(self, service_uuid: str, constraints_name : str, grpc_constraints
# ) -> List[Tuple[Union[ConstraintsModel, ConstraintModel], bool]]:
# with self.session() as session:
# # str_constraints_key = key_to_str([db_parent_pk, constraints_name], separator=':')
# # result : Tuple[ConstraintsModel, bool] = get_or_create_object(database, ConstraintsModel, str_constraints_key)
# result = session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
# created = None
# if result:
# created = True
# session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
# db_constraints = ConstraintsModel(constraints_uuid=service_uuid)
# session.add(db_constraints)
#
# db_objects = [(db_constraints, created)]
#
# for position,grpc_constraint in enumerate(grpc_constraints):
# result : Tuple[ConstraintModel, bool] = self.set_constraint(
# db_constraints, grpc_constraint, position)
# db_constraint, updated = result
# db_objects.append((db_constraint, updated))
#
# return db_objects