Commit 61ab413a authored by Alberto Gonzalez Barneo's avatar Alberto Gonzalez Barneo
Browse files

Added delete service, delete app, detail app, refactor all code and some fix

parent dab05407
Loading
Loading
Loading
Loading
+22 −9
Original line number Diff line number Diff line
@@ -3,9 +3,10 @@ package qkd_app;

import "context.proto";

// Optare: Change this if you want to change App's structure or enums. 
// Optare: If a message (structure) is changed it must be changed in src/app/service/database
// Define Empty message if you don't want to use google.protobuf.Empty.
message Empty {}

// Enum representing possible states of a QKD application.
enum QKDAppStatusEnum {
  QKDAPPSTATUS_ON = 0;
  QKDAPPSTATUS_DISCONNECTED = 1;
@@ -13,16 +14,26 @@ enum QKDAppStatusEnum {
  QKDAPPSTATUS_ZOMBIE = 3;
}

// Enum representing QKD application types.
enum QKDAppTypesEnum {
  QKDAPPTYPES_INTERNAL = 0;
  QKDAPPTYPES_CLIENT = 1;
}

// Message representing a QKDL (Quantum Key Distribution Link) identifier.
message QKDLId {
  context.Uuid qkdl_uuid = 1;
}

// Define QoS parameters for QKD applications
message QoS {
  uint32 max_bandwidth = 1; // Maximum bandwidth (in bits per second)
  uint32 min_bandwidth = 2; // Minimum bandwidth (optional)
  uint32 jitter = 3;        // Maximum jitter (in milliseconds)
  uint32 ttl = 4;           // Time-to-live (in seconds)
}

// Main message representing a QKD application with all required fields.
message App {
  AppId app_id = 1;
  QKDAppStatusEnum app_status = 2;
@@ -32,22 +43,24 @@ message App {
  repeated QKDLId backing_qkdl_id = 6;
  context.DeviceId local_device_id = 7;
  context.DeviceId remote_device_id = 8;
  QoS qos = 9; // Include QoS in the App message
}


// Message representing an identifier for an app.
message AppId {
  context.ContextId context_id = 1;
  context.Uuid app_uuid = 2;
}


// Service definition for AppService, including app registration and listing.
service AppService {
  rpc RegisterApp(App) returns (context.Empty) {}
  rpc ListApps(context.ContextId) returns (AppList) {}
  rpc GetApp(AppId) returns (App) {} 
  rpc DeleteApp (AppId) returns (Empty) {} // Use locally defined Empty
}



// Message representing a list of apps.
message AppList {
  repeated App apps = 1;
}
+65 −22
Original line number Diff line number Diff line
@@ -12,53 +12,96 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
import grpc
import logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, ContextId
from common.proto.qkd_app_pb2 import App, AppId, AppList
from common.proto.qkd_app_pb2_grpc import AppServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string

LOGGER = logging.getLogger(__name__)

# Define retry mechanism
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

class QKDAppClient:
    def __init__(self, host=None, port=None):
        if not host: host = get_service_host(ServiceNameEnum.QKD_APP)
        if not port: port = get_service_port_grpc(ServiceNameEnum.QKD_APP)
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
        self.host = host or get_service_host(ServiceNameEnum.QKD_APP)
        self.port = port or get_service_port_grpc(ServiceNameEnum.QKD_APP)
        self.endpoint = f'{self.host}:{self.port}'
        LOGGER.debug(f'Initializing gRPC client to {self.endpoint}...')
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        try:
            self.channel = grpc.insecure_channel(self.endpoint)
            self.stub = AppServiceStub(self.channel)
            LOGGER.debug(f'gRPC channel to {self.endpoint} established successfully')
        except Exception as e:
            LOGGER.error(f"Failed to establish gRPC connection: {e}")
            self.stub = None

    def close(self):
        if self.channel is not None: self.channel.close()
        if self.channel:
            self.channel.close()
            LOGGER.debug(f'gRPC channel to {self.endpoint} closed')
        self.channel = None
        self.stub = None

    def check_connection(self):
        if self.stub is None:
            LOGGER.error("gRPC connection is not established. Retrying...")
            self.connect()
            if self.stub is None:
                raise ConnectionError("gRPC connection could not be established.")

    @retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION)
    def RegisterApp(self, app_request: App) -> None:
        """Register a new QKD app."""
        self.check_connection()
        LOGGER.debug(f'RegisterApp request: {grpc_message_to_json_string(app_request)}')
        self.stub.RegisterApp(app_request)
        LOGGER.debug('App registered successfully')

