Commit 31679c7e authored by Lluis Gifre Renom
Browse files

Common - Kafka tools:

- Migrate generic code from confluent-kafka to kafka-python: confluent-kafka is written in C and imposes particular restrictions (see the NBI README).
parent 97ded091
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

confluent-kafka==2.3.*
dask==2024.1.0
distributed==2024.1.0
kafka-python==2.0.6
pandas==2.2.3
confluent-kafka==2.3.*
+1 −0
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
APScheduler>=3.10.4
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
kafka-python==2.0.6
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
+1 −0
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@

confluent-kafka==2.3.*
psycopg2-binary==2.9.*
kafka-python==2.0.6
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
+1 −0
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
APScheduler>=3.10.4
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
kafka-python==2.0.6
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
+43 −24
Original line number Diff line number Diff line
@@ -14,7 +14,8 @@

import logging, time
from enum import Enum
from confluent_kafka.admin import AdminClient, NewTopic
#from confluent_kafka.admin import AdminClient, NewTopic
from kafka.admin import KafkaAdminClient, NewTopic
from common.Settings import get_setting


@@ -23,9 +24,10 @@ KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'

# Topic layout defaults.
# NOTE(review): presumably applied when the missing topics are declared;
# the code that consumes these two values is not visible here - confirm.
KAFKA_TOPIC_NUM_PARTITIONS         = 1
KAFKA_TOPIC_REPLICATION_FACTOR     = 1
# Passed as `timeout` to the admin client's list_topics() call
# (presumably seconds - confirm against the client library).
KAFKA_TOPIC_LIST_TIMEOUT       = 5
# Number of polling rounds while waiting for created topics to show up.
TOPIC_CREATE_WAIT_ITERATIONS   = 10
# Argument to time.sleep() between polling rounds, i.e. seconds.
TOPIC_CREATE_WAIT_TIME         = 1
#KAFKA_TOPIC_LIST_TIMEOUT           = 5
# Passed as `timeout_ms` to the admin client's create_topics() call.
KAFKA_TOPIC_CREATE_REQUEST_TIMEOUT = 60_000 # ms
# Number of polling rounds while waiting for created topics to show up.
KAFKA_TOPIC_CREATE_WAIT_ITERATIONS = 10
# Argument to time.sleep() between polling rounds, i.e. seconds.
KAFKA_TOPIC_CREATE_WAIT_TIME       = 1

class KafkaConfig(Enum):

@@ -41,7 +43,8 @@ class KafkaConfig(Enum):
    @staticmethod
    def get_admin_client():
        """Create a kafka-python admin client for the configured Kafka server.

        The server address is obtained from ``KafkaConfig.get_kafka_address()``
        and handed to the client as its ``bootstrap_servers``.

        Returns:
            KafkaAdminClient: a newly created admin client; a new one is built
            on every call.
        """
        # Build exactly one client. Creating a confluent-kafka AdminClient
        # first and then overwriting it would only produce an object that is
        # thrown away immediately.
        SERVER_ADDRESS = KafkaConfig.get_kafka_address()
        ADMIN_CLIENT   = KafkaAdminClient(bootstrap_servers=SERVER_ADDRESS)
        return ADMIN_CLIENT


@@ -67,8 +70,9 @@ class KafkaTopic(Enum):
        LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address())))
        kafka_admin_client = KafkaConfig.get_admin_client()

        topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
        existing_topics = set(topic_metadata.topics.keys())
        #topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
        #existing_topics = set(topic_metadata.topics.keys())
        existing_topics = set(kafka_admin_client.list_topics())
        LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))

        missing_topics = [
@@ -82,33 +86,48 @@ class KafkaTopic(Enum):
            LOGGER.debug('All topics already existed.')
            return True

        create_topic_future_map = kafka_admin_client.create_topics(missing_topics, request_timeout=5*60)
        LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
        #create_topic_future_map = kafka_admin_client.create_topics(missing_topics, request_timeout=5*60)
        #LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
        topics_result = kafka_admin_client.create_topics(
            new_topics=missing_topics, timeout_ms=KAFKA_TOPIC_CREATE_REQUEST_TIMEOUT,
            validate_only=False
        )
        LOGGER.debug('topics_result={:s}'.format(str(topics_result)))

        failed_topic_creations = set()
        for topic, future in create_topic_future_map.items():
            try:
                LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
                future.result()  # Blocks until topic is created or raises an exception
                LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
            except: # pylint: disable=bare-except
                LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
                failed_topic_creations.add(topic)
        #for topic, future in create_topic_future_map.items():
        #    try:
        #        LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
        #        future.result()  # Blocks until topic is created or raises an exception
        #        LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
        #    except: # pylint: disable=bare-except
        #        LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
        #        failed_topic_creations.add(topic)
        for topic_name, error_code, error_message in topics_result.topic_errors:
            if error_code == 0 and error_message is None:
                MSG = 'Topic({:s}) successfully created.'
                LOGGER.info(MSG.format(str(topic_name)))
            else:
                MSG = 'Failed to create Topic({:s}): error_code={:s} error_message={:s}'
                LOGGER.error(MSG.format(str(topic_name), str(error_code), str(error_message)))
                failed_topic_creations.add(topic_name)

        if len(failed_topic_creations) > 0: return False

        LOGGER.debug('All topics created.')

        # Wait until topics appear in metadata
        desired_topics = {topic.value for topic in KafkaTopic}
        missing_topics = set()
        for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
            topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            existing_topics = set(topic_metadata.topics.keys())
        for _ in range(KAFKA_TOPIC_CREATE_WAIT_ITERATIONS):
            #topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            #existing_topics = set(topic_metadata.topics.keys())
            existing_topics = set(kafka_admin_client.list_topics())
            LOGGER.debug('existing_topics={:s}'.format(str(existing_topics)))
            missing_topics = desired_topics.difference(existing_topics)
            if len(missing_topics) == 0: break
            MSG = 'Waiting for Topics({:s}) to appear in metadata...'
            LOGGER.debug(MSG.format(str(missing_topics)))
            time.sleep(TOPIC_CREATE_WAIT_TIME)
            time.sleep(KAFKA_TOPIC_CREATE_WAIT_TIME)

        if len(missing_topics) > 0:
            MSG = 'Something went wrong... Topics({:s}) does not appear in metadata'
Loading