Commit 5f7a012b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common - Kafka Tools:

- Corrected logic to create kafka topics
parent b7e6b698
Loading
Loading
Loading
Loading
+77 −44
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import logging, time
from enum import Enum
from confluent_kafka.admin import AdminClient, NewTopic
from common.Settings import get_setting
@@ -21,6 +21,12 @@ from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'

KAFKA_TOPIC_NUM_PARTITIONS     = 1
KAFKA_TOPIC_REPLICATION_FACTOR = 1
KAFKA_TOPIC_LIST_TIMEOUT       = 5
TOPIC_CREATE_WAIT_ITERATIONS   = 10
TOPIC_CREATE_WAIT_TIME         = 1

class KafkaConfig(Enum):

    @staticmethod
@@ -49,45 +55,72 @@ class KafkaTopic(Enum):
    ALARMS              = 'topic_alarms'
    ANALYTICS_REQUEST   = 'topic_analytics_request'
    ANALYTICS_RESPONSE  = 'topic_analytics_response'
    VNTMANAGER_REQUEST  = 'topic_vntmanager_request' 
    VNTMANAGER_RESPONSE = 'topic_vntmanager_response'

    @staticmethod
    def create_all_topics() -> bool:
        """
        '''
            Method to create Kafka topics defined as class members
        """
        all_topics = [member.value for member in KafkaTopic]
        LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
        if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
            LOGGER.debug("All topics are created sucsessfully or Already Exists")
        '''
        LOGGER.debug('Kafka server address: {:s} '.format(str(KafkaConfig.get_kafka_address())))
        kafka_admin_client = KafkaConfig.get_admin_client()

        topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
        existing_topics = set(topic_metadata.topics.keys())
        LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))

        missing_topics = [
            NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
            for topic in KafkaTopic
            if topic.value not in existing_topics
        ]
        LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))

        if len(missing_topics) == 0:
            LOGGER.debug('All topics already existed.')
            return True
        else:
            LOGGER.debug("Error creating all topics")
            return False

    @staticmethod
    def create_new_topic_if_not_exists(new_topics: list) -> bool:
        """
        Method to create Kafka topic if it does not exist.
        Args:
            list of topic: containing the topic name(s) to be created on Kafka
        """
        LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics))
        for topic in new_topics:
        create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
        LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
        failed_topic_creations = set()
        for topic, future in create_topic_future_map.items():
            try:
                topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5)
                # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
                if topic not in topic_metadata.topics:
                    # If the topic does not exist, create a new topic
                    print("Topic {:} does not exist. Creating...".format(topic))
                    LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
                    new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
                    KafkaConfig.get_admin_client().create_topics([new_topic])
                else:
                    print("Topic name already exists: {:}".format(topic))
                    LOGGER.debug("Topic name already exists: {:}".format(topic))
            except Exception as e:
                LOGGER.debug("Failed to create topic: {:}".format(e))
                LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
                future.result()  # Blocks until topic is created or raises an exception
                LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
            except: # pylint: disable=bare-except
                LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
                failed_topic_creations.add(topic)

        if len(failed_topic_creations) > 0: return False

        LOGGER.debug('All topics created.')

        # Wait until topics appear in metadata
        desired_topics = {topic.value for topic in KafkaTopic}
        missing_topics = set()
        for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
            topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            existing_topics = set(topic_metadata.topics.keys())
            missing_topics = desired_topics.difference(existing_topics)
            if len(missing_topics) == 0: break
            MSG = 'Waiting for Topics({:s}) to appear in metadata...'
            LOGGER.debug(MSG.format(str(missing_topics)))
            time.sleep(TOPIC_CREATE_WAIT_TIME)

        if len(missing_topics) > 0:
            MSG = 'Something went wrong... Topics({:s}) does not appear in metadata'
            LOGGER.error(MSG.format(str(missing_topics)))
            return False
        else:
            LOGGER.debug('All topics created and available.')
            return True

# TODO: create all topics after the deployments (Telemetry and Analytics)

if __name__ == '__main__':
    import os
    if 'KFK_SERVER_ADDRESS' not in os.environ:
        os.environ['KFK_SERVER_ADDRESS'] = 'kafka-service.kafka.svc.cluster.local:9092'
    KafkaTopic.create_all_topics()