diff --git a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py index 2424f5530c40708c5f5dd5aa367a8012cc98c4ef..52ee03d2269600bfb6cc9864ea5e3f2c752f9597 100644 --- a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py +++ b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py @@ -39,15 +39,6 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): def __init__(self): LOGGER.debug('Creating Servicer...') self.context_client = ContextClient() - self.kafka_producer = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.get_kafka_address() - }) - self.kafka_consumer = KafkaConsumer({ - 'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : str(uuid.uuid4()), - 'auto.offset.reset' : 'latest' - }) - self.kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value]) self.links = [] LOGGER.debug('Servicer Created') @@ -66,6 +57,9 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): vntm_request = json.dumps(vntm_request) MSG = '[send_recommendation] request_key={:s} vntm_request={:s}' LOGGER.info(MSG.format(str(request_key), str(vntm_request))) + self.kafka_producer = KafkaProducer({ + 'bootstrap.servers' : KafkaConfig.get_kafka_address() + }) self.kafka_producer.produce( KafkaTopic.VNTMANAGER_REQUEST.value, key=request_key.encode('utf-8'), @@ -87,6 +81,15 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): def wait_for_reply(self, request_key : str) -> Optional[Dict]: LOGGER.info('[wait_for_reply] request_key={:s}'.format(str(request_key))) + self.kafka_consumer = KafkaConsumer({ + 'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : str(uuid.uuid4()), + 'auto.offset.reset' : 'latest', + 'max.poll.interval.ms': 600000, + 'session.timeout.ms' : 60000, + }) + self.kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value]) + while True: receive_msg = self.kafka_consumer.poll(2.0) if receive_msg is None: continue