From 9f585e31b720aec9eb7ed958a43583f8995296be Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Tue, 18 Mar 2025 18:55:12 +0000 Subject: [PATCH] VNT Manager component: - Corrected Kafka connection --- .../service/VNTManagerServiceServicerImpl.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py index 2424f5530..52ee03d22 100644 --- a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py +++ b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py @@ -39,15 +39,6 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): def __init__(self): LOGGER.debug('Creating Servicer...') self.context_client = ContextClient() - self.kafka_producer = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.get_kafka_address() - }) - self.kafka_consumer = KafkaConsumer({ - 'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : str(uuid.uuid4()), - 'auto.offset.reset' : 'latest' - }) - self.kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value]) self.links = [] LOGGER.debug('Servicer Created') @@ -66,6 +57,9 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): vntm_request = json.dumps(vntm_request) MSG = '[send_recommendation] request_key={:s} vntm_request={:s}' LOGGER.info(MSG.format(str(request_key), str(vntm_request))) + self.kafka_producer = KafkaProducer({ + 'bootstrap.servers' : KafkaConfig.get_kafka_address() + }) self.kafka_producer.produce( KafkaTopic.VNTMANAGER_REQUEST.value, key=request_key.encode('utf-8'), @@ -87,6 +81,15 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): def wait_for_reply(self, request_key : str) -> Optional[Dict]: LOGGER.info('[wait_for_reply] request_key={:s}'.format(str(request_key))) + self.kafka_consumer = KafkaConsumer({ + 'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : str(uuid.uuid4()), + 'auto.offset.reset' : 'latest', + 'max.poll.interval.ms': 600000, + 'session.timeout.ms' : 60000, + }) + self.kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value]) + while True: receive_msg = self.kafka_consumer.poll(2.0) if receive_msg is None: continue -- GitLab