Skip to content
Snippets Groups Projects
Commit 312f7036 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

NBI component:

- Removed old and unused context subscription module
- Grouped log-level adaptations in app
- Configured KafkaManager to enable SocketIO Servers in different gunicorn workers to self-coordinate
parent fc57c66d
No related branches found
No related tags found
3 merge requests!359Release TeraFlowSDN 5.0,!328Resolve "(CTTC) Update recommendations to use SocketIO on NBI and E2E Orch components",!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
...@@ -18,6 +18,7 @@ from typing import Any, List, Optional, Tuple ...@@ -18,6 +18,7 @@ from typing import Any, List, Optional, Tuple
from flask import Flask, request from flask import Flask, request
from flask_restful import Api, Resource from flask_restful import Api, Resource
from flask_socketio import Namespace, SocketIO from flask_socketio import Namespace, SocketIO
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from nbi.Config import SECRET_KEY from nbi.Config import SECRET_KEY
...@@ -40,10 +41,13 @@ class NbiApplication: ...@@ -40,10 +41,13 @@ class NbiApplication:
self._app.config['SECRET_KEY'] = SECRET_KEY self._app.config['SECRET_KEY'] = SECRET_KEY
self._app.after_request(log_request) self._app.after_request(log_request)
self._api = Api(self._app, prefix=base_url) self._api = Api(self._app, prefix=base_url)
#socketio_path = '/'.join([base_url.rstrip('/'), 'socket.io']) self._sio_client_manager = socketio.KafkaManager(
url='kafka://{:s}'.format(KafkaConfig.get_kafka_address()),
channel=KafkaTopic.NBI_SOCKETIO_WORKERS.value
)
self._sio = SocketIO( self._sio = SocketIO(
self._app, cors_allowed_origins='*', async_mode='eventlet', self._app, cors_allowed_origins='*', async_mode='eventlet',
#path=socketio_path, client_manager=self._sio_client_manager,
logger=True, engineio_logger=True logger=True, engineio_logger=True
) )
......
...@@ -50,6 +50,17 @@ logging.basicConfig( ...@@ -50,6 +50,17 @@ logging.basicConfig(
level=LOG_LEVEL, level=LOG_LEVEL,
format="[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s", format="[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s",
) )
logging.getLogger('kafka.client').setLevel(logging.WARNING)
logging.getLogger('kafka.cluster').setLevel(logging.WARNING)
logging.getLogger('kafka.conn').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING)
logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING)
logging.getLogger('kafka.producer.kafka').setLevel(logging.WARNING)
logging.getLogger('kafka.producer.record_accumulator').setLevel(logging.WARNING)
logging.getLogger('kafka.producer.sender').setLevel(logging.WARNING)
logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING)
logging.getLogger('socketio.server').setLevel(logging.WARNING) logging.getLogger('socketio.server').setLevel(logging.WARNING)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from websockets.sync.server import serve
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.tools.object_factory.Topology import json_topology_id
from common.tools.object_factory.Context import json_context_id
from common.proto.context_pb2 import ContextId, TopologyId
import json
import os
from vnt_manager.client.VNTManagerClient import VNTManagerClient
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))
vnt_manager_client: VNTManagerClient = VNTManagerClient()
context_client: ContextClient = ContextClient()
ALL_HOSTS = "0.0.0.0"
WS_IP_PORT = int(get_setting('WS_IP_PORT', default='8761'))
LOGGER = logging.getLogger(__name__)
def register_context_subscription():
with serve(subcript_to_vnt_manager, ALL_HOSTS, WS_IP_PORT, logger=LOGGER) as server:
LOGGER.info("Running subscription server...: {}:{}".format(ALL_HOSTS, str(WS_IP_PORT)))
server.serve_forever()
LOGGER.info("Exiting subscription server...")
def subcript_to_vnt_manager(websocket):
for message in websocket:
LOGGER.debug("Message received: {}".format(message))
message_json = json.loads(message)
request = VNTSubscriptionRequest()
request.host = message_json['host']
request.port = message_json['port']
LOGGER.debug("Received gRPC from ws: {}".format(request))
try:
vntm_reply = vnt_manager_client.VNTSubscript(request)
LOGGER.debug("Received gRPC from vntm: {}".format(vntm_reply))
except Exception as e:
LOGGER.error('Could not subscript to VTNManager: {}'.format(e))
websocket.send(vntm_reply.subscription)
...@@ -19,15 +19,6 @@ from kafka import KafkaConsumer, TopicPartition ...@@ -19,15 +19,6 @@ from kafka import KafkaConsumer, TopicPartition
from kafka.consumer.fetcher import ConsumerRecord from kafka.consumer.fetcher import ConsumerRecord
from .Constants import SIO_NAMESPACE, SIO_ROOM from .Constants import SIO_NAMESPACE, SIO_ROOM
logging.getLogger('kafka.client').setLevel(logging.WARNING)
logging.getLogger('kafka.cluster').setLevel(logging.WARNING)
logging.getLogger('kafka.conn').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.fetcher').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.group').setLevel(logging.WARNING)
logging.getLogger('kafka.consumer.subscription_state').setLevel(logging.WARNING)
logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING)
logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class VntRecommThread(threading.Thread): class VntRecommThread(threading.Thread):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment