# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import ContextId, Service, ServiceId, ServiceIdList, ServiceList
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Service import json_service_id
from context.service.database.ConfigRule import compose_config_rules_data, upsert_config_rules
from context.service.database.Constraint import compose_constraints_data, upsert_constraints
from .models.enums.ServiceStatus import grpc_to_enum__service_status
from .models.enums.ServiceType import grpc_to_enum__service_type
from .models.RelationModels import ServiceEndPointModel
from .models.ServiceModel import ServiceModel
from .uuids.Context import context_get_uuid
from .uuids.EndPoint import endpoint_get_uuid
from .uuids.Service import service_get_uuid


def service_list_ids(db_engine : Engine, request : ContextId) -> ServiceIdList:
    """Return the identifiers of every service stored for a context.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: identifier of the context whose services are listed.

    Returns:
        A ServiceIdList built from ``dump_id()`` of each ServiceModel row whose
        ``context_uuid`` matches the UUID resolved by ``context_get_uuid``.
    """
    context_uuid = context_get_uuid(request, allow_random=False)

    def callback(session : Session) -> List[Dict]:
        obj_list : List[ServiceModel] = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all()
        return [obj.dump_id() for obj in obj_list]

    return ServiceIdList(service_ids=run_transaction(sessionmaker(bind=db_engine), callback))


def service_list_objs(db_engine : Engine, request : ContextId) -> ServiceList:
    """Return every service stored for a context.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: identifier of the context whose services are listed.

    Returns:
        A ServiceList built from ``dump()`` of each ServiceModel row whose
        ``context_uuid`` matches the UUID resolved by ``context_get_uuid``.
    """
    context_uuid = context_get_uuid(request, allow_random=False)

    def callback(session : Session) -> List[Dict]:
        obj_list : List[ServiceModel] = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all()
        return [obj.dump() for obj in obj_list]

    return ServiceList(services=run_transaction(sessionmaker(bind=db_engine), callback))


def service_get(db_engine : Engine, request : ServiceId) -> Service:
    """Retrieve a single service by its identifier.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: identifier of the service to retrieve.

    Returns:
        The Service message built from ``dump()`` of the matching row.

    Raises:
        NotFoundException: no ServiceModel row has the resolved service UUID.
            The exception carries the raw ``context/service`` identifier from
            the request plus the UUIDs resolved for context and service.
    """
    _,service_uuid = service_get_uuid(request, allow_random=False)

    def callback(session : Session) -> Optional[Dict]:
        obj : Optional[ServiceModel] = session.query(ServiceModel).filter_by(service_uuid=service_uuid).one_or_none()
        return None if obj is None else obj.dump()

    obj = run_transaction(sessionmaker(bind=db_engine), callback)
    if obj is None:
        # The context UUID is only resolved here because it is needed solely for the error details.
        context_uuid = context_get_uuid(request.context_id, allow_random=False)
        raw_service_uuid = '{:s}/{:s}'.format(request.context_id.context_uuid.uuid, request.service_uuid.uuid)
        raise NotFoundException('Service', raw_service_uuid, extra_details=[
            'context_uuid generated was: {:s}'.format(context_uuid),
            'service_uuid generated was: {:s}'.format(service_uuid),
        ])
    return Service(**obj)


def service_set(db_engine : Engine, request : Service) -> Tuple[ServiceId, bool]:
    """Create or update a service together with its endpoints, constraints and config rules.

    The service row is upserted on ``service_uuid``; on conflict its name, type
    and status are overwritten. Service/endpoint relations are inserted and
    existing pairs are left untouched. Constraints and config rules are
    delegated to ``upsert_constraints`` and ``upsert_config_rules``. Everything
    runs inside one ``run_transaction`` callback.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: the Service message to store. When ``request.name`` is empty,
            the raw service UUID from the request is used as the service name.

    Returns:
        A tuple ``(service_id, updated)``. ``updated`` is currently always
        ``False`` (creation vs. update is not detected yet).

    Raises:
        InvalidArgumentException: an endpoint carries a non-empty context UUID
            that is neither the raw context UUID of the request nor the
            resolved context UUID.
    """
    raw_context_uuid = request.service_id.context_id.context_uuid.uuid
    raw_service_uuid = request.service_id.service_uuid.uuid
    raw_service_name = request.name
    service_name = raw_service_name if raw_service_name else raw_service_uuid
    context_uuid,service_uuid = service_get_uuid(request.service_id, service_name=service_name, allow_random=True)

    service_type = grpc_to_enum__service_type(request.service_type)
    service_status = grpc_to_enum__service_status(request.service_status.service_status)

    service_endpoints_data : List[Dict] = []
    for i,endpoint_id in enumerate(request.service_endpoint_ids):
        endpoint_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
        # An endpoint without explicit context is taken to belong to the service's context.
        if not endpoint_context_uuid: endpoint_context_uuid = context_uuid
        if endpoint_context_uuid not in {raw_context_uuid, context_uuid}:
            raise InvalidArgumentException(
                'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
                endpoint_context_uuid,
                ['should be == request.service_id.context_id.context_uuid.uuid({:s})'.format(raw_context_uuid)])

        _, _, endpoint_uuid = endpoint_get_uuid(endpoint_id, allow_random=False)
        service_endpoints_data.append({
            'service_uuid' : service_uuid,
            'endpoint_uuid': endpoint_uuid,
        })

    constraints = compose_constraints_data(request.service_constraints, service_uuid=service_uuid)
    config_rules = compose_config_rules_data(request.service_config.config_rules, service_uuid=service_uuid)

    service_data = [{
        'context_uuid'  : context_uuid,
        'service_uuid'  : service_uuid,
        'service_name'  : service_name,
        'service_type'  : service_type,
        'service_status': service_status,
    }]

    def callback(session : Session) -> None:
        stmt = insert(ServiceModel).values(service_data)
        stmt = stmt.on_conflict_do_update(
            index_elements=[ServiceModel.service_uuid],
            set_=dict(
                service_name   = stmt.excluded.service_name,
                service_type   = stmt.excluded.service_type,
                service_status = stmt.excluded.service_status,
            )
        )
        session.execute(stmt)

        # A service may legitimately have no endpoints. Passing an empty list to
        # values() does not yield a no-op multi-row INSERT, so skip the statement.
        if service_endpoints_data:
            stmt = insert(ServiceEndPointModel).values(service_endpoints_data)
            stmt = stmt.on_conflict_do_nothing(
                index_elements=[ServiceEndPointModel.service_uuid, ServiceEndPointModel.endpoint_uuid]
            )
            session.execute(stmt)

        upsert_constraints(session, constraints, service_uuid=service_uuid)
        upsert_config_rules(session, config_rules, service_uuid=service_uuid)

    run_transaction(sessionmaker(bind=db_engine), callback)
    updated = False # TODO: improve and check if created/updated
    return ServiceId(**json_service_id(service_uuid, json_context_id(context_uuid))),updated


def service_delete(db_engine : Engine, request : ServiceId) -> bool:
    """Delete a service by its identifier.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: identifier of the service to delete.

    Returns:
        ``True`` when at least one ServiceModel row was deleted, ``False`` otherwise.
    """
    _,service_uuid = service_get_uuid(request, allow_random=False)

    def callback(session : Session) -> bool:
        num_deleted = session.query(ServiceModel).filter_by(service_uuid=service_uuid).delete()
        return num_deleted > 0

    return run_transaction(sessionmaker(bind=db_engine), callback)