Commit e156b3d3 authored by Alberto Gonzalez Barneo's avatar Alberto Gonzalez Barneo Committed by Alberto Gonzalez Barneo
Browse files

Added folder app with some compoaspects gisters apps app with new component app register for qkd

parent 6c4ef63c
Loading
Loading
Loading
Loading
+59 −0
Original line number Diff line number Diff line
import grpc, logging, sqlalchemy
from typing import Iterator, Optional
from common.message_broker.MessageBroker import MessageBroker
import grpc, json, logging #, deepdiff
from common.proto.context_pb2 import (
    Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ContextId)
from common.proto.app_pb2 import (App, AppId, AppList, QKDAppTypesEnum)
from common.proto.app_pb2_grpc import AppServiceServicer
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.context_queries.InterDomain import is_inter_domain #, is_multi_domain
from common.tools.grpc.ConfigRules import copy_config_rules
from common.tools.grpc.Constraints import copy_constraints
from common.tools.grpc.EndPointIds import copy_endpoint_ids
from common.tools.grpc.ServiceIds import update_service_ids
#from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from app.client.AppClient import AppClient
from .database.App import app_set, app_list_objs, app_get, app_get_by_server
from common.method_wrappers.ServiceExceptions import NotFoundException

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('App', 'RPC')

# Optare: This file must be edited based on app's logic

class AppServiceServicerImpl(AppServiceServicer):
    def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker):
        LOGGER.debug('Creating Servicer...')
        self.db_engine = db_engine
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RegisterApp(self, request : App, context : grpc.ServicerContext) -> Empty:
        # Optare: This is the main function required for the project.
        # Optare: If it's an internal it will save it directly. If it's an external one it will save it as pending by not providing the remote until the other party requests it too
        # Optare: Ideally, the only thing needed to change is the code inside the try block. Currently it just searches by a pending app with the same server_id but you can put more restrictions or different search and raise the NotFoundException

        if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
            app_set(self.db_engine, self.messagebroker, request)

        else:
            try:
                app = app_get_by_server(self.db_engine, request.server_app_id)
            except NotFoundException:
                app = request
                app_set(self.db_engine, self.messagebroker, app)
            else:
                app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
                app_set(self.db_engine, self.messagebroker, app)
                
        
        return Empty()
    

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListApps(self, request: ContextId, context : grpc.ServicerContext) -> AppList:
        return app_list_objs(self.db_engine)

src/app/__init__.py

0 → 100644
+13 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

src/app/__main__.py

0 → 100644
+80 −0
Original line number Diff line number Diff line
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
    wait_for_environment_variables)
from .AppService import AppService
from .rest_server.RestServer import RestServer
from .rest_server.qkd_app import register_qkd_app
from common.message_broker.Factory import get_messagebroker_backend
from common.message_broker.MessageBroker import MessageBroker
from .database.Engine import Engine
from .database.models._Base import rebuild_database

terminate = threading.Event()
LOGGER : logging.Logger = None

def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    LOGGER.warning('Terminate signal received')
    terminate.set()

def main():
    global LOGGER # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     ),
        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
    ])

    signal.signal(signal.SIGINT,  signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info('Starting...')

    # Start metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)

    # Get Database Engine instance and initialize database, if needed
    LOGGER.info('Getting SQLAlchemy DB Engine...')
    db_engine = Engine.get_engine()
    if db_engine is None:
        LOGGER.error('Unable to get SQLAlchemy DB Engine...')
        return -1

    try:
        Engine.create_database(db_engine)
    except: # pylint: disable=bare-except # pragma: no cover
        LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url)))

    rebuild_database(db_engine)

    # Get message broker instance
    messagebroker = None #MessageBroker(get_messagebroker_backend())

    # Starting context service
    grpc_service = AppService(db_engine, messagebroker)
    grpc_service.start()

    rest_server = RestServer()
    register_qkd_app(rest_server)
    rest_server.start()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=1.0): pass

    LOGGER.info('Terminating...')
    grpc_service.stop()
    rest_server.shutdown()
    rest_server.join()

    LOGGER.info('Bye')
    return 0

if __name__ == '__main__':
    sys.exit(main())
+50 −0
Original line number Diff line number Diff line
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, ContextId
from common.proto.app_pb2 import App, AppId, AppList
from common.proto.app_pb2_grpc import AppServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string

LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

class AppClient:
    def __init__(self, host=None, port=None):
        if not host: host = get_service_host(ServiceNameEnum.APP)
        if not port: port = get_service_port_grpc(ServiceNameEnum.APP)
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = AppServiceStub(self.channel)

    def close(self):
        if self.channel is not None: self.channel.close()
        self.channel = None
        self.stub = None



    @RETRY_DECORATOR
    def RegisterApp(self, request : App) -> Empty:
        LOGGER.debug('RegisterApp request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.RegisterApp(request)
        LOGGER.debug('RegisterApp result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    
    @RETRY_DECORATOR
    def ListApps(self, request: ContextId) -> AppList:
        LOGGER.debug('ListApps request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.ListApps(request)
        LOGGER.debug('ListApps result: {:s}'.format(grpc_message_to_json_string(response)))
        return response
+13 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading