-
Lluis Gifre Renom authoredLluis Gifre Renom authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
QKDAppServiceServicerImpl.py 8.69 KiB
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import logging
import sqlalchemy
import uuid
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import Empty, ContextId
from common.proto.qkd_app_pb2 import App, AppId, AppList, QKDAppTypesEnum, QoS
from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.proto.qkd_app_pb2_grpc import AppServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server, app_delete
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('QkdApp', 'RPC')
class AppServiceServicerImpl(AppServiceServicer):
def __init__(self, db_engine: sqlalchemy.engine.Engine, messagebroker: MessageBroker):
LOGGER.debug('Initializing AppServiceServicer...')
self.db_engine = db_engine
self.messagebroker = messagebroker
LOGGER.debug('AppServiceServicer initialized')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def RegisterApp(self, request: App, context: grpc.ServicerContext) -> Empty:
"""
Registers an app in the system, handling both internal and external applications
with ETSI GS QKD 015 compliance.
"""
LOGGER.debug(f"Received RegisterApp request: {grpc_message_to_json_string(request)}")
try:
# Validate QoS parameters as per ETSI 015 requirements
self._validate_qos(request.qos)
# Check if an app with the same server_app_id and local_device_id already exists
existing_app = self._check_existing_app(request.server_app_id, request.local_device_id.device_uuid.uuid)
if existing_app:
if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_CLIENT:
LOGGER.debug(f"Handling external app registration for server_app_id: {request.server_app_id}")
# Handle second-party registration for external apps
if not existing_app.remote_device_id.device_uuid.uuid:
existing_app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
app_set(self.db_engine, self.messagebroker, existing_app)
LOGGER.debug(f"Updated external app with server_app_id: {request.server_app_id}, remote_device_id: {request.local_device_id.device_uuid.uuid}")
else:
context.set_code(grpc.StatusCode.ALREADY_EXISTS)
context.set_details(f"App with server_app_id {request.server_app_id} already has both parties registered.")
return Empty()
else:
context.set_code(grpc.StatusCode.ALREADY_EXISTS)
context.set_details(f"App with server_app_id {request.server_app_id} already exists.")
return Empty()
else:
# Assign application IDs as required
self._validate_and_assign_app_ids(request)
# Register the app
if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
LOGGER.debug(f"Registering internal app with app_uuid: {request.app_id.app_uuid.uuid}")
app_set(self.db_engine, self.messagebroker, request)
else:
self._register_external_app(request)
LOGGER.debug(f"RegisterApp completed successfully for app: {request.server_app_id}")
return Empty()
except InvalidArgumentException as e:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details(str(e))
raise e
except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details("An internal error occurred during app registration.")
raise e
def _validate_qos(self, qos: QoS) -> None:
"""
Validates the QoS parameters for the application, ensuring ETSI 015 compliance.
"""
if qos.max_bandwidth and qos.min_bandwidth and qos.max_bandwidth < qos.min_bandwidth:
raise InvalidArgumentException("QoS max_bandwidth cannot be less than min_bandwidth.")
if qos.ttl and qos.ttl <= 0:
raise InvalidArgumentException("QoS TTL must be a positive value.")
LOGGER.debug(f"QoS validated: {qos}")
def _check_existing_app(self, server_app_id: str, local_device_id: str):
try:
return app_get_by_server(self.db_engine, server_app_id)
except NotFoundException:
return None
def _validate_and_assign_app_ids(self, request: App) -> None:
"""
Validates and assigns app IDs (app_uuid, server_app_id, client_app_id) if not provided.
"""
if not request.app_id.app_uuid.uuid:
request.app_id.app_uuid.uuid = str(uuid.uuid4())
LOGGER.debug(f"Assigned new app_uuid: {request.app_id.app_uuid.uuid}")
if not request.server_app_id:
request.server_app_id = str(uuid.uuid4())
LOGGER.debug(f"Assigned new server_app_id: {request.server_app_id}")
del request.client_app_id[:] # Clear the repeated field for clients
def _register_external_app(self, request: App) -> None:
try:
existing_app = app_get_by_server(self.db_engine, request.server_app_id)
if not existing_app.remote_device_id.device_uuid.uuid:
existing_app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
app_set(self.db_engine, self.messagebroker, existing_app)
else:
LOGGER.debug(f"App with server_app_id: {request.server_app_id} already has both parties registered.")
except NotFoundException:
app_set(self.db_engine, self.messagebroker, request)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ListApps(self, request: ContextId, context: grpc.ServicerContext) -> AppList:
"""
Lists all apps in the system, including their statistics and QoS attributes.
"""
LOGGER.debug(f"Received ListApps request: {grpc_message_to_json_string(request)}")
try:
apps = app_list_objs(self.db_engine, request.context_uuid.uuid)
for app in apps.apps:
LOGGER.debug(f"App retrieved: {grpc_message_to_json_string(app)}")
LOGGER.debug(f"ListApps returned {len(apps.apps)} apps for context_id: {request.context_uuid.uuid}")
return apps
except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details("An internal error occurred while listing apps.")
raise e
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetApp(self, request: AppId, context: grpc.ServicerContext) -> App:
"""
Fetches details of a specific app based on its AppId, including QoS and performance stats.
"""
LOGGER.debug(f"Received GetApp request: {grpc_message_to_json_string(request)}")
try:
app = app_get(self.db_engine, request)
LOGGER.debug(f"GetApp found app with app_uuid: {request.app_uuid.uuid}")
return app
except NotFoundException as e:
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f"App not found: {e}")
raise e
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def DeleteApp(self, request: AppId, context: grpc.ServicerContext) -> Empty:
"""
Deletes an app from the system by its AppId, following ETSI compliance.
"""
LOGGER.debug(f"Received DeleteApp request for app_uuid: {request.app_uuid.uuid}")
try:
app_delete(self.db_engine, request.app_uuid.uuid)
LOGGER.debug(f"App with UUID {request.app_uuid.uuid} deleted successfully.")
return Empty()
except NotFoundException as e:
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f"App not found: {e}")
raise e