From bdcb5c01852b63399adffed7b635b19119cc73c1 Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Wed, 31 Jul 2024 12:30:39 +0000 Subject: [PATCH] Changes to resolve Kafka server error - KFK_SERVER_ADDRESS_TEMPLATE now defined inside the class KafkaConfig. - variable renamed to "SERVER_ADDRESS" from "server_address" --- src/common/tools/kafka/Variables.py | 13 +++++++------ .../service/KpiValueApiServiceServicerImpl.py | 2 +- src/kpi_value_writer/service/KpiValueWriter.py | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 89ac42f90..9abc32b3e 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -20,14 +20,14 @@ from common.Settings import get_setting LOGGER = logging.getLogger(__name__) -KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' class KafkaConfig(Enum): - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - # SERVER_IP = "127.0.0.1:9092" - server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) - ADMIN_CLIENT = AdminClient({'bootstrap.servers': server_address}) + KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + # SERVER_ADDRESS = "127.0.0.1:9092" + SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) class KafkaTopic(Enum): REQUEST = 'topic_request' @@ -42,6 +42,7 @@ class KafkaTopic(Enum): Method to create Kafka topics defined as class members """ all_topics = [member.value for member in KafkaTopic] + LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value)) if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): LOGGER.debug("All topics are created sucsessfully") return True diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index d27de54f3..1559457d7 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -38,7 +38,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ) -> Empty: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) producer_obj = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.SERVER_IP.value + 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value }) for kpi_value in request.kpi_value_list: kpi_value_to_produce : Tuple [str, Any, Any] = ( diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 022126fd0..5e2b6babe 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -51,10 +51,10 @@ class KpiValueWriter(GenericGrpcService): metric_writer = MetricWriterToPrometheus() kafka_consumer = KafkaConsumer( - { 'bootstrap.servers' : KafkaConfig.SERVER_IP.value, + { 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value, 'group.id' : __class__, 'auto.offset.reset' : 'latest'} - ) + ) kafka_consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) -- GitLab