diff --git a/src/nbi/service/vntm_recommend/Namespaces.py b/src/nbi/service/vntm_recommend/Namespaces.py index c4cb211a6ba3b8bbf2305f32f40443f071afcb61..a7660f85912391f2ea8984971254cd819b6892da 100644 --- a/src/nbi/service/vntm_recommend/Namespaces.py +++ b/src/nbi/service/vntm_recommend/Namespaces.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +import json, logging from flask import request from flask_socketio import Namespace, join_room, leave_room +from kafka import KafkaProducer +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from .Constants import SIO_NAMESPACE, SIO_ROOM from .VntRecommThread import VntRecommThread @@ -26,6 +28,10 @@ class VntRecommServerNamespace(Namespace): self._thread = VntRecommThread(self) self._thread.start() + self.kafka_producer = KafkaProducer( + bootstrap_servers = KafkaConfig.get_kafka_address(), + ) + def stop_thread(self) -> None: self._thread.stop() @@ -38,3 +44,31 @@ class VntRecommServerNamespace(Namespace): MSG = '[on_disconnect] Client disconnect: sid={:s}, reason={:s}' LOGGER.info(MSG.format(str(request.sid), str(reason))) leave_room(SIO_ROOM, namespace=SIO_NAMESPACE) + + def on_vlink_created(self, data): + MSG = '[on_vlink_created] begin: sid={:s}, data={:s}' + LOGGER.info(MSG.format(str(request.sid), str(data))) + + data = json.loads(data) + request_key = data.pop('_request_key') + + vntm_reply = json.dumps({'event': 'vlink_created', 'data': data}).encode('utf-8') + LOGGER.info('[on_vlink_created] vntm_reply={:s}'.format(str(vntm_reply))) + self.kafka_producer.send( + KafkaTopic.VNTMANAGER_RESPONSE.value, key=request_key, value=vntm_reply + ) + self.kafka_producer.flush() + + def on_vlink_removed(self, data): + MSG = '[on_vlink_removed] begin: sid={:s}, data={:s}' + LOGGER.info(MSG.format(str(request.sid), str(data))) + + data = json.loads(data) + request_key = data.pop('_request_key') + + vntm_reply = json.dumps({'event': 'vlink_removed', 'data': data}).encode('utf-8') + LOGGER.info('[on_vlink_removed] vntm_reply={:s}'.format(str(vntm_reply))) + self.kafka_producer.send( + KafkaTopic.VNTMANAGER_RESPONSE.value, key=request_key, value=vntm_reply + ) + self.kafka_producer.flush() diff --git a/src/nbi/service/vntm_recommend/VntRecommThread.py b/src/nbi/service/vntm_recommend/VntRecommThread.py index e44670607fa26e42f2d3e461310f21b3ef083321..2b745c16de984ee9fa15e439eb67a3b81929c914 100644 --- a/src/nbi/service/vntm_recommend/VntRecommThread.py +++ b/src/nbi/service/vntm_recommend/VntRecommThread.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, socketio, threading +import json, logging, socketio, threading from typing import Dict, List from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from kafka import KafkaConsumer, TopicPartition @@ -71,13 +71,18 @@ class VntRecommThread(threading.Thread): def emit_recommendation(self, record : ConsumerRecord) -> None: message_key = record.key.decode('utf-8') message_value = record.value.decode('utf-8') + message_value = json.loads(message_value) + message_event = message_value.pop('event') + message_data = json.loads(message_value['data']) + message_data['_request_key'] = message_key + message_data = json.dumps(message_data) - MSG = '[emit_recommendation] Recommendation: key={:s} value={:s}' - LOGGER.debug(MSG.format(str(message_key), str(message_value))) + MSG = '[emit_recommendation] Recommendation: event={:s} data={:s}' + LOGGER.debug(MSG.format(str(message_event), str(message_data))) LOGGER.debug('[emit_recommendation] checking server namespace...') server : socketio.Server = self._namespace.server if server is None: return LOGGER.debug('[emit_recommendation] emitting recommendation...') - server.emit('recommendation', message_value, namespace=SIO_NAMESPACE, to=SIO_ROOM) + server.emit(message_event, message_data, namespace=SIO_NAMESPACE, to=SIO_ROOM) LOGGER.debug('[emit_recommendation] emitted')