From 80fdd42004c28d201d14c5884757e97bf2a21848 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Thu, 6 Feb 2025 12:14:27 +0000 Subject: [PATCH] NBI Component: - Fixed Heartbeat Thread - Added termination conditions to Heartbeat Namespace - Fixed issue reporting configured namespaces - Separated tests/MockService to run in a completely isolated process to prevent collisions with eventlet - Fixed SECRET_KEY for Flask --- src/nbi/Config.py | 3 + src/nbi/service/NbiApplication.py | 17 +++-- .../service/health_probes/HeartbeatThread.py | 22 ++++-- src/nbi/service/health_probes/Namespaces.py | 3 + src/nbi/tests/MockService_Dependencies.py | 50 +++++++----- src/nbi/tests/PrepareTestScenario.py | 76 ++++++++++++++----- src/nbi/tests/test_core.py | 2 +- 7 files changed, 126 insertions(+), 47 deletions(-) diff --git a/src/nbi/Config.py b/src/nbi/Config.py index 83a350058..7fffa2428 100644 --- a/src/nbi/Config.py +++ b/src/nbi/Config.py @@ -18,3 +18,6 @@ from werkzeug.security import generate_password_hash RESTAPI_USERS = { # TODO: implement a database of credentials and permissions 'admin': generate_password_hash('admin'), } + +# Rebuild using: "python -c 'import secrets; print(secrets.token_hex())'" +SECRET_KEY = '2b8ab76763d81f7bced786de8ba40bd67eea6ff79217a711eb5f8d1f19c145c1' diff --git a/src/nbi/service/NbiApplication.py b/src/nbi/service/NbiApplication.py index ef2659415..2216177ff 100644 --- a/src/nbi/service/NbiApplication.py +++ b/src/nbi/service/NbiApplication.py @@ -13,11 +13,12 @@ # limitations under the License. -import logging, time +import logging, socketio, time from typing import Any, List, Optional, Tuple from flask import Flask, request from flask_restful import Api, Resource from flask_socketio import Namespace, SocketIO +from nbi.Config import SECRET_KEY LOGGER = logging.getLogger(__name__) @@ -36,7 +37,7 @@ class NbiApplication: self.base_url = base_url self._app = Flask(__name__) - self._app.config['SECRET_KEY'] = 'secret!' + self._app.config['SECRET_KEY'] = SECRET_KEY self._app.after_request(log_request) self._api = Api(self._app, prefix=base_url) #socketio_path = '/'.join([base_url.rstrip('/'), 'socket.io']) @@ -64,6 +65,12 @@ class NbiApplication: def get_flask_app(self) -> Flask: return self._app + def get_flask_api(self) -> Api: + return self._api + + def get_socketio_server(self) -> Optional[socketio.Server]: + return self._sio.server + def dump_configuration(self) -> None: LOGGER.debug('Configured REST-API Resources:') for resource in self._api.resources: @@ -73,14 +80,12 @@ class NbiApplication: for rule in self._app.url_map.iter_rules(): LOGGER.debug(' - {:s}'.format(str(rule))) - # TODO: find a way to report configured namespaces, for some reason, - # those data structures become emptied when SocketIO server starts. LOGGER.debug('Configured SocketIO/WebSocket Namespaces:') - LOGGER.debug(' WARNING: Might report an empty list of namespaces even when') - LOGGER.debug(' they are properly configured. To be fixed.') for handler in self._sio.handlers: LOGGER.debug(' - {:s}'.format(str(handler))) for namespace in self._sio.namespace_handlers: LOGGER.debug(' - {:s}'.format(str(namespace))) for namespace in self._sio.server.handlers: LOGGER.debug(' - {:s}'.format(str(namespace))) + for namespace in self._sio.server.namespace_handlers: + LOGGER.debug(' - {:s}'.format(str(namespace))) diff --git a/src/nbi/service/health_probes/HeartbeatThread.py b/src/nbi/service/health_probes/HeartbeatThread.py index 0b7158a09..c49f4ab49 100644 --- a/src/nbi/service/health_probes/HeartbeatThread.py +++ b/src/nbi/service/health_probes/HeartbeatThread.py @@ -20,21 +20,31 @@ LOGGER = logging.getLogger(__name__) class HeartbeatThread(threading.Thread): def __init__(self, namespace : socketio.Namespace): super().__init__(daemon=True) + self._terminate = threading.Event() self._namespace = namespace + def start(self): + self._terminate.clear() + return super().start() + + def stop(self) -> None: + self._terminate.set() + def run(self): + LOGGER.debug('[HeartbeatThread::run] begin') try: - while True: + while not self._terminate.is_set(): + LOGGER.debug('[HeartbeatThread::run] Running...') time.sleep(HEARTHBEAT_INTERVAL) - uptime = time.time() - START_TIME - LOGGER.info('[HeartbeatThread::run] emitting...') + LOGGER.debug('[HeartbeatThread::run] Interval elapsed') server : socketio.Server = self._namespace.server if server is None: continue - data = {'uptime_seconds': uptime} + LOGGER.debug('[HeartbeatThread::run] emitting...') + data = {'uptime_seconds': time.time() - START_TIME} server.emit('uptime', data, namespace=SIO_NAMESPACE, to=SIO_ROOM) - - LOGGER.info('[HeartbeatThread::run] emitted') + LOGGER.debug('[HeartbeatThread::run] emitted') except: # pylint: disable=bare-except LOGGER.exception('[HeartbeatThread::run] thread failed') + LOGGER.debug('[HeartbeatThread::run] end') diff --git a/src/nbi/service/health_probes/Namespaces.py b/src/nbi/service/health_probes/Namespaces.py index 9fba4ba71..9f7517d9b 100644 --- a/src/nbi/service/health_probes/Namespaces.py +++ b/src/nbi/service/health_probes/Namespaces.py @@ -26,6 +26,9 @@ class HeartbeatServerNamespace(Namespace): self._thread = HeartbeatThread(self) self._thread.start() + def stop_thread(self) -> None: + self._thread.stop() + def on_connect(self, auth): MSG = '[HeartbeatServerNamespace::on_connect] Client connect: sid={:s}, auth={:s}' LOGGER.info(MSG.format(str(request.sid), str(auth))) diff --git a/src/nbi/tests/MockService_Dependencies.py b/src/nbi/tests/MockService_Dependencies.py index 6d8ad7e83..8535fd56d 100644 --- a/src/nbi/tests/MockService_Dependencies.py +++ b/src/nbi/tests/MockService_Dependencies.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -from typing import Union -from common.Constants import ServiceNameEnum -from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name +import logging, signal, sys, threading from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server from common.proto.service_pb2_grpc import add_ServiceServiceServicer_to_server from common.proto.slice_pb2_grpc import add_SliceServiceServicer_to_server @@ -23,18 +20,21 @@ from common.tests.MockServicerImpl_Context import MockServicerImpl_Context from common.tests.MockServicerImpl_Service import MockServicerImpl_Service from common.tests.MockServicerImpl_Slice import MockServicerImpl_Slice from common.tools.service.GenericGrpcService import GenericGrpcService -from .Constants import LOCAL_HOST +from .Constants import LOCAL_HOST, MOCKSERVICE_PORT -SERVICE_CONTEXT = ServiceNameEnum.CONTEXT -SERVICE_SERVICE = ServiceNameEnum.SERVICE -SERVICE_SLICE = ServiceNameEnum.SLICE +logging.basicConfig(level=logging.DEBUG, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") +LOGGER = logging.getLogger(__name__) class MockService_Dependencies(GenericGrpcService): # Mock Service implementing Context, Service and Slice to simplify unitary tests of NBI - def __init__(self, bind_port: Union[str, int]) -> None: - super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService') + def __init__(self) -> None: + super().__init__( + MOCKSERVICE_PORT, LOCAL_HOST, + enable_health_servicer=False, + cls_name='MockService' + ) # pylint: disable=attribute-defined-outside-init def install_servicers(self): @@ -47,12 +47,28 @@ class MockService_Dependencies(GenericGrpcService): self.slice_servicer = MockServicerImpl_Slice() add_SliceServiceServicer_to_server(self.slice_servicer, self.server) - def configure_env_vars(self): - os.environ[get_env_var_name(SERVICE_CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address) - os.environ[get_env_var_name(SERVICE_CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port) +TERMINATE = threading.Event() - os.environ[get_env_var_name(SERVICE_SERVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address) - os.environ[get_env_var_name(SERVICE_SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port) +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name,unused-argument + LOGGER.warning('Terminate signal received') + TERMINATE.set() - os.environ[get_env_var_name(SERVICE_SLICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address) - os.environ[get_env_var_name(SERVICE_SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port) +def main(): + LOGGER.info('Starting...') + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + grpc_service = MockService_Dependencies() + grpc_service.start() + + # Wait for Ctrl+C or termination signal + while not TERMINATE.wait(timeout=1.0): pass + + LOGGER.info('Terminating...') + grpc_service.stop() + + LOGGER.info('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/nbi/tests/PrepareTestScenario.py b/src/nbi/tests/PrepareTestScenario.py index dd09db1d8..8852aa2c4 100644 --- a/src/nbi/tests/PrepareTestScenario.py +++ b/src/nbi/tests/PrepareTestScenario.py @@ -12,46 +12,88 @@ # See the License for the specific language governing permissions and # limitations under the License. -import enum, logging, os, pytest, requests, time +import enum, logging, os, pytest, requests, subprocess, time from typing import Any, Dict, List, Optional, Set, Union +from socketio import Namespace from common.Constants import ServiceNameEnum from common.Settings import ( - ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_HTTP, - get_env_var_name + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, + ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name ) from context.client.ContextClient import ContextClient from nbi.service.NbiApplication import NbiApplication -from nbi.tests.MockService_Dependencies import MockService_Dependencies +from nbi.service.health_probes.Constants import SIO_NAMESPACE as HEARTBEAT_NAMESPACE +from nbi.service.health_probes.Namespaces import HeartbeatServerNamespace from service.client.ServiceClient import ServiceClient from slice.client.SliceClient import SliceClient from tests.tools.mock_osm.MockOSM import MockOSM from .Constants import ( - LOCAL_HOST, MOCKSERVICE_PORT, NBI_SERVICE_BASE_URL, NBI_SERVICE_PORT, USERNAME, PASSWORD + LOCAL_HOST, MOCKSERVICE_PORT, NBI_SERVICE_BASE_URL, NBI_SERVICE_PORT, + USERNAME, PASSWORD ) from .OSM_Constants import WIM_MAPPING from .MockWebServer import MockWebServer -os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) -os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(NBI_SERVICE_PORT) +os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.NBI, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(NBI_SERVICE_PORT) +os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MOCKSERVICE_PORT) +os.environ[get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MOCKSERVICE_PORT) +os.environ[get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MOCKSERVICE_PORT) + @pytest.fixture(scope='session') def mock_service(): - _service = MockService_Dependencies(MOCKSERVICE_PORT) - _service.configure_env_vars() - _service.start() - yield _service - _service.stop() + # NOTE: Starting MockServer in a separate process to prevent + # issues with eventlet monkey-patched libraries. + + cmd = ['python', '-m', 'nbi.tests.MockService_Dependencies'] + custom_env = os.environ.copy() + mock_service_process = subprocess.Popen( + cmd, + env=custom_env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.DEVNULL, + text=True + ) + + yield True + + # Check if process is still running + if mock_service_process.poll() is None: + mock_service_process.terminate() # Try to terminate gracefully + time.sleep(2) # Give it time to exit + if mock_service_process.poll() is None: + mock_service_process.kill() # Force kill if still running + + # Capture final output after process terminates + stdout, stderr = mock_service_process.communicate() + LOGGER = logging.getLogger('MockService_Dependencies') + if stdout: LOGGER.info('STDOUT:\n{:s}'.format(str(stdout.strip()))) + if stderr: LOGGER.error('STDERR:\n{:s}'.format(str(stderr.strip()))) + LOGGER.info('Terminated') @pytest.fixture(scope='session') def nbi_application( - mock_service : MockService_Dependencies # pylint: disable=redefined-outer-name, unused-argument + mock_service # pylint: disable=redefined-outer-name, unused-argument ) -> NbiApplication: mock_web_server = MockWebServer() mock_web_server.start() time.sleep(1) # bring time for the server to start - yield mock_web_server.nbi_app + + nbi_app = mock_web_server.nbi_app + yield nbi_app + + sio_server = nbi_app.get_socketio_server() + sio_namespaces : Dict[str, Namespace] = sio_server.namespace_handlers + heartbeat_namespace : HeartbeatServerNamespace = sio_namespaces.get(HEARTBEAT_NAMESPACE) + heartbeat_namespace.stop_thread() + mock_web_server.join(timeout=1) @@ -64,7 +106,7 @@ def osm_wim( @pytest.fixture(scope='session') def context_client( - mock_service : MockService_Dependencies # pylint: disable=redefined-outer-name, unused-argument + mock_service # pylint: disable=redefined-outer-name, unused-argument ) -> ContextClient: _client = ContextClient() yield _client @@ -72,7 +114,7 @@ def context_client( @pytest.fixture(scope='session') def service_client( - mock_service : MockService_Dependencies # pylint: disable=redefined-outer-name, unused-argument + mock_service # pylint: disable=redefined-outer-name, unused-argument ) -> ServiceClient: _client = ServiceClient() yield _client @@ -80,7 +122,7 @@ def service_client( @pytest.fixture(scope='session') def slice_client( - mock_service : MockService_Dependencies # pylint: disable=redefined-outer-name, unused-argument + mock_service # pylint: disable=redefined-outer-name, unused-argument ) -> SliceClient: _client = SliceClient() yield _client diff --git a/src/nbi/tests/test_core.py b/src/nbi/tests/test_core.py index 80a3d4b10..f41c26f5d 100644 --- a/src/nbi/tests/test_core.py +++ b/src/nbi/tests/test_core.py @@ -47,7 +47,7 @@ def test_websocket_get_heartbeat( heartbeat_client_namespace = HeartbeatClientNamespace() - sio = socketio.Client(logger=True) + sio = socketio.Client(logger=True, engineio_logger=True) sio.register_namespace(heartbeat_client_namespace) sio.connect(NBI_SERVICE_BASE_URL) #sio.send('Hello WebSocket!', namespace='/heartbeat') -- GitLab