From 312f70364b9298636fd6f34e1405c08abc5fcb2a Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Sun, 16 Mar 2025 12:31:56 +0000 Subject: [PATCH] NBI component: - Removed old and unused context subscription module - Grouped log-level adaptations in app - Configured KafkaManager to enable SocketIO Servers in different gunicorn workers to self-coordinate --- src/nbi/service/NbiApplication.py | 8 ++- src/nbi/service/app.py | 11 ++++ .../service/context_subscription/__init__.py | 64 ------------------- .../service/vntm_recommend/VntRecommThread.py | 9 --- 4 files changed, 17 insertions(+), 75 deletions(-) delete mode 100644 src/nbi/service/context_subscription/__init__.py diff --git a/src/nbi/service/NbiApplication.py b/src/nbi/service/NbiApplication.py index 2216177ff..0ee7c58ef 100644 --- a/src/nbi/service/NbiApplication.py +++ b/src/nbi/service/NbiApplication.py @@ -18,6 +18,7 @@ from typing import Any, List, Optional, Tuple from flask import Flask, request from flask_restful import Api, Resource from flask_socketio import Namespace, SocketIO +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from nbi.Config import SECRET_KEY @@ -40,10 +41,13 @@ class NbiApplication: self._app.config['SECRET_KEY'] = SECRET_KEY self._app.after_request(log_request) self._api = Api(self._app, prefix=base_url) - #socketio_path = '/'.join([base_url.rstrip('/'), 'socket.io']) + self._sio_client_manager = socketio.KafkaManager( + url='kafka://{:s}'.format(KafkaConfig.get_kafka_address()), + channel=KafkaTopic.NBI_SOCKETIO_WORKERS.value + ) self._sio = SocketIO( self._app, cors_allowed_origins='*', async_mode='eventlet', - #path=socketio_path, + client_manager=self._sio_client_manager, logger=True, engineio_logger=True ) diff --git a/src/nbi/service/app.py b/src/nbi/service/app.py index a0206cdd6..99f66a94c 100644 --- a/src/nbi/service/app.py +++ b/src/nbi/service/app.py @@ -50,6 +50,17 @@ logging.basicConfig( level=LOG_LEVEL, format="[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s", ) +logging.getLogger('kafka.client').setLevel(logging.WARNING) +logging.getLogger('kafka.cluster').setLevel(logging.WARNING) +logging.getLogger('kafka.conn').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING) +logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING) +logging.getLogger('kafka.producer.kafka').setLevel(logging.WARNING) +logging.getLogger('kafka.producer.record_accumulator').setLevel(logging.WARNING) +logging.getLogger('kafka.producer.sender').setLevel(logging.WARNING) +logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING) logging.getLogger('socketio.server').setLevel(logging.WARNING) LOGGER = logging.getLogger(__name__) diff --git a/src/nbi/service/context_subscription/__init__.py b/src/nbi/service/context_subscription/__init__.py deleted file mode 100644 index 1e88a3cd1..000000000 --- a/src/nbi/service/context_subscription/__init__.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -from websockets.sync.server import serve -from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest -from common.Settings import get_setting -from context.client.ContextClient import ContextClient -from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME -from common.tools.object_factory.Topology import json_topology_id -from common.tools.object_factory.Context import json_context_id -from common.proto.context_pb2 import ContextId, TopologyId -import json -import os -from vnt_manager.client.VNTManagerClient import VNTManagerClient - -JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME) -ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID) -ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID)) - -vnt_manager_client: VNTManagerClient = VNTManagerClient() -context_client: ContextClient = ContextClient() - -ALL_HOSTS = "0.0.0.0" -WS_IP_PORT = int(get_setting('WS_IP_PORT', default='8761')) - -LOGGER = logging.getLogger(__name__) - - -def register_context_subscription(): - with serve(subcript_to_vnt_manager, ALL_HOSTS, WS_IP_PORT, logger=LOGGER) as server: - LOGGER.info("Running subscription server...: {}:{}".format(ALL_HOSTS, str(WS_IP_PORT))) - server.serve_forever() - LOGGER.info("Exiting subscription server...") - - -def subcript_to_vnt_manager(websocket): - for message in websocket: - LOGGER.debug("Message received: {}".format(message)) - message_json = json.loads(message) - request = VNTSubscriptionRequest() - request.host = message_json['host'] - request.port = message_json['port'] - LOGGER.debug("Received gRPC from ws: {}".format(request)) - - try: - vntm_reply = vnt_manager_client.VNTSubscript(request) - LOGGER.debug("Received gRPC from vntm: {}".format(vntm_reply)) - except Exception as e: - LOGGER.error('Could not subscript to VTNManager: {}'.format(e)) - - websocket.send(vntm_reply.subscription) diff --git a/src/nbi/service/vntm_recommend/VntRecommThread.py b/src/nbi/service/vntm_recommend/VntRecommThread.py index bd9a452f3..f3d74e5ac 100644 --- a/src/nbi/service/vntm_recommend/VntRecommThread.py +++ b/src/nbi/service/vntm_recommend/VntRecommThread.py @@ -19,15 +19,6 @@ from kafka import KafkaConsumer, TopicPartition from kafka.consumer.fetcher import ConsumerRecord from .Constants import SIO_NAMESPACE, SIO_ROOM -logging.getLogger('kafka.client').setLevel(logging.WARNING) -logging.getLogger('kafka.cluster').setLevel(logging.WARNING) -logging.getLogger('kafka.conn').setLevel(logging.WARNING) -logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING) -logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING) -logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING) -logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING) -logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING) - LOGGER = logging.getLogger(__name__) class VntRecommThread(threading.Thread): -- GitLab
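
Editorial note on the client-manager change above: the sketch below is illustrative only, not code from this patch. It shows how a Flask-SocketIO server and a write-only emitter can share a Kafka-backed client manager so that an event published from any gunicorn worker (or from a side process such as the VNT recommendation thread) reaches clients connected to any other worker. The broker address, channel name, namespace, and event name are placeholder assumptions, not values taken from the TeraFlowSDN code base.

    # Minimal, assumption-laden sketch of cross-worker SocketIO coordination
    # through python-socketio's KafkaManager (requires kafka-python installed).
    import socketio
    from flask import Flask
    from flask_socketio import SocketIO

    KAFKA_URL = 'kafka://127.0.0.1:9092'   # assumed broker address
    CHANNEL   = 'nbi-socketio-workers'     # assumed coordination channel/topic

    # Each gunicorn worker builds its own manager; all of them publish and
    # subscribe on the same Kafka channel, so broadcasts are relayed between
    # workers instead of staying local to the worker that emitted them.
    app = Flask(__name__)
    client_manager = socketio.KafkaManager(url=KAFKA_URL, channel=CHANNEL)
    sio = SocketIO(app, async_mode='eventlet', client_manager=client_manager)

    @sio.on('connect', namespace='/recommendations')
    def on_connect():
        pass  # clients may land on any worker; the manager hides which one

    # A separate process or background thread can publish without serving
    # clients at all: a write-only manager pushes the event to Kafka, and every
    # worker's SocketIO server forwards it to its locally connected clients.
    emitter = socketio.KafkaManager(url=KAFKA_URL, channel=CHANNEL, write_only=True)
    emitter.emit('recommendation', {'status': 'ready'}, namespace='/recommendations')

Without a shared client manager, each worker only knows about its own WebSocket sessions, so an emit issued in one worker never reaches clients attached to another; routing the pub/sub traffic through Kafka is what lets the workers self-coordinate as described in the commit message.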