Newer
Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any, List, Optional, Tuple

import socketio
from flask import Flask, request
from flask_restful import Api, Resource
from flask_socketio import Namespace, SocketIO

from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
# Module-level logger, named after this module; used by log_request and NbiApplication below.
LOGGER = logging.getLogger(__name__)
def log_request(response):
    """Write one access-log line for the current request and hand the response back.

    Intended to be registered as a Flask ``after_request`` hook (NbiApplication
    does so in its constructor). The line is logged at INFO level and contains,
    in order: a minute-resolution timestamp, the client address, the HTTP
    method, the full request path and the response status.

    Args:
        response: response object about to be sent; only its ``status``
            attribute is read.

    Returns:
        The same ``response`` object, unmodified.
    """
    # Collect the fields first so the log call itself stays on one line; the
    # values are still passed as lazy %-arguments to the logger.
    fields = (
        time.strftime('[%Y-%b-%d %H:%M]'),
        request.remote_addr,
        request.method,
        request.full_path,
        response.status,
    )
    LOGGER.info('%s %s %s %s %s', *fields)
    return response
class NbiApplication:
    """Wrap a Flask app, a Flask-RESTful API and a Flask-SocketIO server in one object.

    The constructor builds the three components and wires them together; the
    remaining methods register REST resources and WebSocket namespaces, emit
    Socket.IO events, expose the underlying objects and dump what has been
    registered to the debug log.
    """

    def __init__(self, base_url : Optional[str] = None) -> None:
        """Create the Flask app, its REST API and its Socket.IO server.

        Args:
            base_url: prefix applied to every REST resource registered on the
                API. ``None`` is treated as the empty string (no prefix).
        """
        self.base_url = '' if base_url is None else base_url

        # The Flask application has to exist before hooks, the REST API or the
        # Socket.IO server can be attached to it.
        # NOTE(review): the line creating the app was missing from the reviewed
        # source; any extra app configuration done there (e.g. a secret key)
        # must be restored by hand -- confirm.
        self._app = Flask(__name__)
        self._app.after_request(log_request)
        self._api = Api(self._app, prefix=self.base_url)

        # Kafka-backed Socket.IO client manager, bound to the NBI workers topic.
        self._sio_client_manager = socketio.KafkaManager(
            url='kafka://{:s}'.format(KafkaConfig.get_kafka_address()),
            channel=KafkaTopic.NBI_SOCKETIO_WORKERS.value
        )
        # Pass the manager to the server; otherwise it is built but never used.
        self._sio = SocketIO(
            self._app, cors_allowed_origins='*', async_mode='eventlet',
            client_manager=self._sio_client_manager,
        )

    def add_rest_api_resource(self, resource_class : Resource, *urls, **kwargs) -> None:
        """Register one Flask-RESTful resource class under the given URL(s).

        Args:
            resource_class: resource class to register.
            *urls: one or more URL rules, forwarded to ``Api.add_resource``.
            **kwargs: extra keyword arguments forwarded to ``Api.add_resource``
                (for example ``endpoint``).
        """
        self._api.add_resource(resource_class, *urls, **kwargs)

    def add_rest_api_resources(self, resources : List[Tuple[str, Resource, str]]) -> None:
        """Register several REST resources in one call.

        Args:
            resources: list of ``(endpoint_name, resource_class, resource_url)``
                tuples, in exactly that order.
        """
        for endpoint_name, resource_class, resource_url in resources:
            self.add_rest_api_resource(resource_class, resource_url, endpoint=endpoint_name)

    def add_websocket_namespace(self, namespace : Namespace) -> None:
        """Attach a class-based Socket.IO namespace handler to the server."""
        self._sio.on_namespace(namespace)

    # NOTE(review): the `def` line of this method was missing from the reviewed
    # source; the name is reconstructed -- confirm against callers.
    def websocket_emit_message(
        self, event : str, *args : Any, namespace : str = '/', to : Optional[str] = None
    ) -> None:
        """Emit a Socket.IO event through the server.

        Args:
            event: event name.
            *args: event payload, forwarded positionally to ``SocketIO.emit``.
            namespace: Socket.IO namespace to emit on; defaults to ``'/'``.
            to: optional recipient, forwarded as ``to=``; ``None`` is passed
                through unchanged.
        """
        self._sio.emit(event, *args, namespace=namespace, to=to)

    def get_flask_app(self) -> Flask:
        """Return the underlying Flask application."""
        return self._app

    def get_flask_api(self) -> Api:
        """Return the underlying Flask-RESTful API object."""
        return self._api

    def get_socketio_server(self) -> Optional[socketio.Server]:
        """Return the ``server`` attribute of the Flask-SocketIO wrapper."""
        return self._sio.server

    # NOTE(review): the `def` line of this method and the bodies of its first
    # two loops were missing from the reviewed source; name and loop bodies are
    # reconstructed from the surrounding pattern -- confirm.
    def dump_configuration(self) -> None:
        """Log, at DEBUG level, everything registered on the API, app and Socket.IO server."""
        LOGGER.debug('Configured REST-API Resources:')
        for resource in self._api.resources:
            LOGGER.debug(' - {:s}'.format(str(resource)))

        LOGGER.debug('Configured Flask Rules:')
        for rule in self._app.url_map.iter_rules():
            LOGGER.debug(' - {:s}'.format(str(rule)))

        LOGGER.debug('Configured SocketIO/WebSocket Namespaces:')
        for handler in self._sio.handlers:
            LOGGER.debug(' - {:s}'.format(str(handler)))
        # NOTE(review): this loop header was missing from the reviewed source
        # (its body referenced `namespace` before any loop bound it).
        for namespace in self._sio.namespace_handlers:
            LOGGER.debug(' - {:s}'.format(str(namespace)))
        for namespace in self._sio.server.handlers:
            LOGGER.debug(' - {:s}'.format(str(namespace)))
        for namespace in self._sio.server.namespace_handlers:
            LOGGER.debug(' - {:s}'.format(str(namespace)))