# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime, logging
from sqlalchemy import delete
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from typing import Dict, List, Optional
from common.proto.context_pb2 import Constraint
from common.tools.grpc.Tools import grpc_message_to_json_string
from .models.ConstraintModel import ConstraintKindEnum, ConstraintModel
from .uuids._Builder import get_uuid_random

LOGGER = logging.getLogger(__name__)

def compose_constraints_data(
    constraints : List[Constraint], now : datetime.datetime,
    service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> List[Dict]:
    """Build one row dictionary per constraint, preserving input order.

    Each dictionary holds:
      - 'constraint_uuid': the value returned by get_uuid_random()
      - 'position'       : index of the constraint within `constraints`
      - 'kind'           : ConstraintKindEnum member looked up by the upper-cased
                           name of the populated field of the 'constraint' oneof
      - 'data'           : that field, serialized by grpc_message_to_json_string()
      - 'created_at' / 'updated_at': both set to `now`
    'service_uuid' and 'slice_uuid' keys are added only when the corresponding
    argument is not None.

    Args:
        constraints:  constraint messages to convert.
        now:          timestamp stored as both creation and update time.
        service_uuid: optional owner service identifier copied into every row.
        slice_uuid:   optional owner slice identifier copied into every row.

    Returns:
        A list with one dictionary per input constraint; empty for empty input.

    Raises:
        ValueError: if a constraint has no field of its 'constraint' oneof set.
    """
    dict_constraints : List[Dict] = []
    for position, constraint in enumerate(constraints):
        str_kind = constraint.WhichOneof('constraint')
        if str_kind is None:
            # WhichOneof() yields None when the oneof is unset; report it explicitly
            # rather than failing with an AttributeError on None.upper() below.
            raise ValueError(
                'Constraint at position {:d} has no constraint kind set'.format(position))
        dict_constraint = {
            'constraint_uuid': get_uuid_random(),
            'position'       : position,
            # NOTE(review): .get() yields None when no enum member matches the kind name;
            # confirm whether a None kind is acceptable downstream.
            'kind'           : ConstraintKindEnum._member_map_.get(str_kind.upper()), # pylint: disable=no-member
            'data'           : grpc_message_to_json_string(getattr(constraint, str_kind, {})),
            'created_at'     : now,
            'updated_at'     : now,
        }
        if service_uuid is not None: dict_constraint['service_uuid'] = service_uuid
        if slice_uuid   is not None: dict_constraint['slice_uuid'  ] = slice_uuid
        dict_constraints.append(dict_constraint)
    return dict_constraints

def upsert_constraints(
    session : Session, constraints : List[Dict],
    service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> List:
    """Replace the stored constraints of a service and/or slice with `constraints`.

    The ConstraintModel rows matching the given owner identifier(s) are deleted,
    then the provided row dictionaries are inserted with a single INSERT.

    Args:
        session:      SQLAlchemy session used to execute both statements.
        constraints:  row dictionaries to insert (e.g. as produced by
                      compose_constraints_data()).
        service_uuid: when not None, restricts the DELETE to this service.
        slice_uuid:   when not None, restricts the DELETE to this slice.

    Returns:
        The rows returned by the INSERT, each holding (created_at, updated_at);
        an empty list when `constraints` is empty.

    Raises:
        ValueError: if neither service_uuid nor slice_uuid is given. Without any
            owner filter the DELETE would carry no WHERE clause and remove the
            constraints of every service and slice.
    """
    if service_uuid is None and slice_uuid is None:
        raise ValueError('Either service_uuid or slice_uuid must be specified')

    # TODO: do not delete all constraints; just add-remove as needed
    stmt = delete(ConstraintModel)
    if service_uuid is not None: stmt = stmt.where(ConstraintModel.service_uuid == service_uuid)
    if slice_uuid   is not None: stmt = stmt.where(ConstraintModel.slice_uuid   == slice_uuid  )
    session.execute(stmt)

    if not constraints:
        return []

    # Plain INSERT (no ON CONFLICT clause): the owner's previous rows were deleted above.
    stmt = insert(ConstraintModel).values(constraints)
    stmt = stmt.returning(ConstraintModel.created_at, ConstraintModel.updated_at)
    return session.execute(stmt).fetchall()

#    def set_constraint(self, db_constraints: ConstraintsModel, grpc_constraint: Constraint, position: int
#    ) -> Tuple[Union_ConstraintModel, bool]:
#        with self.session() as session:
#
#            grpc_constraint_kind = str(grpc_constraint.WhichOneof('constraint'))
#
#            parser = CONSTRAINT_PARSERS.get(grpc_constraint_kind)
#            if parser is None:
#                raise NotImplementedError('Constraint of kind {:s} is not implemented: {:s}'.format(
#                    grpc_constraint_kind, grpc_message_to_json_string(grpc_constraint)))
#
#            # create specific constraint
#            constraint_class, str_constraint_id, constraint_data, constraint_kind = parser(grpc_constraint)
#            str_constraint_id = str(uuid.uuid4())
#            LOGGER.info('str_constraint_id: {}'.format(str_constraint_id))
#            # str_constraint_key_hash = fast_hasher(':'.join([constraint_kind.value, str_constraint_id]))
#            # str_constraint_key = key_to_str([db_constraints.pk, str_constraint_key_hash], separator=':')
#
#            # result : Tuple[Union_ConstraintModel, bool] = update_or_create_object(
#            #     database, constraint_class, str_constraint_key, constraint_data)
#            constraint_data[constraint_class.main_pk_name()] = str_constraint_id
#            db_new_constraint = constraint_class(**constraint_data)
#            result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
#            db_specific_constraint, updated = result
#
#            # create generic constraint
#            # constraint_fk_field_name = 'constraint_uuid'.format(constraint_kind.value)
#            constraint_data = {
#                'constraints_uuid': db_constraints.constraints_uuid, 'position': position, 'kind': constraint_kind
#            }
#
#            db_new_constraint = ConstraintModel(**constraint_data)
#            result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
#            db_constraint, updated = result
#
#            return db_constraint, updated
#
#    def set_constraints(self, service_uuid: str, constraints_name : str, grpc_constraints
#    ) -> List[Tuple[Union[ConstraintsModel, ConstraintModel], bool]]:
#        with self.session() as session:
#            # str_constraints_key = key_to_str([db_parent_pk, constraints_name], separator=':')
#            # result : Tuple[ConstraintsModel, bool] = get_or_create_object(database, ConstraintsModel, str_constraints_key)
#            result = session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
#            created = None
#            if result:
#                created = True
#            session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
#            db_constraints = ConstraintsModel(constraints_uuid=service_uuid)
#            session.add(db_constraints)
#
#            db_objects = [(db_constraints, created)]
#
#            for position,grpc_constraint in enumerate(grpc_constraints):
#                result : Tuple[ConstraintModel, bool] = self.set_constraint(
#                    db_constraints, grpc_constraint, position)
#                db_constraint, updated = result
#                db_objects.append((db_constraint, updated))
#
#            return db_objects