diff --git a/run_test_nbi.sh b/run_test_nbi.sh new file mode 100755 index 0000000000000000000000000000000000000000..b34f4c9564c0fd2c87a2fb5fa7db4b68fde98ffe --- /dev/null +++ b/run_test_nbi.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +export PYTHON_PATH=./src +export LOG_LEVEL=DEBUG +export FLASK_ENV=development + +python -m pytest --log-level=DEBUG -o log_cli=true --verbose src/nbi/mytest/pytest_code.py diff --git a/src/nbi/Dockerfile b/src/nbi/Dockerfile index bd6ce77c72fed46495ff38698e38ddbaed4ac477..c56dff12b978ddf21d053242d96dd663af72e686 100644 --- a/src/nbi/Dockerfile +++ b/src/nbi/Dockerfile @@ -89,5 +89,5 @@ RUN mkdir -p /var/teraflow/tests/tools COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/ # Start the service -#ENTRYPOINT ["gunicorn", "-w", "4", "--worker-class", "eventlet", "-b", "0.0.0.0:8080", "nbi.service:nbi_app"] -ENTRYPOINT ["gunicorn", "-w", "4", "--worker-class", "geventwebsocket.gunicorn.workers.GeventWebSocketWorker", "-b", "0.0.0.0:8080", "nbi.service.app:app"] +ENTRYPOINT ["gunicorn", "-w", "4", "--worker-class", "eventlet", "-b", "0.0.0.0:8080", "nbi.service.app:app"] +#ENTRYPOINT ["gunicorn", "-w", "4", "--worker-class", "geventwebsocket.gunicorn.workers.GeventWebSocketWorker", "-b", "0.0.0.0:8080", "nbi.service.app:app"] diff --git a/src/nbi/mytest/pytest_code.py b/src/nbi/mytest/pytest_code.py new file mode 100644 index 0000000000000000000000000000000000000000..be0376b8810806931ede2ed6ed5a6b030e6b423f --- /dev/null +++ b/src/nbi/mytest/pytest_code.py @@ -0,0 +1,105 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet, eventlet.wsgi, json, logging, os, pytest, requests, threading, time +import websockets.sync.client # Import synchronous WebSocket client +from nbi.service.NbiApplication import NbiApplication +from nbi.service.rest_server.nbi_plugins import register_restconf +from nbi.service.restapi_resources.health_probes import register_health_probes +from nbi.service.websocket_namespaces.hearthbeat import register_heartbeat + + + +logging.basicConfig(level=logging.DEBUG) +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +LOCAL_HOST = '127.0.0.1' +NBI_SERVICE_PORT = 18080 +NBI_SERVICE_PREFIX_URL = '' +NBI_SERVICE_BASE_URL = '{:s}:{:d}{:s}'.format(LOCAL_HOST, NBI_SERVICE_PORT, NBI_SERVICE_PREFIX_URL) + +class ServerThread(threading.Thread): + def __init__(self): + super().__init__(daemon=True) + + self.nbi_app = NbiApplication(base_url=NBI_SERVICE_PREFIX_URL) + register_health_probes(self.nbi_app) + register_heartbeat (self.nbi_app) + register_restconf (self.nbi_app) + self.nbi_app.dump_configuration() + + def run(self): + try: + #eventlet.wsgi.server( + # eventlet.listen((LOCAL_HOST, NBI_SERVICE_PORT)), + # self.nbi_app.get_flask_app(), + # debug=True, log_output=True + #) + #thread = eventlet.spawn( + # self.nbi_app._sio.run, self.nbi_app.get_flask_app(), + # host=LOCAL_HOST, port=NBI_SERVICE_PORT, + # debug=True, use_reloader=False + #) + #thread.wait() + self.nbi_app._sio.run( + self.nbi_app.get_flask_app(), + host=LOCAL_HOST, port=NBI_SERVICE_PORT, + debug=True, use_reloader=False + ) + except: + LOGGER.exception('unhandled') + +@pytest.fixture(scope='session') +def nbi_application() -> NbiApplication: + thread = ServerThread() + thread.start() + time.sleep(1) + yield thread.nbi_app + thread.join(timeout=1) + +def test_restapi_get_healthz( + nbi_application : NbiApplication # pylint: disable=redefined-outer-name, unused-argument +) -> None: + request_url = 'http://' + NBI_SERVICE_BASE_URL + '/healthz' + LOGGER.warning('Request: GET {:s}'.format(str(request_url))) + reply = requests.request('get', request_url, timeout=10, allow_redirects=True) + LOGGER.warning('Reply: {:s}'.format(str(reply.text))) + assert reply.status_code == requests.codes['OK'], 'Reply failed with status code {:d}'.format(reply.status_code) + if reply.content and len(reply.content) > 0: return reply.json() + +def test_websocket_get_heartbeat( + nbi_application : NbiApplication # pylint: disable=redefined-outer-name, unused-argument +) -> None: + nbi_application.dump_configuration() + request_url = 'ws://' + NBI_SERVICE_BASE_URL + '/heartbeat' + LOGGER.warning('Request: WS {:s}'.format(str(request_url))) + + heartbeat_count = 0 + with websockets.sync.client.connect(request_url) as ws: + while heartbeat_count < 5: + message = ws.recv() + LOGGER.warning('Received message: {:s}'.format(str(message))) + + data = json.loads(message) + + # Validate uptime response + assert "uptime_seconds" in data, "Missing 'uptime_seconds' in response" + assert isinstance(data["uptime_seconds"], (int, float)), "'uptime_seconds' is not a number" + + heartbeat_count += 1 + LOGGER.warning('--> Heartbeat #{:d}: {:s}'.format(heartbeat_count, str(data))) + + LOGGER.warning('Test completed') + raise Exception() diff --git a/src/nbi/mytest/requirements.in b/src/nbi/mytest/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..6a27a260ff7c6b03e6964c72f0c7b0e43882c33e --- /dev/null +++ b/src/nbi/mytest/requirements.in @@ -0,0 +1,20 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +eventlet==0.39.0 +Flask==2.1.3 +Flask-HTTPAuth==4.5.0 +Flask-RESTful==0.3.9 +flask-socketio==5.5.1 +gunicorn==23.0.0 diff --git a/src/nbi/requirements.in b/src/nbi/requirements.in index 8a4d1a1b93092180f4fa8237f79c447c69d49cbd..401a6de3026f4ab5896f21224e7674435553e080 100644 --- a/src/nbi/requirements.in +++ b/src/nbi/requirements.in @@ -20,9 +20,9 @@ Flask-HTTPAuth==4.5.0 Flask-RESTful==0.3.9 flask-socketio==5.5.1 jsonschema==4.4.0 -gevent==24.11.1 -gevent-websocket==0.10.1 -greenlet==3.1.1 +#gevent==24.11.1 +#gevent-websocket==0.10.1 +#greenlet==3.1.1 gunicorn==23.0.0 libyang==2.8.4 netaddr==0.9.0 diff --git a/src/nbi/service/NbiApplication.py b/src/nbi/service/NbiApplication.py index 9d17b88312e3cf9636e952d5a6484f3e6d8e2ac0..17d0507ecccb166a82697a92e8f9945edc74c755 100644 --- a/src/nbi/service/NbiApplication.py +++ b/src/nbi/service/NbiApplication.py @@ -36,16 +36,28 @@ class NbiApplication: self.base_url = base_url self._app = Flask(__name__) + self._app.config['SECRET_KEY'] = 'secret!' self._app.after_request(log_request) self._api = Api(self._app, prefix=base_url) #websocket_path = '/'.join([base_url.rstrip('/'), 'websocket']) - self._sio = SocketIO(self._app, path=base_url, cors_allowed_origins="*") + #self._sio = SocketIO(self._app, path=base_url, cors_allowed_origins="*", logger=True, engineio_logger=True) + self._sio = SocketIO(self._app, cors_allowed_origins="*", logger=True, engineio_logger=True) + + @self._sio.on_error_default # handles all namespaces without an explicit error handler + def default_error_handler(e): + LOGGER.error('[default_error_handler] e={:s}'.format(str(e))) def add_rest_api_resource(self, resource_class : Resource, *urls, **kwargs) -> None: self._api.add_resource(resource_class, *urls, **kwargs) def add_websocket_namespace(self, namespace_class : Namespace, namespace_url : str) -> None: - self._sio.on_namespace(namespace_class(namespace=namespace_url)) + LOGGER.warning('[add_websocket_namespace] (before) self._sio.server={:s}'.format(str(self._sio.server))) + LOGGER.warning('[add_websocket_namespace] (before) self._sio.server.namespace_handlers={:s}'.format(str(self._sio.server.namespace_handlers))) + LOGGER.warning('[add_websocket_namespace] (before) self._sio.namespace_handlers={:s}'.format(str(self._sio.namespace_handlers))) + self._sio.on_namespace(namespace_class(namespace_url)) + LOGGER.warning('[add_websocket_namespace] (after) self._sio.server={:s}'.format(str(self._sio.server))) + LOGGER.warning('[add_websocket_namespace] (after) self._sio.server.namespace_handlers={:s}'.format(str(self._sio.server.namespace_handlers))) + LOGGER.warning('[add_websocket_namespace] (after) self._sio.namespace_handlers={:s}'.format(str(self._sio.namespace_handlers))) def websocket_emit_message( self, event : str, *args : Any, namespace : str = "/", to : Optional[str] = None @@ -66,10 +78,14 @@ class NbiApplication: LOGGER.debug('Configured WebSocket Namespaces:') for namespace in self._sio.server.handlers.keys(): - LOGGER.debug(' - {:s}'.format(str(namespace))) + LOGGER.debug(' (server) - {:s}'.format(str(namespace))) + + for namespace in self._sio.namespace_handlers: + LOGGER.debug(' (ns_hdls) - {:s}'.format(str(namespace))) def run_standalone( - self, bind_address : Optional[str] = None, bind_port : Optional[int] = None + self, bind_address : Optional[str] = None, bind_port : Optional[int] = None, + debug : bool = False, use_reloader : bool = False ) -> None: # Run method used when started in a standalone mode, i.e., outside gunicorn or # similar WSGI HTTP servers. Otherwise, use mechanism defined by the used @@ -82,4 +98,24 @@ class NbiApplication: endpoint = '/'.join([endpoint.rstrip('/'), self.base_url]) LOGGER.info('Listening on {:s}...'.format(endpoint)) - self._sio.run(self._app, host=bind_address, port=bind_port) + self._sio.run( + self._app, host=bind_address, port=bind_port, + debug=debug, use_reloader=use_reloader + ) + + def start_test_thread( + self, bind_address : Optional[str] = None, bind_port : Optional[int] = None, + debug : bool = False, use_reloader : bool = False + ) -> None: + # NOTE: To be used for testing purposes with pytest + # Stop the thread through nbi_app.stop_test_thread() + self._thread = self._sio.start_background_task( + self._sio.run, self._app, host=bind_address, port=bind_port, + debug=debug, use_reloader=use_reloader + ) + + def stop_test_thread(self): + # NOTE: To be used for testing purposes with pytest + # Start the thread through nbi_app.start_test_thread(...) + if self._thread is None: return + self._thread.join() diff --git a/src/nbi/service/rest_server/nbi_plugins/__init__.py b/src/nbi/service/rest_server/nbi_plugins/__init__.py index 9b5d7920db49bab6d456aba186e6500142aad5b2..c5fce5bd389e1d84640049f8f521c6ae53892e5b 100644 --- a/src/nbi/service/rest_server/nbi_plugins/__init__.py +++ b/src/nbi/service/rest_server/nbi_plugins/__init__.py @@ -14,25 +14,21 @@ from flask.json import jsonify from flask_restful import Resource - -from nbi.service.rest_server.RestServer import RestServer - -from .tools.HttpStatusCodes import HTTP_CREATED +from nbi.service.NbiApplication import NbiApplication +from .tools.HttpStatusCodes import HTTP_CREATED, HTTP_OK URL_PREFIX = "/restconf/data" - class BaseServer(Resource): def post(self): response = jsonify({}) response.status_code = HTTP_CREATED return response + def get(self): + response = jsonify({}) + response.status_code = HTTP_OK + return response -def _add_resource(rest_server: RestServer, resource: Resource, *urls, **kwargs): - urls = [(URL_PREFIX + url) for url in urls] - rest_server.add_resource(resource, *urls, **kwargs) - - -def register_restconf(rest_server: RestServer): - _add_resource(rest_server, BaseServer, "") +def register_restconf(nbi_app : NbiApplication): + nbi_app.add_rest_api_resource(BaseServer, URL_PREFIX) diff --git a/src/nbi/service/restapi_resources/health_probes/Resources.py b/src/nbi/service/restapi_resources/health_probes/Resources.py new file mode 100644 index 0000000000000000000000000000000000000000..27c00d6bc6ccf46916b804ddca8ed6c8d79d8c8a --- /dev/null +++ b/src/nbi/service/restapi_resources/health_probes/Resources.py @@ -0,0 +1,15 @@ +import time +from flask_restful import Resource + +START_TIME = time.time() + +class HealthProbe(Resource): + def get(self): + uptime = time.time() - START_TIME + return {'status': 'ready', 'uptime': uptime}, 200 + #return {'status': 'not ready'}, 503 + +RESOURCE_DESCRIPTORS = [ + # endpoint_name, resource_class, resource_url + ('sys.probe.healthz', HealthProbe, '/healthz'), +] diff --git a/src/nbi/service/restapi_resources/health_probes/__init__.py b/src/nbi/service/restapi_resources/health_probes/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c1251e769570e44dd3e52b3f0629f9aa800f8904 --- /dev/null +++ b/src/nbi/service/restapi_resources/health_probes/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from nbi.service.NbiApplication import NbiApplication +from .Resources import RESOURCE_DESCRIPTORS + +def register_health_probes(nbi_app : NbiApplication): + for endpoint_name, resource_class, resource_url in RESOURCE_DESCRIPTORS: + nbi_app.add_rest_api_resource(resource_class, resource_url, endpoint=endpoint_name) diff --git a/src/nbi/service/websocket_namespaces/hearthbeat/HeartbeatNamespace.py b/src/nbi/service/websocket_namespaces/hearthbeat/HeartbeatNamespace.py index 06a9569c7441e4e4ea25e8da2f64cd5393133afb..21cf43a6f1e8f54b9c286e618630d9e6b91284f3 100644 --- a/src/nbi/service/websocket_namespaces/hearthbeat/HeartbeatNamespace.py +++ b/src/nbi/service/websocket_namespaces/hearthbeat/HeartbeatNamespace.py @@ -24,7 +24,7 @@ NAMESPACE_NAME = 'heartbeat' NAMESPACE_URL = '/heartbeat' # WebSocket Heartbeat Namespace for debugging purposes -class DebugHeartbeatHandler(Namespace): +class HeartbeatHandler(Namespace): def on_connect(self): LOGGER.debug('Client {:s} connected'.format(str(request.sid))) join_room(NAMESPACE_NAME) @@ -35,22 +35,24 @@ class DebugHeartbeatHandler(Namespace): )) leave_room(NAMESPACE_NAME) -class DebugHeartbeatThread(threading.Thread): +class HeartbeatThread(threading.Thread): INTERVAL = 1 # second def __init__(self, nbi_app : NbiApplication): super().__init__(daemon=True) self.nbi_app = nbi_app + self.nbi_app.heartbeat_thread = self def run(self): - interval = DebugHeartbeatThread.INTERVAL + interval = HeartbeatThread.INTERVAL start_time = time.time() while True: time.sleep(interval) uptime = time.time() - start_time + LOGGER.warning('emitting...') self.nbi_app.websocket_emit_message( 'uptime', {'uptime_seconds': uptime}, namespace=NAMESPACE_URL, to=NAMESPACE_NAME ) -NAMESPACE_DESCRIPTOR = (NAMESPACE_NAME, DebugHeartbeatHandler, NAMESPACE_URL) +NAMESPACE_DESCRIPTOR = (NAMESPACE_NAME, HeartbeatHandler, NAMESPACE_URL) diff --git a/src/nbi/service/websocket_namespaces/hearthbeat/__init__.py b/src/nbi/service/websocket_namespaces/hearthbeat/__init__.py index 435a425b4848813a450f00eaa746b9a3e29b6609..842023ba1703c38c9cc6f22cbd6cc94d30472920 100644 --- a/src/nbi/service/websocket_namespaces/hearthbeat/__init__.py +++ b/src/nbi/service/websocket_namespaces/hearthbeat/__init__.py @@ -14,8 +14,11 @@ from nbi.service.NbiApplication import NbiApplication -from .HeartbeatNamespace import NAMESPACE_DESCRIPTOR +from .HeartbeatNamespace import NAMESPACE_DESCRIPTOR, HeartbeatThread def register_heartbeat(nbi_app : NbiApplication): + heartbeat_thread = HeartbeatThread(nbi_app) + heartbeat_thread.start() + _, namespace_class, namespace_url = NAMESPACE_DESCRIPTOR nbi_app.add_websocket_namespace(namespace_class, namespace_url) diff --git a/src/nbi/service/websocket_namespaces/hearthbeat/example_code.txt b/src/nbi/service/websocket_namespaces/hearthbeat/example_code.txt new file mode 100644 index 0000000000000000000000000000000000000000..2384841d30610f187f2620fd7b490cc39f26f39f --- /dev/null +++ b/src/nbi/service/websocket_namespaces/hearthbeat/example_code.txt @@ -0,0 +1,33 @@ +thread_event = Event() + +# ... + +@socketio.on('collectLiveData') +def collectLiveData(): + global thread + with thread_lock: + if thread is None: + thread_event.set() + thread = socketio.start_background_task(background_thread, thread_event) + +def background_thread(event): + """Example of how to send server generated events to clients.""" + global thread + count = 0 + try: + while event.is_set(): + socketio.sleep(1) + count += 1 + socketio.emit('my_response', {'count': count}) + finally: + event.clear() + thread = None + +@socketio.on("stopCollectingLiveData") +def stopCollectingLiveData(): + global thread + thread_event.clear() + with thread_lock: + if thread is not None: + thread.join() + thread = None \ No newline at end of file diff --git a/src/nbi/tests/PrepareTestScenario.py b/src/nbi/tests/PrepareTestScenario.py index a574f086b30cf0125d82f1b41a986e3dc0fd2366..c0c4d41c81a4803faebdf43f6c7f6b08d5fe876a 100644 --- a/src/nbi/tests/PrepareTestScenario.py +++ b/src/nbi/tests/PrepareTestScenario.py @@ -20,21 +20,26 @@ from common.Settings import ( get_env_var_name, get_service_baseurl_http, get_service_port_http ) from context.client.ContextClient import ContextClient -from nbi.service.rest_server.RestServer import RestServer +from nbi.service.NbiApplication import NbiApplication +from nbi.service.rest_server.nbi_plugins import register_restconf from nbi.service.rest_server.nbi_plugins.etsi_bwm import register_etsi_bwm_api from nbi.service.rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn from nbi.service.rest_server.nbi_plugins.ietf_l3vpn import register_ietf_l3vpn from nbi.service.rest_server.nbi_plugins.ietf_network import register_ietf_network from nbi.service.rest_server.nbi_plugins.tfs_api import register_tfs_api +from nbi.service.websocket_namespaces.hearthbeat import register_heartbeat from nbi.tests.MockService_Dependencies import MockService_Dependencies from service.client.ServiceClient import ServiceClient from slice.client.SliceClient import SliceClient from tests.tools.mock_osm.MockOSM import MockOSM from .Constants import USERNAME, PASSWORD, WIM_MAPPING + LOCAL_HOST = '127.0.0.1' MOCKSERVICE_PORT = 10000 NBI_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_http(ServiceNameEnum.NBI) # avoid privileged ports +NBI_SERVICE_BASE_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or '' + os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(NBI_SERVICE_PORT) @@ -48,12 +53,22 @@ def mock_service(): @pytest.fixture(scope='session') def nbi_service_rest(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name, unused-argument + _nbi_app = NbiApplication(base_url=NBI_SERVICE_BASE_URL) + register_etsi_bwm_api (_nbi_app) + #register_ietf_hardware(_nbi_app) + register_ietf_l2vpn (_nbi_app) + register_ietf_l3vpn (_nbi_app) + register_ietf_network (_nbi_app) + #register_ietf_nss (_nbi_app) + #register_ietf_acl (_nbi_app) + #register_qkd_app (_nbi_app) + register_tfs_api (_nbi_app) + register_restconf (_nbi_app) + register_heartbeat (_nbi_app) + _nbi_app.dump_configuration() + + _rest_server = RestServer() - register_etsi_bwm_api(_rest_server) - register_ietf_l2vpn(_rest_server) - register_ietf_l3vpn(_rest_server) - register_ietf_network(_rest_server) - register_tfs_api(_rest_server) _rest_server.start() time.sleep(1) # bring time for the server to start yield _rest_server