# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy import delete
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from typing import Dict, List, Optional
from common.proto.context_pb2 import Constraint
from common.tools.grpc.Tools import grpc_message_to_json_string
from .models.ConstraintModel import ConstraintKindEnum, ConstraintModel
from .uuids._Builder import get_uuid_random

def compose_constraints_data(
    constraints : List[Constraint],
    service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> List[Dict]:
    """Convert gRPC constraints into row dictionaries for ConstraintModel.

    Each returned dictionary carries:
      - 'constraint_uuid': the value returned by get_uuid_random();
      - 'position'       : index of the constraint within `constraints`;
      - 'kind'           : ConstraintKindEnum member whose name is the upper-cased
                           name of the populated 'constraint' oneof field, or None
                           when the enum has no member with that name;
      - 'data'           : the populated oneof field serialised through
                           grpc_message_to_json_string();
      - 'service_uuid' / 'slice_uuid': only present when the corresponding
                           argument is not None.

    Args:
        constraints: gRPC Constraint messages, in the order to be preserved.
        service_uuid: optional UUID of the owning service.
        slice_uuid: optional UUID of the owning slice.

    Returns:
        One dictionary per constraint, in input order.

    Raises:
        ValueError: if a constraint has no field set in its 'constraint' oneof.
    """
    dict_constraints : List[Dict] = list()
    for position,constraint in enumerate(constraints):
        str_kind = constraint.WhichOneof('constraint')
        if str_kind is None:
            # WhichOneof() returns None when no oneof field is populated; fail
            # with a clear message instead of an AttributeError on None.upper().
            raise ValueError('Constraint at position {:d} has no kind set: {:s}'.format(
                position, grpc_message_to_json_string(constraint)))
        dict_constraint = {
            'constraint_uuid': get_uuid_random(),
            'position'       : position,
            'kind'           : ConstraintKindEnum._member_map_.get(str_kind.upper()), # pylint: disable=no-member
            'data'           : grpc_message_to_json_string(getattr(constraint, str_kind, {})),
        }
        if service_uuid is not None: dict_constraint['service_uuid'] = service_uuid
        if slice_uuid   is not None: dict_constraint['slice_uuid'  ] = slice_uuid
        dict_constraints.append(dict_constraint)
    return dict_constraints

def upsert_constraints(
    session : Session, constraints : List[Dict],
    service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> None:
    """Replace the constraints stored for a service and/or slice.

    Deletes the ConstraintModel rows matching the given `service_uuid` and/or
    `slice_uuid` (both filters are applied when both are given) and then
    inserts `constraints`, if any. Both statements are executed on `session`;
    this function does not commit.

    Args:
        session: SQLAlchemy session used to execute the statements.
        constraints: row dictionaries to insert (e.g. as produced by
            compose_constraints_data()). May be empty, in which case the
            matching rows are only deleted.
        service_uuid: optional UUID of the owning service.
        slice_uuid: optional UUID of the owning slice.

    Raises:
        ValueError: if neither `service_uuid` nor `slice_uuid` is provided.
    """
    if service_uuid is None and slice_uuid is None:
        # Without any filter the DELETE below would have no WHERE clause and
        # would wipe every constraint in the table.
        raise ValueError('Either service_uuid or slice_uuid must be provided')

    stmt = delete(ConstraintModel)
    if service_uuid is not None: stmt = stmt.where(ConstraintModel.service_uuid == service_uuid)
    if slice_uuid   is not None: stmt = stmt.where(ConstraintModel.slice_uuid   == slice_uuid  )
    session.execute(stmt)
    if constraints:
        session.execute(insert(ConstraintModel).values(constraints))

# NOTE(review): the commented-out methods below look like a previous implementation,
# presumably superseded by the functions above -- confirm before removing.
#
#    def set_constraint(self, db_constraints: ConstraintsModel, grpc_constraint: Constraint, position: int
#    ) -> Tuple[Union_ConstraintModel, bool]:
#        with self.session() as session:
#
#            grpc_constraint_kind = str(grpc_constraint.WhichOneof('constraint'))
#
#            parser = CONSTRAINT_PARSERS.get(grpc_constraint_kind)
#            if parser is None:
#                raise NotImplementedError('Constraint of kind {:s} is not implemented: {:s}'.format(
#                    grpc_constraint_kind, grpc_message_to_json_string(grpc_constraint)))
#
#            # create specific constraint
#            constraint_class, str_constraint_id, constraint_data, constraint_kind = parser(grpc_constraint)
#            str_constraint_id = str(uuid.uuid4())
#            LOGGER.info('str_constraint_id: {}'.format(str_constraint_id))
#            # str_constraint_key_hash = fast_hasher(':'.join([constraint_kind.value, str_constraint_id]))
#            # str_constraint_key = key_to_str([db_constraints.pk, str_constraint_key_hash], separator=':')
#
#            # result : Tuple[Union_ConstraintModel, bool] = update_or_create_object(
#            #     database, constraint_class, str_constraint_key, constraint_data)
#            constraint_data[constraint_class.main_pk_name()] = str_constraint_id
#            db_new_constraint = constraint_class(**constraint_data)
#            result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
#            db_specific_constraint, updated = result
#
#            # create generic constraint
#            # constraint_fk_field_name = 'constraint_uuid'.format(constraint_kind.value)
#            constraint_data = {
#                'constraints_uuid': db_constraints.constraints_uuid, 'position': position, 'kind': constraint_kind
#            }
#
#            db_new_constraint = ConstraintModel(**constraint_data)
#            result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
#            db_constraint, updated = result
#
#            return db_constraint, updated
#
#    def set_constraints(self, service_uuid: str, constraints_name : str, grpc_constraints
#    ) -> List[Tuple[Union[ConstraintsModel, ConstraintModel], bool]]:
#        with self.session() as session:
#            # str_constraints_key = key_to_str([db_parent_pk, constraints_name], separator=':')
#            # result : Tuple[ConstraintsModel, bool] = get_or_create_object(database, ConstraintsModel, str_constraints_key)
#            result = session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
#            created = None
#            if result:
#                created = True
#            session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
#            db_constraints = ConstraintsModel(constraints_uuid=service_uuid)
#            session.add(db_constraints)
#
#            db_objects = [(db_constraints, created)]
#
#            for position,grpc_constraint in enumerate(grpc_constraints):
#                result : Tuple[ConstraintModel, bool] = self.set_constraint(
#                    db_constraints, grpc_constraint, position)
#                db_constraint, updated = result
#                db_objects.append((db_constraint, updated))
#
#            return db_objects