Skip to content
Snippets Groups Projects
Commit 9f585e31 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

VNT Manager component:

- Corrected Kafka connection
parent e62ec3fd
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
...@@ -39,15 +39,6 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): ...@@ -39,15 +39,6 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
def __init__(self): def __init__(self):
LOGGER.debug('Creating Servicer...') LOGGER.debug('Creating Servicer...')
self.context_client = ContextClient() self.context_client = ContextClient()
self.kafka_producer = KafkaProducer({
'bootstrap.servers' : KafkaConfig.get_kafka_address()
})
self.kafka_consumer = KafkaConsumer({
'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : str(uuid.uuid4()),
'auto.offset.reset' : 'latest'
})
self.kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value])
self.links = [] self.links = []
LOGGER.debug('Servicer Created') LOGGER.debug('Servicer Created')
...@@ -66,6 +57,9 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): ...@@ -66,6 +57,9 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
vntm_request = json.dumps(vntm_request) vntm_request = json.dumps(vntm_request)
MSG = '[send_recommendation] request_key={:s} vntm_request={:s}' MSG = '[send_recommendation] request_key={:s} vntm_request={:s}'
LOGGER.info(MSG.format(str(request_key), str(vntm_request))) LOGGER.info(MSG.format(str(request_key), str(vntm_request)))
self.kafka_producer = KafkaProducer({
'bootstrap.servers' : KafkaConfig.get_kafka_address()
})
self.kafka_producer.produce( self.kafka_producer.produce(
KafkaTopic.VNTMANAGER_REQUEST.value, KafkaTopic.VNTMANAGER_REQUEST.value,
key=request_key.encode('utf-8'), key=request_key.encode('utf-8'),
...@@ -87,6 +81,15 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): ...@@ -87,6 +81,15 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
def wait_for_reply(self, request_key : str) -> Optional[Dict]: def wait_for_reply(self, request_key : str) -> Optional[Dict]:
LOGGER.info('[wait_for_reply] request_key={:s}'.format(str(request_key))) LOGGER.info('[wait_for_reply] request_key={:s}'.format(str(request_key)))
self.kafka_consumer = KafkaConsumer({
'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : str(uuid.uuid4()),
'auto.offset.reset' : 'latest',
'max.poll.interval.ms': 600000,
'session.timeout.ms' : 60000,
})
self.kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value])
while True: while True:
receive_msg = self.kafka_consumer.poll(2.0) receive_msg = self.kafka_consumer.poll(2.0)
if receive_msg is None: continue if receive_msg is None: continue
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment