# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import logging from typing import Dict, List, Optional from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy_cockroachdb import run_transaction from common.method_wrappers.ServiceExceptions import NotFoundException from common.message_broker.MessageBroker import MessageBroker from common.proto.qkd_app_pb2 import AppList, App, AppId from qkd_app.service.database.uuids._Builder import get_uuid_from_string, get_uuid_random from common.method_wrappers.ServiceExceptions import InvalidArgumentsException from common.tools.object_factory.QKDApp import json_app_id from common.tools.object_factory.Context import json_context_id from .models.QKDAppModel import AppModel from .uuids.QKDApp import app_get_uuid from context.service.database.uuids.Context import context_get_uuid from .models.enums.QKDAppStatus import grpc_to_enum__qkd_app_status from .models.enums.QKDAppTypes import grpc_to_enum__qkd_app_types LOGGER = logging.getLogger(__name__) def app_list_objs(db_engine: Engine, context_uuid: str = None) -> AppList: """ Fetches a list of all QKD applications from the database. Optionally filters by context UUID. :param db_engine: SQLAlchemy Engine for DB connection :param context_uuid: UUID of the context to filter by (optional) :return: AppList containing all apps """ def callback(session: Session) -> List[Dict]: query = session.query(AppModel) if context_uuid: query = query.filter_by(context_uuid=context_uuid) return [obj.dump() for obj in query.all()] apps = run_transaction(sessionmaker(bind=db_engine), callback) return AppList(apps=apps) def app_get(db_engine: Engine, request: AppId) -> App: """ Fetches a specific app by its UUID. :param db_engine: SQLAlchemy Engine for DB connection :param request: AppId protobuf containing app ID and context ID :return: App protobuf object :raises NotFoundException: If the app is not found in the database """ app_uuid = app_get_uuid(request, allow_random=False) def callback(session: Session) -> Optional[Dict]: obj = session.query(AppModel).filter_by(app_uuid=app_uuid).one_or_none() return obj.dump() if obj else None obj = run_transaction(sessionmaker(bind=db_engine), callback) if not obj: raise NotFoundException('App', request.app_uuid.uuid, extra_details=[ f'app_uuid generated was: {app_uuid}' ]) return App(**obj) def app_set(db_engine: Engine, messagebroker: MessageBroker, request: App) -> AppId: """ Creates or updates an app in the database. If the app already exists, updates the app. Otherwise, inserts a new entry. :param db_engine: SQLAlchemy Engine for DB connection :param messagebroker: MessageBroker instance for notifications :param request: App protobuf object containing app data :return: AppId protobuf object representing the newly created or updated app """ context_uuid = context_get_uuid(request.app_id.context_id, allow_random=False) app_uuid = app_get_uuid(request.app_id, allow_random=True) # Prepare app data for insertion/update app_data = { 'context_uuid': context_uuid, 'app_uuid': app_uuid, 'app_status': grpc_to_enum__qkd_app_status(request.app_status), 'app_type': grpc_to_enum__qkd_app_types(request.app_type), 'server_app_id': request.server_app_id, 'client_app_id': request.client_app_id, 'backing_qkdl_uuid': [qkdl.qkdl_uuid.uuid for qkdl in request.backing_qkdl_id], 'local_device_uuid': request.local_device_id.device_uuid.uuid, 'remote_device_uuid': request.remote_device_id.device_uuid.uuid if request.remote_device_id.device_uuid.uuid else None, 'created_at': datetime.datetime.utcnow(), 'updated_at': datetime.datetime.utcnow(), } def callback(session: Session) -> bool: # Create the insert statement stmt = insert(AppModel).values(app_data) # Apply the conflict resolution stmt = stmt.on_conflict_do_update( index_elements=[AppModel.app_uuid], set_=dict( app_status=stmt.excluded.app_status, app_type=stmt.excluded.app_type, server_app_id=stmt.excluded.server_app_id, client_app_id=stmt.excluded.client_app_id, backing_qkdl_uuid=stmt.excluded.backing_qkdl_uuid, local_device_uuid=stmt.excluded.local_device_uuid, remote_device_uuid=stmt.excluded.remote_device_uuid, updated_at=stmt.excluded.updated_at ) ) session.execute(stmt) return True run_transaction(sessionmaker(bind=db_engine), callback) app_id = json_app_id(app_uuid, context_id=json_context_id(context_uuid)) return AppId(**app_id) def app_get_by_server(db_engine: Engine, server_app_id: str) -> App: """ Fetches an app by its server_app_id. """ def callback(session: Session) -> Optional[Dict]: obj = session.query(AppModel).filter_by(server_app_id=server_app_id).one_or_none() return obj.dump() if obj else None obj = run_transaction(sessionmaker(bind=db_engine), callback) if not obj: raise NotFoundException('App', server_app_id) return App(**obj) def app_delete(db_engine: Engine, app_uuid: str) -> None: """ Deletes an app by its UUID from the database. :param db_engine: SQLAlchemy Engine for DB connection :param app_uuid: The UUID of the app to be deleted """ def callback(session: Session) -> bool: app_obj = session.query(AppModel).filter_by(app_uuid=app_uuid).one_or_none() if app_obj is None: raise NotFoundException('App', app_uuid) session.delete(app_obj) return True run_transaction(sessionmaker(bind=db_engine), callback)