diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 1abbe7d7e592b1dbb18569e1432662943589e03f..9d42f1550d29e9a2abc9a9bf9b8fc054350e85ec 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -28,16 +28,14 @@ class KafkaConfig(Enum): def get_kafka_address() -> str: kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) if kafka_server_address is None: - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) - return SERVER_ADDRESS + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + return kafka_server_address @staticmethod def get_admin_client(): SERVER_ADDRESS = KafkaConfig.get_kafka_address() - LOGGER.debug("KAFKA_SERVER_ADDRESS {:}".format(SERVER_ADDRESS)) - # SERVER_ADDRESS = "127.0.0.1:9092" ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) return ADMIN_CLIENT @@ -55,7 +53,7 @@ class KafkaTopic(Enum): Method to create Kafka topics defined as class members """ all_topics = [member.value for member in KafkaTopic] - LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value)) + LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address())) if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): LOGGER.debug("All topics are created sucsessfully") return True @@ -73,14 +71,14 @@ class KafkaTopic(Enum): LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics)) for topic in new_topics: try: - topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5) + topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5) # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics)) if topic not in topic_metadata.topics: # If the topic does not exist, create a new topic print("Topic {:} does not exist. Creating...".format(topic)) LOGGER.debug("Topic {:} does not exist. Creating...".format(topic)) new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) - KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic]) + KafkaConfig.get_admin_client().create_topics([new_topic]) else: print("Topic name already exists: {:}".format(topic)) LOGGER.debug("Topic name already exists: {:}".format(topic)) diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 05ab63fdfb94de4b20408080cc7ddd648775305a..3df8dd5b66f36fe69ef9ada5d5fe882fc16abeb2 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -43,7 +43,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ) -> Empty: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) producer_obj = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.get_admin_client() + 'bootstrap.servers' : KafkaConfig.get_kafka_address() }) for kpi_value in request.kpi_value_list: kpi_value_to_produce : Tuple [str, Any, Any] = (