Loading src/nbi/service/vntm_recommend/Namespaces.py +35 −1 Original line number Diff line number Diff line Loading @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging import json, logging from flask import request from flask_socketio import Namespace, join_room, leave_room from kafka import KafkaProducer from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from .Constants import SIO_NAMESPACE, SIO_ROOM from .VntRecommThread import VntRecommThread Loading @@ -26,6 +28,10 @@ class VntRecommServerNamespace(Namespace): self._thread = VntRecommThread(self) self._thread.start() self.kafka_producer = KafkaProducer( bootstrap_servers = KafkaConfig.get_kafka_address(), ) def stop_thread(self) -> None: self._thread.stop() Loading @@ -38,3 +44,31 @@ class VntRecommServerNamespace(Namespace): MSG = '[on_disconnect] Client disconnect: sid={:s}, reason={:s}' LOGGER.info(MSG.format(str(request.sid), str(reason))) leave_room(SIO_ROOM, namespace=SIO_NAMESPACE) def on_vlink_created(self, data): MSG = '[on_vlink_created] begin: sid={:s}, data={:s}' LOGGER.info(MSG.format(str(request.sid), str(data))) data = json.loads(data) request_key = data.pop('_request_key') vntm_reply = json.dumps({'event': 'vlink_created', 'data': data}).encode('utf-8') LOGGER.info('[on_vlink_created] vntm_reply={:s}'.format(str(vntm_reply))) self.kafka_producer.send( KafkaTopic.VNTMANAGER_RESPONSE.value, key=request_key, value=vntm_reply ) self.kafka_producer.flush() def on_vlink_removed(self, data): MSG = '[on_vlink_removed] begin: sid={:s}, data={:s}' LOGGER.info(MSG.format(str(request.sid), str(data))) data = json.loads(data) request_key = data.pop('_request_key') vntm_reply = json.dumps({'event': 'vlink_removed', 'data': data}).encode('utf-8') LOGGER.info('[on_vlink_removed] vntm_reply={:s}'.format(str(vntm_reply))) self.kafka_producer.send( KafkaTopic.VNTMANAGER_RESPONSE.value, key=request_key, value=vntm_reply ) self.kafka_producer.flush() src/nbi/service/vntm_recommend/VntRecommThread.py +9 −4 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging, socketio, threading import json, logging, socketio, threading from typing import Dict, List from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from kafka import KafkaConsumer, TopicPartition Loading Loading @@ -71,13 +71,18 @@ class VntRecommThread(threading.Thread): def emit_recommendation(self, record : ConsumerRecord) -> None: message_key = record.key.decode('utf-8') message_value = record.value.decode('utf-8') message_value = json.loads(message_value) message_event = message_value.pop('event') message_data = json.loads(message_value['data']) message_data['_request_key'] = message_key message_data = json.dumps(message_data) MSG = '[emit_recommendation] Recommendation: key={:s} value={:s}' LOGGER.debug(MSG.format(str(message_key), str(message_value))) MSG = '[emit_recommendation] Recommendation: event={:s} data={:s}' LOGGER.debug(MSG.format(str(message_event), str(message_data))) LOGGER.debug('[emit_recommendation] checking server namespace...') server : socketio.Server = self._namespace.server if server is None: return LOGGER.debug('[emit_recommendation] emitting recommendation...') server.emit('recommendation', message_value, namespace=SIO_NAMESPACE, to=SIO_ROOM) server.emit(message_event, message_data, namespace=SIO_NAMESPACE, to=SIO_ROOM) LOGGER.debug('[emit_recommendation] emitted') Loading
src/nbi/service/vntm_recommend/Namespaces.py +35 −1 Original line number Diff line number Diff line Loading @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging import json, logging from flask import request from flask_socketio import Namespace, join_room, leave_room from kafka import KafkaProducer from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from .Constants import SIO_NAMESPACE, SIO_ROOM from .VntRecommThread import VntRecommThread Loading @@ -26,6 +28,10 @@ class VntRecommServerNamespace(Namespace): self._thread = VntRecommThread(self) self._thread.start() self.kafka_producer = KafkaProducer( bootstrap_servers = KafkaConfig.get_kafka_address(), ) def stop_thread(self) -> None: self._thread.stop() Loading @@ -38,3 +44,31 @@ class VntRecommServerNamespace(Namespace): MSG = '[on_disconnect] Client disconnect: sid={:s}, reason={:s}' LOGGER.info(MSG.format(str(request.sid), str(reason))) leave_room(SIO_ROOM, namespace=SIO_NAMESPACE) def on_vlink_created(self, data): MSG = '[on_vlink_created] begin: sid={:s}, data={:s}' LOGGER.info(MSG.format(str(request.sid), str(data))) data = json.loads(data) request_key = data.pop('_request_key') vntm_reply = json.dumps({'event': 'vlink_created', 'data': data}).encode('utf-8') LOGGER.info('[on_vlink_created] vntm_reply={:s}'.format(str(vntm_reply))) self.kafka_producer.send( KafkaTopic.VNTMANAGER_RESPONSE.value, key=request_key, value=vntm_reply ) self.kafka_producer.flush() def on_vlink_removed(self, data): MSG = '[on_vlink_removed] begin: sid={:s}, data={:s}' LOGGER.info(MSG.format(str(request.sid), str(data))) data = json.loads(data) request_key = data.pop('_request_key') vntm_reply = json.dumps({'event': 'vlink_removed', 'data': data}).encode('utf-8') LOGGER.info('[on_vlink_removed] vntm_reply={:s}'.format(str(vntm_reply))) self.kafka_producer.send( KafkaTopic.VNTMANAGER_RESPONSE.value, key=request_key, value=vntm_reply ) self.kafka_producer.flush()
src/nbi/service/vntm_recommend/VntRecommThread.py +9 −4 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging, socketio, threading import json, logging, socketio, threading from typing import Dict, List from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from kafka import KafkaConsumer, TopicPartition Loading Loading @@ -71,13 +71,18 @@ class VntRecommThread(threading.Thread): def emit_recommendation(self, record : ConsumerRecord) -> None: message_key = record.key.decode('utf-8') message_value = record.value.decode('utf-8') message_value = json.loads(message_value) message_event = message_value.pop('event') message_data = json.loads(message_value['data']) message_data['_request_key'] = message_key message_data = json.dumps(message_data) MSG = '[emit_recommendation] Recommendation: key={:s} value={:s}' LOGGER.debug(MSG.format(str(message_key), str(message_value))) MSG = '[emit_recommendation] Recommendation: event={:s} data={:s}' LOGGER.debug(MSG.format(str(message_event), str(message_data))) LOGGER.debug('[emit_recommendation] checking server namespace...') server : socketio.Server = self._namespace.server if server is None: return LOGGER.debug('[emit_recommendation] emitting recommendation...') server.emit('recommendation', message_value, namespace=SIO_NAMESPACE, to=SIO_ROOM) server.emit(message_event, message_data, namespace=SIO_NAMESPACE, to=SIO_ROOM) LOGGER.debug('[emit_recommendation] emitted')