Newer
Older
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
Alberto Gonzalez Barneo
committed
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Alberto Gonzalez Barneo
committed
import datetime
import logging
from typing import Dict, List, Optional
Alberto Gonzalez Barneo
committed
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine
Alberto Gonzalez Barneo
committed
from sqlalchemy.orm import Session, sessionmaker
Alberto Gonzalez Barneo
committed
from sqlalchemy_cockroachdb import run_transaction
Alberto Gonzalez Barneo
committed
from common.method_wrappers.ServiceExceptions import NotFoundException
Alberto Gonzalez Barneo
committed
from common.message_broker.MessageBroker import MessageBroker
Alberto Gonzalez Barneo
committed
from common.proto.qkd_app_pb2 import AppList, App, AppId
from qkd_app.service.database.uuids._Builder import get_uuid_from_string, get_uuid_random
from common.method_wrappers.ServiceExceptions import InvalidArgumentsException
from common.tools.object_factory.QKDApp import json_app_id
from common.tools.object_factory.Context import json_context_id
Alberto Gonzalez Barneo
committed
from .models.QKDAppModel import AppModel
from .uuids.QKDApp import app_get_uuid
Alberto Gonzalez Barneo
committed
from context.service.database.uuids.Context import context_get_uuid
Alberto Gonzalez Barneo
committed
from .models.enums.QKDAppStatus import grpc_to_enum__qkd_app_status
from .models.enums.QKDAppTypes import grpc_to_enum__qkd_app_types
Alberto Gonzalez Barneo
committed
Alberto Gonzalez Barneo
committed
LOGGER = logging.getLogger(__name__)
Alberto Gonzalez Barneo
committed
Alberto Gonzalez Barneo
committed
def app_list_objs(db_engine: Engine, context_uuid: str = None) -> AppList:
    """
    Return every stored app as an AppList, optionally restricted to one context.

    :param db_engine: SQLAlchemy Engine the session is bound to
    :param context_uuid: when truthy, only rows whose context_uuid matches are returned
    :return: AppList built from the dump() of each matching AppModel row
    """
    def _collect(session: Session) -> List[Dict]:
        base_query = session.query(AppModel)
        # A falsy context_uuid (None or empty string) means "no filtering".
        selected = base_query.filter_by(context_uuid=context_uuid) if context_uuid else base_query
        dumped_rows = []
        for row in selected.all():
            dumped_rows.append(row.dump())
        return dumped_rows

    session_factory = sessionmaker(bind=db_engine)
    return AppList(apps=run_transaction(session_factory, _collect))
Alberto Gonzalez Barneo
committed
def app_get(db_engine: Engine, request: AppId) -> App:
    """
    Look up a single app by the UUID resolved from the given AppId.

    :param db_engine: SQLAlchemy Engine the session is bound to
    :param request: AppId protobuf identifying the app
    :return: App protobuf built from the stored row's dump()
    :raises NotFoundException: when no row matches the resolved app UUID
    """
    # allow_random=False: a lookup must never invent a fresh UUID.
    app_uuid = app_get_uuid(request, allow_random=False)

    def _fetch(session: Session) -> Optional[Dict]:
        row = session.query(AppModel).filter_by(app_uuid=app_uuid).one_or_none()
        if not row:
            return None
        return row.dump()

    session_factory = sessionmaker(bind=db_engine)
    dumped = run_transaction(session_factory, _fetch)

    if dumped:
        return App(**dumped)

    # Include the resolved UUID so a mismatch with the requested one is visible.
    details = [f'app_uuid generated was: {app_uuid}']
    raise NotFoundException('App', request.app_uuid.uuid, extra_details=details)
Alberto Gonzalez Barneo
committed
def app_set(db_engine: Engine, messagebroker: MessageBroker, request: App) -> AppId:
    """
    Creates or updates an app in the database (upsert keyed on app_uuid).

    If no row with the resolved app_uuid exists, a new one is inserted. Otherwise the
    existing row is updated; on update, context_uuid and created_at are left untouched.

    :param db_engine: SQLAlchemy Engine for DB connection
    :param messagebroker: MessageBroker instance; accepted for interface compatibility,
        not used by this function
    :param request: App protobuf object containing app data
    :return: AppId protobuf object identifying the created or updated app
    """
    context_uuid = context_get_uuid(request.app_id.context_id, allow_random=False)
    # allow_random=True: a request without an app UUID gets a freshly generated one.
    app_uuid = app_get_uuid(request.app_id, allow_random=True)

    # Read the clock once so created_at and updated_at are identical on a fresh insert
    # (two separate reads would differ by a few microseconds).
    now = datetime.datetime.utcnow()

    # An empty remote device UUID is stored as NULL rather than as an empty string.
    remote_device_uuid = request.remote_device_id.device_uuid.uuid or None

    # Prepare app data for insertion/update
    app_data = {
        'context_uuid': context_uuid,
        'app_uuid': app_uuid,
        'app_status': grpc_to_enum__qkd_app_status(request.app_status),
        'app_type': grpc_to_enum__qkd_app_types(request.app_type),
        'server_app_id': request.server_app_id,
        'client_app_id': request.client_app_id,
        'backing_qkdl_uuid': [qkdl.qkdl_uuid.uuid for qkdl in request.backing_qkdl_id],
        'local_device_uuid': request.local_device_id.device_uuid.uuid,
        'remote_device_uuid': remote_device_uuid,
        'created_at': now,
        'updated_at': now,
    }

    def callback(session: Session) -> bool:
        # Create the insert statement
        stmt = insert(AppModel).values(app_data)

        # On a conflicting app_uuid, overwrite the mutable columns with the incoming
        # values; created_at and context_uuid are deliberately absent from set_.
        stmt = stmt.on_conflict_do_update(
            index_elements=[AppModel.app_uuid],
            set_=dict(
                app_status=stmt.excluded.app_status,
                app_type=stmt.excluded.app_type,
                server_app_id=stmt.excluded.server_app_id,
                client_app_id=stmt.excluded.client_app_id,
                backing_qkdl_uuid=stmt.excluded.backing_qkdl_uuid,
                local_device_uuid=stmt.excluded.local_device_uuid,
                remote_device_uuid=stmt.excluded.remote_device_uuid,
                updated_at=stmt.excluded.updated_at,
            )
        )

        session.execute(stmt)
        return True

    run_transaction(sessionmaker(bind=db_engine), callback)

    app_id = json_app_id(app_uuid, context_id=json_context_id(context_uuid))
    return AppId(**app_id)
Alberto Gonzalez Barneo
committed
def app_get_by_server(db_engine: Engine, server_app_id: str) -> App:
    """
    Look up a single app by its server_app_id column.

    :param db_engine: SQLAlchemy Engine the session is bound to
    :param server_app_id: value matched against AppModel.server_app_id
    :return: App protobuf built from the stored row's dump()
    :raises NotFoundException: when no row has the given server_app_id
    """
    def _fetch(session: Session) -> Optional[Dict]:
        row = session.query(AppModel).filter_by(server_app_id=server_app_id).one_or_none()
        if not row:
            return None
        return row.dump()

    session_factory = sessionmaker(bind=db_engine)
    dumped = run_transaction(session_factory, _fetch)

    if dumped:
        return App(**dumped)
    raise NotFoundException('App', server_app_id)
Alberto Gonzalez Barneo
committed
def app_delete(db_engine: Engine, app_uuid: str) -> None:
    """
    Remove the app row with the given UUID.

    :param db_engine: SQLAlchemy Engine the session is bound to
    :param app_uuid: UUID of the app row to delete
    :raises NotFoundException: when no row has the given app_uuid
    """
    def _remove(session: Session) -> bool:
        matching = session.query(AppModel).filter_by(app_uuid=app_uuid)
        target = matching.one_or_none()
        if target is not None:
            session.delete(target)
            return True
        # Nothing to delete: surface it to the caller instead of silently succeeding.
        raise NotFoundException('App', app_uuid)

    session_factory = sessionmaker(bind=db_engine)
    run_transaction(session_factory, _remove)