    @RETRY_DECORATOR
    def RegisterApp(self, request : App) -> Empty:
        LOGGER.debug('RegisterApp request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.RegisterApp(request)
        LOGGER.debug('RegisterApp result: {:s}'.format(grpc_message_to_json_string(response)))
        return response
    @retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION)
    def UpdateApp(self, app_request: App) -> None:
        """Update an existing QKD app."""
        self.check_connection()
        LOGGER.debug(f'UpdateApp request: {grpc_message_to_json_string(app_request)}')
        self.stub.UpdateApp(app_request)
        LOGGER.debug('App updated successfully')

    @retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION)
    def ListApps(self, context_id) -> AppList:
        """List all apps for a given context."""
        self.check_connection()
        LOGGER.debug(f'ListApps request for context_id: {grpc_message_to_json_string(context_id)}')
        response = self.stub.ListApps(context_id)
        LOGGER.debug(f'ListApps result: {grpc_message_to_json_string(response)}')
        return response

    @RETRY_DECORATOR
    def ListApps(self, request: ContextId) -> AppList:
        LOGGER.debug('ListApps request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.ListApps(request)
        LOGGER.debug('ListApps result: {:s}'.format(grpc_message_to_json_string(response)))
    @retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION)
    def GetApp(self, app_id: AppId) -> App:
        """Fetch details of a specific app by its ID."""
        self.check_connection()
        LOGGER.debug(f'GetApp request for app_id: {grpc_message_to_json_string(app_id)}')
        response = self.stub.GetApp(app_id)
        LOGGER.debug(f'GetApp result: {grpc_message_to_json_string(response)}')
        return response

    @retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION)
    def DeleteApp(self, app_id: AppId) -> None:
        """Delete an app by its ID."""
        self.check_connection()  # Ensures connection is established
        LOGGER.debug(f'DeleteApp request for app_id: {grpc_message_to_json_string(app_id)}')
        self.stub.DeleteApp(app_id)  # Calls the gRPC service
        LOGGER.debug('App deleted successfully')



+26 −5
Original line number Diff line number Diff line
@@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, sqlalchemy
import logging
import sqlalchemy
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.message_broker.MessageBroker import MessageBroker
@@ -20,18 +21,38 @@ from common.proto.qkd_app_pb2_grpc import add_AppServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from qkd_app.service.QKDAppServiceServicerImpl import AppServiceServicerImpl

# Custom gRPC settings
GRPC_MAX_WORKERS = 200 # multiple clients might keep connections alive for Get*Events() RPC methods
# Configure maximum number of workers for gRPC
GRPC_MAX_WORKERS = 200  # Adjusted for high concurrency
LOGGER = logging.getLogger(__name__)


