From be0d616701a1552e79768b7866a0e47aa908bd86 Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Fri, 9 Aug 2024 08:06:23 +0000
Subject: [PATCH] Kafka deployment script in gitlab-ci.file (2)

- Improvements in Kafka.variables files.
- In KpiValueApiServiceImpl corrected the call from "admin_client()" to "kafka_address()"
---
 src/common/tools/kafka/Variables.py              | 16 +++++++---------
 .../service/KpiValueApiServiceServicerImpl.py    |  2 +-
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index 1abbe7d7e..9d42f1550 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -28,16 +28,14 @@ class KafkaConfig(Enum):
     def get_kafka_address() -> str:
         kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
         if kafka_server_address is None:
-            KFK_NAMESPACE  = get_setting('KFK_NAMESPACE')
-            KFK_PORT       = get_setting('KFK_SERVER_PORT')
-            SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
-        return SERVER_ADDRESS
+            KFK_NAMESPACE        = get_setting('KFK_NAMESPACE')
+            KFK_PORT             = get_setting('KFK_SERVER_PORT')
+            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
+        return kafka_server_address
         
     @staticmethod
     def get_admin_client():
         SERVER_ADDRESS = KafkaConfig.get_kafka_address()
-        LOGGER.debug("KAFKA_SERVER_ADDRESS {:}".format(SERVER_ADDRESS))
-        # SERVER_ADDRESS = "127.0.0.1:9092"
         ADMIN_CLIENT   = AdminClient({'bootstrap.servers': SERVER_ADDRESS })
         return ADMIN_CLIENT
 
@@ -55,7 +53,7 @@ class KafkaTopic(Enum):
             Method to create Kafka topics defined as class members
         """
         all_topics = [member.value for member in KafkaTopic]
-        LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value))
+        LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
         if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
             LOGGER.debug("All topics are created sucsessfully")
             return True
@@ -73,14 +71,14 @@ class KafkaTopic(Enum):
         LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics))
         for topic in new_topics:
             try:
-                topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5)
+                topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5)
                 # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
                 if topic not in topic_metadata.topics:
                     # If the topic does not exist, create a new topic
                     print("Topic {:} does not exist. Creating...".format(topic))
                     LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
                     new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
-                    KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic])
+                    KafkaConfig.get_admin_client().create_topics([new_topic])
                 else:
                     print("Topic name already exists: {:}".format(topic))
                     LOGGER.debug("Topic name already exists: {:}".format(topic))
diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
index 05ab63fdf..3df8dd5b6 100644
--- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
+++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
@@ -43,7 +43,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
                        ) -> Empty:
         LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
         producer_obj = KafkaProducer({
-            'bootstrap.servers' : KafkaConfig.get_admin_client()
+            'bootstrap.servers' : KafkaConfig.get_kafka_address()
         })
         for kpi_value in request.kpi_value_list:
             kpi_value_to_produce : Tuple [str, Any, Any] = (
-- 
GitLab