Loading src/common/tools/kafka/Variables.py +6 −5 Original line number Diff line number Diff line Loading @@ -36,9 +36,6 @@ class KafkaTopic(Enum): """ Method to create Kafka topics defined as class members """ # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.values())) # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.keys())) # LOGGER.debug("Topics to be created: {:}".format([member.value for member in KafkaTopic])) all_topics = [member.value for member in KafkaTopic] if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): LOGGER.debug("All topics created sucsessfully") Loading @@ -54,16 +51,20 @@ class KafkaTopic(Enum): Args: list of topic: containing the topic name(s) to be created on Kafka """ LOGGER.debug("Recevied topic List: {:}".format(new_topics)) LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics)) for topic in new_topics: try: topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5) # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics)) if topic not in topic_metadata.topics: # If the topic does not exist, create a new topic print(f"Topic '{topic}' does not exist. Creating...") print("Topic {:} does not exist. Creating...".format(topic)) LOGGER.debug("Topic {:} does not exist. Creating...".format(topic)) new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic]) else: print("Topic name already exists: {:}".format(topic)) LOGGER.debug("Topic name already exists: {:}".format(topic)) except Exception as e: LOGGER.debug("Failed to create topic: {:}".format(e)) return False Loading src/kpi_value_api/requirements.in 0 → 100644 +2 −0 Original line number Diff line number Diff line confluent-kafka==2.3.0 requests==2.27.1 No newline at end of file src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +0 −3 Original line number Diff line number Diff line Loading @@ -105,19 +105,16 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): return KpiValueType(int64Val=int64_value) except ValueError: pass # Check if the value is a float try: float_value = float(value) return KpiValueType(floatVal=float_value) except ValueError: pass # Check if the value is a boolean if value.lower() in ['true', 'false']: bool_value = value.lower() == 'true' return KpiValueType(boolVal=bool_value) # If none of the above, treat it as a string return KpiValueType(stringVal=value) Loading src/kpi_value_api/tests/test_kpi_value_api.py +1 −0 Original line number Diff line number Diff line Loading @@ -90,3 +90,4 @@ def test_store_kpi_values(kpi_value_api_client): LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list()) assert isinstance(response, Empty) src/kpi_value_writer/requirements.in 0 → 100644 +2 −0 Original line number Diff line number Diff line confluent-kafka==2.3.0 requests==2.27.1 No newline at end of file Loading
src/common/tools/kafka/Variables.py +6 −5 Original line number Diff line number Diff line Loading @@ -36,9 +36,6 @@ class KafkaTopic(Enum): """ Method to create Kafka topics defined as class members """ # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.values())) # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.keys())) # LOGGER.debug("Topics to be created: {:}".format([member.value for member in KafkaTopic])) all_topics = [member.value for member in KafkaTopic] if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): LOGGER.debug("All topics created sucsessfully") Loading @@ -54,16 +51,20 @@ class KafkaTopic(Enum): Args: list of topic: containing the topic name(s) to be created on Kafka """ LOGGER.debug("Recevied topic List: {:}".format(new_topics)) LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics)) for topic in new_topics: try: topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5) # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics)) if topic not in topic_metadata.topics: # If the topic does not exist, create a new topic print(f"Topic '{topic}' does not exist. Creating...") print("Topic {:} does not exist. Creating...".format(topic)) LOGGER.debug("Topic {:} does not exist. Creating...".format(topic)) new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic]) else: print("Topic name already exists: {:}".format(topic)) LOGGER.debug("Topic name already exists: {:}".format(topic)) except Exception as e: LOGGER.debug("Failed to create topic: {:}".format(e)) return False Loading
src/kpi_value_api/requirements.in 0 → 100644 +2 −0 Original line number Diff line number Diff line confluent-kafka==2.3.0 requests==2.27.1 No newline at end of file
src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +0 −3 Original line number Diff line number Diff line Loading @@ -105,19 +105,16 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): return KpiValueType(int64Val=int64_value) except ValueError: pass # Check if the value is a float try: float_value = float(value) return KpiValueType(floatVal=float_value) except ValueError: pass # Check if the value is a boolean if value.lower() in ['true', 'false']: bool_value = value.lower() == 'true' return KpiValueType(boolVal=bool_value) # If none of the above, treat it as a string return KpiValueType(stringVal=value) Loading
src/kpi_value_api/tests/test_kpi_value_api.py +1 −0 Original line number Diff line number Diff line Loading @@ -90,3 +90,4 @@ def test_store_kpi_values(kpi_value_api_client): LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list()) assert isinstance(response, Empty)
src/kpi_value_writer/requirements.in 0 → 100644 +2 −0 Original line number Diff line number Diff line confluent-kafka==2.3.0 requests==2.27.1 No newline at end of file