class AppService(GenericGrpcService):
    """
    gRPC Service for handling QKD App-related operations. 
    This class initializes the gRPC server and installs the servicers.
    """
    def __init__(
        self, db_engine: sqlalchemy.engine.Engine, messagebroker: MessageBroker, cls_name: str = __name__
    ) -> None:
        """
        Initializes the AppService with the provided database engine and message broker.
        Sets up the gRPC server to handle app-related requests.

        Args:
            db_engine (sqlalchemy.engine.Engine): Database engine for handling app data.
            messagebroker (MessageBroker): Message broker for inter-service communication.
            cls_name (str): Class name for logging purposes (default is __name__).
        """
        # Get the port for the gRPC AppService
        port = get_service_port_grpc(ServiceNameEnum.QKD_APP)
        # Initialize the base class with port and max worker configuration
        super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name)
        # Initialize the AppServiceServicer with the database and message broker
        self.app_servicer = AppServiceServicerImpl(db_engine, messagebroker)

    def install_servicers(self):
        """
        Installs the AppService servicers to the gRPC server.
        This allows the server to handle requests for QKD app operations.
        """
        add_AppServiceServicer_to_server(self.app_servicer, self.server)
        LOGGER.debug("AppService servicer installed")
+154 −41
Original line number Diff line number Diff line
@@ -12,62 +12,175 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging, sqlalchemy
#from typing import Iterator, Optional
import grpc
import logging
import sqlalchemy
import uuid
from common.message_broker.MessageBroker import MessageBroker
import grpc, json, logging #, deepdiff
from common.proto.context_pb2 import (
    Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ContextId)
from common.proto.qkd_app_pb2 import (App, AppId, AppList, QKDAppTypesEnum)
from common.proto.context_pb2 import Empty, ContextId
from common.proto.qkd_app_pb2 import App, AppId, AppList, QKDAppTypesEnum, QoS
from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.proto.qkd_app_pb2_grpc import AppServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
#from common.tools.context_queries.InterDomain import is_inter_domain #, is_multi_domain
#from common.tools.grpc.ConfigRules import copy_config_rules
#from common.tools.grpc.Constraints import copy_constraints
#from common.tools.grpc.EndPointIds import copy_endpoint_ids
#from common.tools.grpc.ServiceIds import update_service_ids
#from common.tools.grpc.Tools import grpc_message_to_json_string
#from context.client.ContextClient import ContextClient
#from qkd_app.client.QKDAppClient import QKDAppClient
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server
from common.method_wrappers.ServiceExceptions import NotFoundException
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server, app_delete

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('QkdApp', 'RPC')

# Optare: This file must be edited based on app's logic

