diff --git a/src/nbi/service/NbiApplication.py b/src/nbi/service/NbiApplication.py index 2216177ff5859b006e62e588161dcb33b7f19c18..0ee7c58ef7766a2d7c3b3c117361f4aca678b86e 100644 --- a/src/nbi/service/NbiApplication.py +++ b/src/nbi/service/NbiApplication.py @@ -18,6 +18,7 @@ from typing import Any, List, Optional, Tuple from flask import Flask, request from flask_restful import Api, Resource from flask_socketio import Namespace, SocketIO +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from nbi.Config import SECRET_KEY @@ -40,10 +41,13 @@ class NbiApplication: self._app.config['SECRET_KEY'] = SECRET_KEY self._app.after_request(log_request) self._api = Api(self._app, prefix=base_url) - #socketio_path = '/'.join([base_url.rstrip('/'), 'socket.io']) + self._sio_client_manager = socketio.KafkaManager( + url='kafka://{:s}'.format(KafkaConfig.get_kafka_address()), + channel=KafkaTopic.NBI_SOCKETIO_WORKERS.value + ) self._sio = SocketIO( self._app, cors_allowed_origins='*', async_mode='eventlet', - #path=socketio_path, + client_manager=self._sio_client_manager, logger=True, engineio_logger=True ) diff --git a/src/nbi/service/app.py b/src/nbi/service/app.py index a0206cdd6df679a8e7860ea49b2a7330e7f1e8df..99f66a94cbe85983d6de9cb9247f0a551d5a8da3 100644 --- a/src/nbi/service/app.py +++ b/src/nbi/service/app.py @@ -50,6 +50,17 @@ logging.basicConfig( level=LOG_LEVEL, format="[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s", ) +logging.getLogger('kafka.client').setLevel(logging.WARNING) +logging.getLogger('kafka.cluster').setLevel(logging.WARNING) +logging.getLogger('kafka.conn').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING) +logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING) +logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING) +logging.getLogger('kafka.producer.kafka').setLevel(logging.WARNING) +logging.getLogger('kafka.producer.record_accumulator').setLevel(logging.WARNING) +logging.getLogger('kafka.producer.sender').setLevel(logging.WARNING) +logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING) logging.getLogger('socketio.server').setLevel(logging.WARNING) LOGGER = logging.getLogger(__name__) diff --git a/src/nbi/service/context_subscription/__init__.py b/src/nbi/service/context_subscription/__init__.py deleted file mode 100644 index 1e88a3cd10afd7ee50130bac1e0be42a795d16ac..0000000000000000000000000000000000000000 --- a/src/nbi/service/context_subscription/__init__.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -from websockets.sync.server import serve -from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest -from common.Settings import get_setting -from context.client.ContextClient import ContextClient -from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME -from common.tools.object_factory.Topology import json_topology_id -from common.tools.object_factory.Context import json_context_id -from common.proto.context_pb2 import ContextId, TopologyId -import json -import os -from vnt_manager.client.VNTManagerClient import VNTManagerClient - -JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME) -ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID) -ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID)) - -vnt_manager_client: VNTManagerClient = VNTManagerClient() -context_client: ContextClient = ContextClient() - -ALL_HOSTS = "0.0.0.0" -WS_IP_PORT = int(get_setting('WS_IP_PORT', default='8761')) - -LOGGER = logging.getLogger(__name__) - - -def register_context_subscription(): - with serve(subcript_to_vnt_manager, ALL_HOSTS, WS_IP_PORT, logger=LOGGER) as server: - LOGGER.info("Running subscription server...: {}:{}".format(ALL_HOSTS, str(WS_IP_PORT))) - server.serve_forever() - LOGGER.info("Exiting subscription server...") - - -def subcript_to_vnt_manager(websocket): - for message in websocket: - LOGGER.debug("Message received: {}".format(message)) - message_json = json.loads(message) - request = VNTSubscriptionRequest() - request.host = message_json['host'] - request.port = message_json['port'] - LOGGER.debug("Received gRPC from ws: {}".format(request)) - - try: - vntm_reply = vnt_manager_client.VNTSubscript(request) - LOGGER.debug("Received gRPC from vntm: {}".format(vntm_reply)) - except Exception as e: - LOGGER.error('Could not subscript to VTNManager: {}'.format(e)) - - websocket.send(vntm_reply.subscription) diff --git a/src/nbi/service/vntm_recommend/VntRecommThread.py b/src/nbi/service/vntm_recommend/VntRecommThread.py index bd9a452f3e0538e60f945d8e9da54b834b577f93..f3d74e5ac6eb47520590f9caf6219c3a5e0e38a5 100644 --- a/src/nbi/service/vntm_recommend/VntRecommThread.py +++ b/src/nbi/service/vntm_recommend/VntRecommThread.py @@ -19,15 +19,6 @@ from kafka import KafkaConsumer, TopicPartition from kafka.consumer.fetcher import ConsumerRecord from .Constants import SIO_NAMESPACE, SIO_ROOM -logging.getLogger('kafka.client').setLevel(logging.WARNING) -logging.getLogger('kafka.cluster').setLevel(logging.WARNING) -logging.getLogger('kafka.conn').setLevel(logging.WARNING) -logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING) -logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING) -logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING) -logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING) -logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING) - LOGGER = logging.getLogger(__name__) class VntRecommThread(threading.Thread):