# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging, sqlalchemy
#from typing import Iterator, Optional
from common.message_broker.MessageBroker import MessageBroker
import grpc, json, logging #, deepdiff
from common.proto.context_pb2 import (
    Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ContextId)
from common.proto.qkd_app_pb2 import (App, AppId, AppList, QKDAppTypesEnum)
from common.proto.qkd_app_pb2_grpc import AppServiceServicer
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
#from common.tools.context_queries.InterDomain import is_inter_domain #, is_multi_domain
#from common.tools.grpc.ConfigRules import copy_config_rules
#from common.tools.grpc.Constraints import copy_constraints
#from common.tools.grpc.EndPointIds import copy_endpoint_ids
#from common.tools.grpc.ServiceIds import update_service_ids
#from common.tools.grpc.Tools import grpc_message_to_json_string
#from context.client.ContextClient import ContextClient
#from qkd_app.client.QKDAppClient import QKDAppClient
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server
from common.method_wrappers.ServiceExceptions import NotFoundException

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('QkdApp', 'RPC')

# Optare: This file must be edited based on app's logic

class AppServiceServicerImpl(AppServiceServicer):
    """gRPC servicer implementing the QKD App service RPCs.

    Each RPC delegates persistence to the helpers imported from
    ``.database.QKDApp``, handing them the database engine and message
    broker received at construction time.
    """

    def __init__(
        self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker
    ):
        """Keep references to the collaborators used by the RPC handlers.

        Args:
            db_engine: SQLAlchemy engine passed on to the database helpers.
            messagebroker: Message broker passed on to ``app_set``.
        """
        LOGGER.debug('Creating Servicer...')
        self.db_engine = db_engine
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RegisterApp(self, request : App, context : grpc.ServicerContext) -> Empty:
        """Register a QKD application and return an ``Empty`` message.

        Args:
            request: The application to register.
            context: gRPC servicer context (not used directly in this method).

        Returns:
            An ``Empty`` message, on every path that completes.
        """
        # Optare: This is the main function required for the project.
        # Optare: An internal app is saved directly. An external app is saved
        # as pending (no remote provided) until the other party requests
        # registration too.
        # Optare: Ideally, only the lookup inside the try block needs to
        # change. Currently it just searches for a pending app with the same
        # server_id; more restrictions or a different search can be used, as
        # long as NotFoundException is raised when nothing matches.
        if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
            app_set(self.db_engine, self.messagebroker, request)
            return Empty()

        # External app: look for an entry already stored under this server app id.
        try:
            pending_app = app_get_by_server(self.db_engine, request.server_app_id)
        except NotFoundException:
            # Nothing stored yet for this server app id: persist the request as received.
            app_set(self.db_engine, self.messagebroker, request)
        else:
            # An entry exists: record the requester's local device as the
            # remote device of the stored app, then persist the updated entry.
            requester_device_uuid = request.local_device_id.device_uuid.uuid
            pending_app.remote_device_id.device_uuid.uuid = requester_device_uuid
            app_set(self.db_engine, self.messagebroker, pending_app)

        return Empty()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListApps(self, request: ContextId, context : grpc.ServicerContext) -> AppList:
        """Return whatever ``app_list_objs`` yields for the configured engine.

        Args:
            request: Context identifier; it is not consulted by this method.
            context: gRPC servicer context (not used directly in this method).

        Returns:
            The result of ``app_list_objs(self.db_engine)``.
        """
        apps = app_list_objs(self.db_engine)
        return apps