class AppServiceServicerImpl(AppServiceServicer):
    def __init__(self, db_engine: sqlalchemy.engine.Engine, messagebroker: MessageBroker):
        LOGGER.debug('Creating Servicer...')
        LOGGER.debug('Initializing AppServiceServicer...')
        self.db_engine = db_engine
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')
        LOGGER.debug('AppServiceServicer initialized')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RegisterApp(self, request: App, context: grpc.ServicerContext) -> Empty:
        # Optare: This is the main function required for the project.
        # Optare: If it's an internal it will save it directly. If it's an external one it will save it as pending by not providing the remote until the other party requests it too
        # Optare: Ideally, the only thing needed to change is the code inside the try block. Currently it just searches by a pending app with the same server_id but you can put more restrictions or different search and raise the NotFoundException
        """
        Registers an app in the system, handling both internal and external applications
        with ETSI GS QKD 015 compliance.
        """
        LOGGER.debug(f"Received RegisterApp request: {grpc_message_to_json_string(request)}")

        try:
            # Validate QoS parameters as per ETSI 015 requirements
            self._validate_qos(request.qos)

            # Check if an app with the same server_app_id and local_device_id already exists
            existing_app = self._check_existing_app(request.server_app_id, request.local_device_id.device_uuid.uuid)

            if existing_app:
                if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_CLIENT:
                    LOGGER.debug(f"Handling external app registration for server_app_id: {request.server_app_id}")
                    # Handle second-party registration for external apps
                    if not existing_app.remote_device_id.device_uuid.uuid:
                        existing_app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
                        app_set(self.db_engine, self.messagebroker, existing_app)
                        LOGGER.debug(f"Updated external app with server_app_id: {request.server_app_id}, remote_device_id: {request.local_device_id.device_uuid.uuid}")
                    else:
                        context.set_code(grpc.StatusCode.ALREADY_EXISTS)
                        context.set_details(f"App with server_app_id {request.server_app_id} already has both parties registered.")
                        return Empty()
                else:
                    context.set_code(grpc.StatusCode.ALREADY_EXISTS)
                    context.set_details(f"App with server_app_id {request.server_app_id} already exists.")
                    return Empty()
            else:
                # Assign application IDs as required
                self._validate_and_assign_app_ids(request)

                # Register the app
                if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
                    LOGGER.debug(f"Registering internal app with app_uuid: {request.app_id.app_uuid.uuid}")
                    app_set(self.db_engine, self.messagebroker, request)

                else:
                    self._register_external_app(request)

            LOGGER.debug(f"RegisterApp completed successfully for app: {request.server_app_id}")
            return Empty()

        except InvalidArgumentException as e:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details(str(e))
            raise e
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("An internal error occurred during app registration.")
            raise e

    def _validate_qos(self, qos: QoS) -> None:
        """
        Validates the QoS parameters for the application, ensuring ETSI 015 compliance.
        """
        if qos.max_bandwidth and qos.min_bandwidth and qos.max_bandwidth < qos.min_bandwidth:
            raise InvalidArgumentException("QoS max_bandwidth cannot be less than min_bandwidth.")

        if qos.ttl and qos.ttl <= 0:
            raise InvalidArgumentException("QoS TTL must be a positive value.")

        LOGGER.debug(f"QoS validated: {qos}")

    def _check_existing_app(self, server_app_id: str, local_device_id: str):
        try:
                app = app_get_by_server(self.db_engine, request.server_app_id)
            return app_get_by_server(self.db_engine, server_app_id)
        except NotFoundException:
                app = request
                app_set(self.db_engine, self.messagebroker, app)
            else:
                app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
                app_set(self.db_engine, self.messagebroker, app)
            return None

    def _validate_and_assign_app_ids(self, request: App) -> None:
        """
        Validates and assigns app IDs (app_uuid, server_app_id, client_app_id) if not provided.
        """
        if not request.app_id.app_uuid.uuid:
            request.app_id.app_uuid.uuid = str(uuid.uuid4())
            LOGGER.debug(f"Assigned new app_uuid: {request.app_id.app_uuid.uuid}")

        return Empty()
        if not request.server_app_id:
            request.server_app_id = str(uuid.uuid4())
            LOGGER.debug(f"Assigned new server_app_id: {request.server_app_id}")

        del request.client_app_id[:]  # Clear the repeated field for clients

    def _register_external_app(self, request: App) -> None:
        try:
            existing_app = app_get_by_server(self.db_engine, request.server_app_id)

            if not existing_app.remote_device_id.device_uuid.uuid:
                existing_app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
                app_set(self.db_engine, self.messagebroker, existing_app)
            else:
                LOGGER.debug(f"App with server_app_id: {request.server_app_id} already has both parties registered.")
        except NotFoundException:
            app_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListApps(self, request: ContextId, context: grpc.ServicerContext) -> AppList:
        return app_list_objs(self.db_engine)
        """
        Lists all apps in the system, including their statistics and QoS attributes.
        """
        LOGGER.debug(f"Received ListApps request: {grpc_message_to_json_string(request)}")

        try:
            apps = app_list_objs(self.db_engine, request.context_uuid.uuid)
            for app in apps.apps:
                LOGGER.debug(f"App retrieved: {grpc_message_to_json_string(app)}")

            LOGGER.debug(f"ListApps returned {len(apps.apps)} apps for context_id: {request.context_uuid.uuid}")
            return apps
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("An internal error occurred while listing apps.")
            raise e

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetApp(self, request: AppId, context: grpc.ServicerContext) -> App:
        """
        Fetches details of a specific app based on its AppId, including QoS and performance stats.
        """
        LOGGER.debug(f"Received GetApp request: {grpc_message_to_json_string(request)}")
        try:
            app = app_get(self.db_engine, request)
            LOGGER.debug(f"GetApp found app with app_uuid: {request.app_uuid.uuid}")
            return app
        except NotFoundException as e:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"App not found: {e}")
            raise e

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteApp(self, request: AppId, context: grpc.ServicerContext) -> Empty:
        """
        Deletes an app from the system by its AppId, following ETSI compliance.
        """
        LOGGER.debug(f"Received DeleteApp request for app_uuid: {request.app_uuid.uuid}")
        try:
            app_delete(self.db_engine, request.app_uuid.uuid)
            LOGGER.debug(f"App with UUID {request.app_uuid.uuid} deleted successfully.")
            return Empty()
        except NotFoundException as e:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"App not found: {e}")
            raise e

+38 −18
Original line number Diff line number Diff line
@@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, signal, sys, threading
import logging
import signal
import sys
import threading
from prometheus_client import start_http_server
#from common.Constants import ServiceNameEnum
from common.Settings import (
@@ -26,68 +29,85 @@ from qkd_app.service.rest_server.qkd_app import register_qkd_app
from qkd_app.service.database.Engine import Engine
from qkd_app.service.database.models._Base import rebuild_database

# Event for terminating the service gracefully
terminate = threading.Event()
LOGGER: logging.Logger = None

def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    LOGGER.warning('Terminate signal received')
def signal_handler(signum, frame):
    """
    Handle termination signals like SIGINT and SIGTERM to ensure graceful shutdown.
    """
    LOGGER.warning('Termination signal received')
    terminate.set()

def main():
    global LOGGER # pylint: disable=global-statement
    global LOGGER  # Required due to global scope

    # Set up logging
    log_level = get_log_level()
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    # Ensure necessary environment variables are set
    wait_for_environment_variables([
        #get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     ),
        #get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
    ])

    # Register signal handlers for graceful shutdown
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info('Starting...')

    # Start metrics server
    # Start Prometheus metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)
    LOGGER.info(f'Metrics server started on port {metrics_port}')

    # Get Database Engine instance and initialize database, if needed
    # Initialize the SQLAlchemy database engine
    LOGGER.info('Getting SQLAlchemy DB Engine...')
    db_engine = Engine.get_engine()
    if db_engine is None:
        LOGGER.error('Unable to get SQLAlchemy DB Engine...')
        LOGGER.error('Unable to get SQLAlchemy DB Engine. Exiting...')
        return -1

    # Try creating the database or log any issues
    try:
        Engine.create_database(db_engine)
    except: # pylint: disable=bare-except # pragma: no cover
        LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url)))
    except Exception as e:  # More specific exception handling
        LOGGER.exception(f'Failed to check/create the database: {db_engine.url}. Error: {str(e)}')
        return -1

    # Rebuild the database schema if necessary
    rebuild_database(db_engine)

    # Get message broker instance
    messagebroker = None #MessageBroker(get_messagebroker_backend())
    # Initialize the message broker (if needed)
    messagebroker = None  # Disabled until further notice, can be re-enabled when necessary
    # messagebroker = MessageBroker(get_messagebroker_backend())

    # Starting context service
    # Start the gRPC App Service
    grpc_service = AppService(db_engine, messagebroker)
    grpc_service.start()

    # Start the REST server and register QKD apps
    rest_server = RestServer()
    register_qkd_app(rest_server)
    rest_server.start()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=1.0): pass
    LOGGER.info('Services started. Waiting for termination signal...')

    # Keep the process running until a termination signal is received
    while not terminate.wait(timeout=1.0):
        pass

    LOGGER.info('Terminating...')
    # Shutdown services gracefully on termination
    LOGGER.info('Terminating services...')
    grpc_service.stop()
    rest_server.shutdown()
    rest_server.join()

    LOGGER.info('Bye')
    LOGGER.info('Shutdown complete. Exiting...')
    return 0

if __name__ == '__main__':
Loading