Skip to content
Snippets Groups Projects
QKDAppServiceServicerImpl.py 8.69 KiB
Newer Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc
import logging
import sqlalchemy
import uuid
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import Empty, ContextId
from common.proto.qkd_app_pb2 import App, AppId, AppList, QKDAppTypesEnum, QoS
from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.proto.qkd_app_pb2_grpc import AppServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server, app_delete
METRICS_POOL = MetricsPool('QkdApp', 'RPC')
    def __init__(self, db_engine: sqlalchemy.engine.Engine, messagebroker: MessageBroker):
        LOGGER.debug('Initializing AppServiceServicer...')
        self.db_engine = db_engine
        self.messagebroker = messagebroker
        LOGGER.debug('AppServiceServicer initialized')
    def RegisterApp(self, request: App, context: grpc.ServicerContext) -> Empty:
        """
        Registers an app in the system, handling both internal and external applications
        with ETSI GS QKD 015 compliance.
        """
        LOGGER.debug(f"Received RegisterApp request: {grpc_message_to_json_string(request)}")
        try:
            # Validate QoS parameters as per ETSI 015 requirements
            self._validate_qos(request.qos)
            # Check if an app with the same server_app_id and local_device_id already exists
            existing_app = self._check_existing_app(request.server_app_id, request.local_device_id.device_uuid.uuid)

            if existing_app:
                if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_CLIENT:
                    LOGGER.debug(f"Handling external app registration for server_app_id: {request.server_app_id}")
                    # Handle second-party registration for external apps
                    if not existing_app.remote_device_id.device_uuid.uuid:
                        existing_app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
                        app_set(self.db_engine, self.messagebroker, existing_app)
                        LOGGER.debug(f"Updated external app with server_app_id: {request.server_app_id}, remote_device_id: {request.local_device_id.device_uuid.uuid}")
                    else:
                        context.set_code(grpc.StatusCode.ALREADY_EXISTS)
                        context.set_details(f"App with server_app_id {request.server_app_id} already has both parties registered.")
                        return Empty()
                else:
                    context.set_code(grpc.StatusCode.ALREADY_EXISTS)
                    context.set_details(f"App with server_app_id {request.server_app_id} already exists.")
                    return Empty()
                # Assign application IDs as required
                self._validate_and_assign_app_ids(request)

                # Register the app
                if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
                    LOGGER.debug(f"Registering internal app with app_uuid: {request.app_id.app_uuid.uuid}")
                    app_set(self.db_engine, self.messagebroker, request)
                else:
                    self._register_external_app(request)

            LOGGER.debug(f"RegisterApp completed successfully for app: {request.server_app_id}")
            return Empty()

        except InvalidArgumentException as e:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details(str(e))
            raise e
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("An internal error occurred during app registration.")
            raise e

    def _validate_qos(self, qos: QoS) -> None:
        """
        Validates the QoS parameters for the application, ensuring ETSI 015 compliance.
        """
        if qos.max_bandwidth and qos.min_bandwidth and qos.max_bandwidth < qos.min_bandwidth:
            raise InvalidArgumentException("QoS max_bandwidth cannot be less than min_bandwidth.")

        if qos.ttl and qos.ttl <= 0:
            raise InvalidArgumentException("QoS TTL must be a positive value.")

        LOGGER.debug(f"QoS validated: {qos}")

    def _check_existing_app(self, server_app_id: str, local_device_id: str):
        try:
            return app_get_by_server(self.db_engine, server_app_id)
        except NotFoundException:
            return None

    def _validate_and_assign_app_ids(self, request: App) -> None:
        """
        Validates and assigns app IDs (app_uuid, server_app_id, client_app_id) if not provided.
        """
        if not request.app_id.app_uuid.uuid:
            request.app_id.app_uuid.uuid = str(uuid.uuid4())
            LOGGER.debug(f"Assigned new app_uuid: {request.app_id.app_uuid.uuid}")

        if not request.server_app_id:
            request.server_app_id = str(uuid.uuid4())
            LOGGER.debug(f"Assigned new server_app_id: {request.server_app_id}")

        del request.client_app_id[:]  # Clear the repeated field for clients

    def _register_external_app(self, request: App) -> None:
        try:
            existing_app = app_get_by_server(self.db_engine, request.server_app_id)

            if not existing_app.remote_device_id.device_uuid.uuid:
                existing_app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
                app_set(self.db_engine, self.messagebroker, existing_app)
            else:
                LOGGER.debug(f"App with server_app_id: {request.server_app_id} already has both parties registered.")
        except NotFoundException:
            app_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListApps(self, request: ContextId, context: grpc.ServicerContext) -> AppList:
        """
        Lists all apps in the system, including their statistics and QoS attributes.
        """
        LOGGER.debug(f"Received ListApps request: {grpc_message_to_json_string(request)}")

        try:
            apps = app_list_objs(self.db_engine, request.context_uuid.uuid)
            for app in apps.apps:
                LOGGER.debug(f"App retrieved: {grpc_message_to_json_string(app)}")

            LOGGER.debug(f"ListApps returned {len(apps.apps)} apps for context_id: {request.context_uuid.uuid}")
            return apps
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("An internal error occurred while listing apps.")
            raise e

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetApp(self, request: AppId, context: grpc.ServicerContext) -> App:
        """
        Fetches details of a specific app based on its AppId, including QoS and performance stats.
        """
        LOGGER.debug(f"Received GetApp request: {grpc_message_to_json_string(request)}")
        try:
            app = app_get(self.db_engine, request)
            LOGGER.debug(f"GetApp found app with app_uuid: {request.app_uuid.uuid}")
            return app
        except NotFoundException as e:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"App not found: {e}")
            raise e
    def DeleteApp(self, request: AppId, context: grpc.ServicerContext) -> Empty:
        """
        Deletes an app from the system by its AppId, following ETSI compliance.
        """
        LOGGER.debug(f"Received DeleteApp request for app_uuid: {request.app_uuid.uuid}")
        try:
            app_delete(self.db_engine, request.app_uuid.uuid)
            LOGGER.debug(f"App with UUID {request.app_uuid.uuid} deleted successfully.")
            return Empty()
        except NotFoundException as e:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"App not found: {e}")
            raise e