# NbiApplication.py
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging, socketio, time
from typing import Any, List, Optional, Tuple, Type
from flask import Flask, request
from flask_restful import Api, Resource
from flask_socketio import Namespace, SocketIO
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from nbi.Config import SECRET_KEY


# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

def log_request(response):
    """Log one line describing the current request and return the response.

    The line holds a ``[YYYY-Mon-DD HH:MM]`` timestamp, the remote address,
    the HTTP method, the full request path and the response status.  The
    response object is handed back unchanged.
    """
    log_fields = (
        time.strftime('[%Y-%b-%d %H:%M]'),
        request.remote_addr,
        request.method,
        request.full_path,
        response.status,
    )
    LOGGER.info('%s %s %s %s %s', *log_fields)
    return response

class NbiApplication:
    """Bundle the Flask app, REST API and Socket.IO server of the NBI.

    One instance owns a ``Flask`` application, a ``flask_restful.Api`` whose
    routes are prefixed with ``base_url``, and a ``flask_socketio.SocketIO``
    server whose client manager is a ``socketio.KafkaManager``.
    """

    def __init__(self, base_url : Optional[str] = None) -> None:
        """Create the Flask app, the REST API and the Socket.IO server.

        Args:
            base_url: Prefix handed to the REST API; ``None`` is treated as
                the empty string.
        """
        if base_url is None: base_url = ''
        self.base_url = base_url

        self._app = Flask(__name__)
        self._app.config['SECRET_KEY'] = SECRET_KEY
        # Registered as an after-request hook so each response gets logged.
        self._app.after_request(log_request)
        self._api = Api(self._app, prefix=base_url)

        # Client manager bound to the NBI_SOCKETIO_WORKERS Kafka topic.
        # NOTE(review): presumably this lets several NBI workers exchange
        # Socket.IO messages - confirm against the deployment.
        self._sio_client_manager = socketio.KafkaManager(
            url='kafka://{:s}'.format(KafkaConfig.get_kafka_address()),
            channel=KafkaTopic.NBI_SOCKETIO_WORKERS.value
        )

        # NOTE(review): cors_allowed_origins='*' accepts any origin, and both
        # Socket.IO and Engine.IO loggers are switched on - confirm this is
        # intended outside of development.
        self._sio = SocketIO(
            self._app, cors_allowed_origins='*', async_mode='eventlet',
            client_manager=self._sio_client_manager,
            logger=True, engineio_logger=True
        )

    def add_rest_api_resource(self, resource_class : Type[Resource], *urls, **kwargs) -> None:
        """Register one REST resource class on the API.

        Args:
            resource_class: The resource class (not an instance) to register.
            *urls: URL rules forwarded to ``Api.add_resource``.
            **kwargs: Extra keyword arguments forwarded to
                ``Api.add_resource`` (for example ``endpoint``).
        """
        self._api.add_resource(resource_class, *urls, **kwargs)

    def add_rest_api_resources(self, resources : List[Tuple[str, Type[Resource], str]]) -> None:
        """Register several REST resources in the given order.

        Args:
            resources: ``(endpoint_name, resource_class, resource_url)``
                tuples; each one is registered under a single URL with
                ``endpoint=endpoint_name``.
        """
        for endpoint_name, resource_class, resource_url in resources:
            self.add_rest_api_resource(resource_class, resource_url, endpoint=endpoint_name)

    def add_websocket_namespace(self, namespace : Namespace) -> None:
        """Attach a class-based namespace handler to the Socket.IO server."""
        self._sio.on_namespace(namespace)

    def websocket_emit_message(
        self, event : str, *args : Any, namespace : str = '/', to : Optional[str] = None
    ) -> None:
        """Emit a Socket.IO event through the server.

        Args:
            event: Name of the event to emit.
            *args: Positional payload forwarded to ``SocketIO.emit``.
            namespace: Namespace to emit on; defaults to ``'/'``.
            to: Forwarded unchanged as the ``to`` argument of
                ``SocketIO.emit``.
        """
        self._sio.emit(event, *args, namespace=namespace, to=to)

    def get_flask_app(self) -> Flask:
        """Return the underlying Flask application."""
        return self._app

    def get_flask_api(self) -> Api:
        """Return the flask_restful API object."""
        return self._api

    def get_socketio_server(self) -> Optional[socketio.Server]:
        """Return the ``server`` attribute of the SocketIO wrapper."""
        return self._sio.server

    def dump_configuration(self) -> None:
        """Write the registered resources, rules and handlers to the debug log."""
        # Lazy %s arguments: lines are only formatted when DEBUG is enabled.
        LOGGER.debug('Configured REST-API Resources:')
        for resource in self._api.resources:
            LOGGER.debug(' - %s', resource)

        LOGGER.debug('Configured Flask Rules:')
        for rule in self._app.url_map.iter_rules():
            LOGGER.debug(' - %s', rule)

        LOGGER.debug('Configured SocketIO/WebSocket Namespaces:')
        # Entries held by the flask_socketio wrapper first, then the ones held
        # by the underlying server object.
        handler_registries = (
            self._sio.handlers,
            self._sio.namespace_handlers,
            self._sio.server.handlers,
            self._sio.server.namespace_handlers,
        )
        for registry in handler_registries:
            for entry in registry:
                LOGGER.debug(' - %s', entry)