Skip to content
Snippets Groups Projects
Commit e418d086 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes to resolve Kafka server error

- KFK_SERVER_ADDRESS_TEMPLATE now defined inside the class KafkaConfig.
- variable renamed to "SERVER_ADDRESS" from "server_address"
parent daadb074
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
...@@ -20,14 +20,14 @@ from common.Settings import get_setting ...@@ -20,14 +20,14 @@ from common.Settings import get_setting
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
class KafkaConfig(Enum): class KafkaConfig(Enum):
KFK_NAMESPACE = get_setting('KFK_NAMESPACE') KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
KFK_PORT = get_setting('KFK_SERVER_PORT') KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
# SERVER_IP = "127.0.0.1:9092" KFK_PORT = get_setting('KFK_SERVER_PORT')
server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) # SERVER_ADDRESS = "127.0.0.1:9092"
ADMIN_CLIENT = AdminClient({'bootstrap.servers': server_address}) SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS })
class KafkaTopic(Enum): class KafkaTopic(Enum):
REQUEST = 'topic_request' REQUEST = 'topic_request'
...@@ -42,6 +42,7 @@ class KafkaTopic(Enum): ...@@ -42,6 +42,7 @@ class KafkaTopic(Enum):
Method to create Kafka topics defined as class members Method to create Kafka topics defined as class members
""" """
all_topics = [member.value for member in KafkaTopic] all_topics = [member.value for member in KafkaTopic]
LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value))
if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
LOGGER.debug("All topics are created sucsessfully") LOGGER.debug("All topics are created sucsessfully")
return True return True
......
...@@ -38,7 +38,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ...@@ -38,7 +38,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
) -> Empty: ) -> Empty:
LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
producer_obj = KafkaProducer({ producer_obj = KafkaProducer({
'bootstrap.servers' : KafkaConfig.SERVER_IP.value 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value
}) })
for kpi_value in request.kpi_value_list: for kpi_value in request.kpi_value_list:
kpi_value_to_produce : Tuple [str, Any, Any] = ( kpi_value_to_produce : Tuple [str, Any, Any] = (
......
...@@ -51,10 +51,10 @@ class KpiValueWriter(GenericGrpcService): ...@@ -51,10 +51,10 @@ class KpiValueWriter(GenericGrpcService):
metric_writer = MetricWriterToPrometheus() metric_writer = MetricWriterToPrometheus()
kafka_consumer = KafkaConsumer( kafka_consumer = KafkaConsumer(
{ 'bootstrap.servers' : KafkaConfig.SERVER_IP.value, { 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value,
'group.id' : __class__, 'group.id' : __class__,
'auto.offset.reset' : 'latest'} 'auto.offset.reset' : 'latest'}
) )
kafka_consumer.subscribe([KafkaTopic.VALUE.value]) kafka_consumer.subscribe([KafkaTopic.VALUE.value])
LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment