Skip to content
__init__.py 3.06 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask_healthz import healthz, HealthError
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient

def get_working_context() -> str:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    return session['context_uuid'] if 'context_uuid' in session else '---'
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

def get_working_topology() -> str:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    return session['topology_uuid'] if 'topology_uuid' in session else '---'

def liveness():
    pass

def readiness():
    try:  # this component is ready when it is able to connect with the other components it depends on
        context_client = ContextClient()
        context_client.connect()
        context_client.close()
        device_client = DeviceClient()
        device_client.connect()
        device_client.close()
    except Exception as e:
        raise HealthError('Can\'t connect with the service: ' + e.details())

def from_json(json_str):
    return json.loads(json_str)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
class SetSubAppMiddleware():
    def __init__(self, app, web_app_root):
        self.app = app
        self.web_app_root = web_app_root
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __call__(self, environ, start_response):
        environ['SCRIPT_NAME'] = self.web_app_root
        environ['APPLICATION_ROOT'] = self.web_app_root
        return self.app(environ, start_response)

def create_app(use_config=None, web_app_root=None):
    app = Flask(__name__)
    if use_config:
        app.config.from_mapping(**use_config)
    
    app.config.update(HEALTHZ={
        'live': liveness,
        'ready': readiness
    })
    
    app.register_blueprint(healthz, url_prefix='/healthz')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    from webui.service.js.routes import js
    app.register_blueprint(js)

    from webui.service.main.routes import main
    app.register_blueprint(main)

    from webui.service.load_gen.routes import load_gen
    app.register_blueprint(load_gen)

    from webui.service.service.routes import service
    app.register_blueprint(service)

longllu's avatar
longllu committed
    from webui.service.slice.routes import slice
    app.register_blueprint(slice)

    from webui.service.device.routes import device
    app.register_blueprint(device)

    from webui.service.link.routes import link
    app.register_blueprint(link)
longllu's avatar
longllu committed
    

    app.jinja_env.filters['from_json'] = from_json
    
    app.jinja_env.globals.update(get_working_context=get_working_context)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    app.jinja_env.globals.update(get_working_topology=get_working_topology)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    if web_app_root is not None:
        app.wsgi_app = SetSubAppMiddleware(app.wsgi_app, web_app_root)
    return app