# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from flask import Flask, request, session
from flask_healthz import healthz, HealthError
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient


def get_working_context() -> str:
    """Return the context UUID stored in the Flask session.

    Returns:
        The value of ``session['context_uuid']`` when that key is present,
        otherwise the placeholder string ``'Not selected'``.
    """
    return session.get('context_uuid', 'Not selected')


def liveness():
    """Liveness probe for flask-healthz; returning without raising means alive."""
    pass


def _error_details(exc: BaseException) -> str:
    """Return a printable description of *exc*.

    Uses ``exc.details()`` when the exception offers it and it yields text;
    otherwise falls back to ``str(exc)``.
    """
    details = getattr(exc, 'details', None)
    if callable(details):
        text = details()
        if text:
            return str(text)
    return str(exc)


def readiness():
    """Readiness probe for flask-healthz.

    This component is ready when it is able to connect with the other
    components it depends on, so a Context client and a Device client are
    each created, connected and closed in turn.

    Raises:
        HealthError: if creating, connecting or closing either client raises.
    """
    try:
        for client_class in (ContextClient, DeviceClient):
            client = client_class()
            client.connect()
            client.close()
    except Exception as e:
        # Not every exception exposes ``details()``; calling it blindly would
        # raise AttributeError here and hide the real cause from the probe.
        raise HealthError(
            'Can\'t connect with the service: ' + _error_details(e)
        ) from e


def from_json(json_str):
    """Parse *json_str* with ``json.loads`` and return the resulting object."""
    return json.loads(json_str)


class SetSubAppMiddleware():
    """WSGI middleware that writes a fixed root path into the request environ."""

    def __init__(self, app, web_app_root):
        """Store the wrapped WSGI callable and the root path to inject."""
        self.app = app
        self.web_app_root = web_app_root

    def __call__(self, environ, start_response):
        """Set SCRIPT_NAME and APPLICATION_ROOT, then delegate to the wrapped app."""
        environ['SCRIPT_NAME'] = self.web_app_root
        environ['APPLICATION_ROOT'] = self.web_app_root
        return self.app(environ, start_response)


def create_app(use_config=None, web_app_root=None):
    """Build and configure the Flask application.

    Args:
        use_config: Optional mapping of configuration values, applied through
            ``app.config.from_mapping``. Ignored when falsy.
        web_app_root: Optional root path. When it is not ``None`` the WSGI app
            is wrapped in ``SetSubAppMiddleware`` with this value.

    Returns:
        The configured ``Flask`` instance.
    """
    app = Flask(__name__)
    if use_config:
        app.config.from_mapping(**use_config)

    # Health endpoints served by flask-healthz under /healthz.
    app.config.update(HEALTHZ={
        'live': liveness,
        'ready': readiness,
    })
    app.register_blueprint(healthz, url_prefix='/healthz')

    # Blueprints are imported inside the factory rather than at module level
    # (presumably to avoid circular imports -- confirm before moving them).
    from webui.service.js.routes import js
    app.register_blueprint(js)

    from webui.service.main.routes import main
    app.register_blueprint(main)

    from webui.service.service.routes import service
    app.register_blueprint(service)

    from webui.service.device.routes import device
    app.register_blueprint(device)

    from webui.service.link.routes import link
    app.register_blueprint(link)

    # Helpers made available to the Jinja templates.
    app.jinja_env.filters['from_json'] = from_json
    app.jinja_env.globals.update(get_working_context=get_working_context)

    if web_app_root is not None:
        app.wsgi_app = SetSubAppMiddleware(app.wsgi_app, web_app_root)
    return app