Newer
Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Alberto Gonzalez Barneo
committed
import grpc
import logging
import sqlalchemy
import uuid
Alberto Gonzalez Barneo
committed
from common.message_broker.MessageBroker import MessageBroker
Alberto Gonzalez Barneo
committed
from common.proto.context_pb2 import Empty, ContextId
from common.proto.qkd_app_pb2 import App, AppId, AppList, QKDAppTypesEnum, QoS
from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException
Alberto Gonzalez Barneo
committed
from common.proto.qkd_app_pb2_grpc import AppServiceServicer
Alberto Gonzalez Barneo
committed
from common.tools.grpc.Tools import grpc_message_to_json_string
Alberto Gonzalez Barneo
committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
Alberto Gonzalez Barneo
committed
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server, app_delete
Alberto Gonzalez Barneo
committed
LOGGER = logging.getLogger(__name__)
Alberto Gonzalez Barneo
committed
class AppServiceServicerImpl(AppServiceServicer):
Alberto Gonzalez Barneo
committed
def __init__(self, db_engine: sqlalchemy.engine.Engine, messagebroker: MessageBroker):
LOGGER.debug('Initializing AppServiceServicer...')
Alberto Gonzalez Barneo
committed
self.db_engine = db_engine
self.messagebroker = messagebroker
Alberto Gonzalez Barneo
committed
LOGGER.debug('AppServiceServicer initialized')
Alberto Gonzalez Barneo
committed
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Alberto Gonzalez Barneo
committed
def RegisterApp(self, request: App, context: grpc.ServicerContext) -> Empty:
"""
Registers an app in the system, handling both internal and external applications
with ETSI GS QKD 015 compliance.
"""
LOGGER.debug(f"Received RegisterApp request: {grpc_message_to_json_string(request)}")
Alberto Gonzalez Barneo
committed
Alberto Gonzalez Barneo
committed
try:
# Validate QoS parameters as per ETSI 015 requirements
self._validate_qos(request.qos)
Alberto Gonzalez Barneo
committed
Alberto Gonzalez Barneo
committed
# Check if an app with the same server_app_id and local_device_id already exists
existing_app = self._check_existing_app(request.server_app_id, request.local_device_id.device_uuid.uuid)
if existing_app:
if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_CLIENT:
LOGGER.debug(f"Handling external app registration for server_app_id: {request.server_app_id}")
# Handle second-party registration for external apps
if not existing_app.remote_device_id.device_uuid.uuid:
existing_app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
app_set(self.db_engine, self.messagebroker, existing_app)
LOGGER.debug(f"Updated external app with server_app_id: {request.server_app_id}, remote_device_id: {request.local_device_id.device_uuid.uuid}")
else:
context.set_code(grpc.StatusCode.ALREADY_EXISTS)
context.set_details(f"App with server_app_id {request.server_app_id} already has both parties registered.")
return Empty()
else:
context.set_code(grpc.StatusCode.ALREADY_EXISTS)
context.set_details(f"App with server_app_id {request.server_app_id} already exists.")
return Empty()
Alberto Gonzalez Barneo
committed
else:
Alberto Gonzalez Barneo
committed
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# Assign application IDs as required
self._validate_and_assign_app_ids(request)
# Register the app
if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
LOGGER.debug(f"Registering internal app with app_uuid: {request.app_id.app_uuid.uuid}")
app_set(self.db_engine, self.messagebroker, request)
else:
self._register_external_app(request)
LOGGER.debug(f"RegisterApp completed successfully for app: {request.server_app_id}")
return Empty()
except InvalidArgumentException as e:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details(str(e))
raise e
except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details("An internal error occurred during app registration.")
raise e
def _validate_qos(self, qos: QoS) -> None:
"""
Validates the QoS parameters for the application, ensuring ETSI 015 compliance.
"""
if qos.max_bandwidth and qos.min_bandwidth and qos.max_bandwidth < qos.min_bandwidth:
raise InvalidArgumentException("QoS max_bandwidth cannot be less than min_bandwidth.")
if qos.ttl and qos.ttl <= 0:
raise InvalidArgumentException("QoS TTL must be a positive value.")
LOGGER.debug(f"QoS validated: {qos}")
def _check_existing_app(self, server_app_id: str, local_device_id: str):
try:
return app_get_by_server(self.db_engine, server_app_id)
except NotFoundException:
return None
def _validate_and_assign_app_ids(self, request: App) -> None:
"""
Validates and assigns app IDs (app_uuid, server_app_id, client_app_id) if not provided.
"""
if not request.app_id.app_uuid.uuid:
request.app_id.app_uuid.uuid = str(uuid.uuid4())
LOGGER.debug(f"Assigned new app_uuid: {request.app_id.app_uuid.uuid}")
if not request.server_app_id:
request.server_app_id = str(uuid.uuid4())
LOGGER.debug(f"Assigned new server_app_id: {request.server_app_id}")
del request.client_app_id[:] # Clear the repeated field for clients
def _register_external_app(self, request: App) -> None:
try:
existing_app = app_get_by_server(self.db_engine, request.server_app_id)
if not existing_app.remote_device_id.device_uuid.uuid:
existing_app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
app_set(self.db_engine, self.messagebroker, existing_app)
else:
LOGGER.debug(f"App with server_app_id: {request.server_app_id} already has both parties registered.")
except NotFoundException:
app_set(self.db_engine, self.messagebroker, request)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ListApps(self, request: ContextId, context: grpc.ServicerContext) -> AppList:
"""
Lists all apps in the system, including their statistics and QoS attributes.
"""
LOGGER.debug(f"Received ListApps request: {grpc_message_to_json_string(request)}")
try:
apps = app_list_objs(self.db_engine, request.context_uuid.uuid)
for app in apps.apps:
LOGGER.debug(f"App retrieved: {grpc_message_to_json_string(app)}")
LOGGER.debug(f"ListApps returned {len(apps.apps)} apps for context_id: {request.context_uuid.uuid}")
return apps
except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details("An internal error occurred while listing apps.")
raise e
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetApp(self, request: AppId, context: grpc.ServicerContext) -> App:
"""
Fetches details of a specific app based on its AppId, including QoS and performance stats.
"""
LOGGER.debug(f"Received GetApp request: {grpc_message_to_json_string(request)}")
try:
app = app_get(self.db_engine, request)
LOGGER.debug(f"GetApp found app with app_uuid: {request.app_uuid.uuid}")
return app
except NotFoundException as e:
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f"App not found: {e}")
raise e
Alberto Gonzalez Barneo
committed
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Alberto Gonzalez Barneo
committed
def DeleteApp(self, request: AppId, context: grpc.ServicerContext) -> Empty:
"""
Deletes an app from the system by its AppId, following ETSI compliance.
"""
LOGGER.debug(f"Received DeleteApp request for app_uuid: {request.app_uuid.uuid}")
try:
app_delete(self.db_engine, request.app_uuid.uuid)
LOGGER.debug(f"App with UUID {request.app_uuid.uuid} deleted successfully.")
return Empty()
except NotFoundException as e:
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f"App not found: {e}")
raise e