Skip to content
Snippets Groups Projects
Commit b53a76a5 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Kafka deployment script in gitlab-ci.file (2)

- Improvements in Kafka.variables files.
- In KpiValueApiServiceImpl corrected the call from "admin_client()" to "kafka_address()"
parent 466fd377
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
...@@ -28,16 +28,14 @@ class KafkaConfig(Enum): ...@@ -28,16 +28,14 @@ class KafkaConfig(Enum):
def get_kafka_address() -> str: def get_kafka_address() -> str:
kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
if kafka_server_address is None: if kafka_server_address is None:
KFK_NAMESPACE = get_setting('KFK_NAMESPACE') KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
KFK_PORT = get_setting('KFK_SERVER_PORT') KFK_PORT = get_setting('KFK_SERVER_PORT')
SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
return SERVER_ADDRESS return kafka_server_address
@staticmethod @staticmethod
def get_admin_client(): def get_admin_client():
SERVER_ADDRESS = KafkaConfig.get_kafka_address() SERVER_ADDRESS = KafkaConfig.get_kafka_address()
LOGGER.debug("KAFKA_SERVER_ADDRESS {:}".format(SERVER_ADDRESS))
# SERVER_ADDRESS = "127.0.0.1:9092"
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS })
return ADMIN_CLIENT return ADMIN_CLIENT
...@@ -55,7 +53,7 @@ class KafkaTopic(Enum): ...@@ -55,7 +53,7 @@ class KafkaTopic(Enum):
Method to create Kafka topics defined as class members Method to create Kafka topics defined as class members
""" """
all_topics = [member.value for member in KafkaTopic] all_topics = [member.value for member in KafkaTopic]
LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value)) LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
LOGGER.debug("All topics are created sucsessfully") LOGGER.debug("All topics are created sucsessfully")
return True return True
...@@ -73,14 +71,14 @@ class KafkaTopic(Enum): ...@@ -73,14 +71,14 @@ class KafkaTopic(Enum):
LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics)) LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics))
for topic in new_topics: for topic in new_topics:
try: try:
topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5) topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5)
# LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics)) # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
if topic not in topic_metadata.topics: if topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic # If the topic does not exist, create a new topic
print("Topic {:} does not exist. Creating...".format(topic)) print("Topic {:} does not exist. Creating...".format(topic))
LOGGER.debug("Topic {:} does not exist. Creating...".format(topic)) LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic]) KafkaConfig.get_admin_client().create_topics([new_topic])
else: else:
print("Topic name already exists: {:}".format(topic)) print("Topic name already exists: {:}".format(topic))
LOGGER.debug("Topic name already exists: {:}".format(topic)) LOGGER.debug("Topic name already exists: {:}".format(topic))
......
...@@ -43,7 +43,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ...@@ -43,7 +43,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
) -> Empty: ) -> Empty:
LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
producer_obj = KafkaProducer({ producer_obj = KafkaProducer({
'bootstrap.servers' : KafkaConfig.get_admin_client() 'bootstrap.servers' : KafkaConfig.get_kafka_address()
}) })
for kpi_value in request.kpi_value_list: for kpi_value in request.kpi_value_list:
kpi_value_to_produce : Tuple [str, Any, Any] = ( kpi_value_to_produce : Tuple [str, Any, Any] = (
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment