diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in index be61971730a0b5d39f15d4438eb3062f30d08027..93db082b9d916d2475c6d351be78318467c6b8ce 100644 --- a/src/analytics/backend/requirements.in +++ b/src/analytics/backend/requirements.in @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +confluent-kafka==2.3.* dask==2024.1.0 distributed==2024.1.0 +kafka-python==2.0.6 pandas==2.2.3 -confluent-kafka==2.3.* diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in index 4d403781a8e87baf76d121af75c68e2c1df19388..eeaeebe43d99db5bcabc708f3424443c5e53202f 100644 --- a/src/analytics/frontend/requirements.in +++ b/src/analytics/frontend/requirements.in @@ -15,6 +15,7 @@ APScheduler>=3.10.4 confluent-kafka==2.3.* psycopg2-binary==2.9.* +kafka-python==2.0.6 SQLAlchemy==1.4.* sqlalchemy-cockroachdb==1.4.* SQLAlchemy-Utils==0.38.* diff --git a/src/analytics/requirements.in b/src/analytics/requirements.in index 5ca81a10fbe1c0137ee8e83848371a4dbccd4271..413084a9fea619e4b5d56bb8e8b2139e961d7d6b 100644 --- a/src/analytics/requirements.in +++ b/src/analytics/requirements.in @@ -14,6 +14,7 @@ confluent-kafka==2.3.* psycopg2-binary==2.9.* +kafka-python==2.0.6 SQLAlchemy==1.4.* sqlalchemy-cockroachdb==1.4.* SQLAlchemy-Utils==0.38.* diff --git a/src/automation/requirements.in b/src/automation/requirements.in index 4d403781a8e87baf76d121af75c68e2c1df19388..eeaeebe43d99db5bcabc708f3424443c5e53202f 100644 --- a/src/automation/requirements.in +++ b/src/automation/requirements.in @@ -15,6 +15,7 @@ APScheduler>=3.10.4 confluent-kafka==2.3.* psycopg2-binary==2.9.* +kafka-python==2.0.6 SQLAlchemy==1.4.* sqlalchemy-cockroachdb==1.4.* SQLAlchemy-Utils==0.38.* diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 4537c3e0d48f7c0c74fd9df9796d77ac4f7b7ad7..814284716fd76da0cdcd8752e2711534c96708bb 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -14,18 +14,20 @@ import logging, time from enum import Enum -from confluent_kafka.admin import AdminClient, NewTopic +#from confluent_kafka.admin import AdminClient, NewTopic +from kafka.admin import KafkaAdminClient, NewTopic from common.Settings import get_setting LOGGER = logging.getLogger(__name__) KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' -KAFKA_TOPIC_NUM_PARTITIONS = 1 -KAFKA_TOPIC_REPLICATION_FACTOR = 1 -KAFKA_TOPIC_LIST_TIMEOUT = 5 -TOPIC_CREATE_WAIT_ITERATIONS = 10 -TOPIC_CREATE_WAIT_TIME = 1 +KAFKA_TOPIC_NUM_PARTITIONS = 1 +KAFKA_TOPIC_REPLICATION_FACTOR = 1 +#KAFKA_TOPIC_LIST_TIMEOUT = 5 +KAFKA_TOPIC_CREATE_REQUEST_TIMEOUT = 60_000 # ms +KAFKA_TOPIC_CREATE_WAIT_ITERATIONS = 10 +KAFKA_TOPIC_CREATE_WAIT_TIME = 1 class KafkaConfig(Enum): @@ -41,7 +43,8 @@ class KafkaConfig(Enum): @staticmethod def get_admin_client(): SERVER_ADDRESS = KafkaConfig.get_kafka_address() - ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS}) + #ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS}) + ADMIN_CLIENT = KafkaAdminClient(bootstrap_servers=SERVER_ADDRESS) return ADMIN_CLIENT @@ -67,8 +70,9 @@ class KafkaTopic(Enum): LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address()))) kafka_admin_client = KafkaConfig.get_admin_client() - topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT) - existing_topics = set(topic_metadata.topics.keys()) + #topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT) + #existing_topics = set(topic_metadata.topics.keys()) + existing_topics = set(kafka_admin_client.list_topics()) LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics))) missing_topics = [ @@ -82,33 +86,48 @@ class KafkaTopic(Enum): LOGGER.debug('All topics already existed.') return True - create_topic_future_map = kafka_admin_client.create_topics(missing_topics, request_timeout=5*60) - LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map))) + #create_topic_future_map = kafka_admin_client.create_topics(missing_topics, request_timeout=5*60) + #LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map))) + topics_result = kafka_admin_client.create_topics( + new_topics=missing_topics, timeout_ms=KAFKA_TOPIC_CREATE_REQUEST_TIMEOUT, + validate_only=False + ) + LOGGER.debug('topics_result={:s}'.format(str(topics_result))) + failed_topic_creations = set() - for topic, future in create_topic_future_map.items(): - try: - LOGGER.info('Waiting for Topic({:s})...'.format(str(topic))) - future.result() # Blocks until topic is created or raises an exception - LOGGER.info('Topic({:s}) successfully created.'.format(str(topic))) - except: # pylint: disable=bare-except - LOGGER.exception('Failed to create Topic({:s})'.format(str(topic))) - failed_topic_creations.add(topic) + #for topic, future in create_topic_future_map.items(): + # try: + # LOGGER.info('Waiting for Topic({:s})...'.format(str(topic))) + # future.result() # Blocks until topic is created or raises an exception + # LOGGER.info('Topic({:s}) successfully created.'.format(str(topic))) + # except: # pylint: disable=bare-except + # LOGGER.exception('Failed to create Topic({:s})'.format(str(topic))) + # failed_topic_creations.add(topic) + for topic_name, error_code, error_message in topics_result.topic_errors: + if error_code == 0 and error_message is None: + MSG = 'Topic({:s}) successfully created.' + LOGGER.info(MSG.format(str(topic_name))) + else: + MSG = 'Failed to create Topic({:s}): error_code={:s} error_message={:s}' + LOGGER.error(MSG.format(str(topic_name), str(error_code), str(error_message))) + failed_topic_creations.add(topic_name) if len(failed_topic_creations) > 0: return False - LOGGER.debug('All topics created.') # Wait until topics appear in metadata desired_topics = {topic.value for topic in KafkaTopic} missing_topics = set() - for _ in range(TOPIC_CREATE_WAIT_ITERATIONS): - topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT) - existing_topics = set(topic_metadata.topics.keys()) + for _ in range(KAFKA_TOPIC_CREATE_WAIT_ITERATIONS): + #topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT) + #existing_topics = set(topic_metadata.topics.keys()) + existing_topics = set(kafka_admin_client.list_topics()) + LOGGER.debug('existing_topics={:s}'.format(str(existing_topics))) missing_topics = desired_topics.difference(existing_topics) if len(missing_topics) == 0: break MSG = 'Waiting for Topics({:s}) to appear in metadata...' LOGGER.debug(MSG.format(str(missing_topics))) - time.sleep(TOPIC_CREATE_WAIT_TIME) + time.sleep(KAFKA_TOPIC_CREATE_WAIT_TIME) if len(missing_topics) > 0: MSG = 'Something went wrong... Topics({:s}) does not appear in metadata' diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in index b4b105d9f832c005b62a9b9d9030f0589f83cf9a..9aca200125741b3ecf6770bffccbeca63b84a36b 100644 --- a/src/kpi_value_api/requirements.in +++ b/src/kpi_value_api/requirements.in @@ -14,5 +14,6 @@ APScheduler>=3.10.4 confluent-kafka==2.3.* +kafka-python==2.0.6 prometheus-api-client==0.5.3 requests==2.27.* diff --git a/src/kpi_value_writer/requirements.in b/src/kpi_value_writer/requirements.in index ac0c82cc3c638f17b777459dec1577b68ad2a0ba..089098ade65d007fc16f79a5c771f05172844345 100644 --- a/src/kpi_value_writer/requirements.in +++ b/src/kpi_value_writer/requirements.in @@ -13,4 +13,5 @@ # limitations under the License. confluent-kafka==2.3.* +kafka-python==2.0.6 requests==2.27.* diff --git a/src/nbi/requirements.in b/src/nbi/requirements.in index 9ebb937a3e1e130ebe0dadf6c1aaba883486889c..80fd024d066cfa44bcda62d611f1887b778ee5f9 100644 --- a/src/nbi/requirements.in +++ b/src/nbi/requirements.in @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -confluent-kafka==2.3.* # only for creating topics and compatibility deepdiff==6.7.* deepmerge==1.1.* eventlet==0.39.0 @@ -25,7 +24,7 @@ flask-socketio==5.5.1 #greenlet==3.1.1 gunicorn==23.0.0 jsonschema==4.4.0 -kafka-python==2.0.6 # for publishing and consuming messages in an eventlet-compatible way +kafka-python==2.0.6 libyang==2.8.4 netaddr==0.9.0 pyang==2.6.0 diff --git a/src/telemetry/backend/requirements.in b/src/telemetry/backend/requirements.in index 828cd0330b8210d0696818c79e73d2eefba1a550..5e9509a591431972829a8b64ef59cf982b43cdbd 100644 --- a/src/telemetry/backend/requirements.in +++ b/src/telemetry/backend/requirements.in @@ -15,5 +15,6 @@ anytree==2.8.0 APScheduler>=3.10.4 confluent-kafka==2.3.* +kafka-python==2.0.6 numpy==2.0.1 pytz>=2025.2 diff --git a/src/telemetry/frontend/requirements.in b/src/telemetry/frontend/requirements.in index 5ca81a10fbe1c0137ee8e83848371a4dbccd4271..99b7551e31fdc173d965fa3e40da7fee8c6e6836 100644 --- a/src/telemetry/frontend/requirements.in +++ b/src/telemetry/frontend/requirements.in @@ -13,6 +13,7 @@ # limitations under the License. confluent-kafka==2.3.* +kafka-python==2.0.6 psycopg2-binary==2.9.* SQLAlchemy==1.4.* sqlalchemy-cockroachdb==1.4.* diff --git a/src/vnt_manager/requirements.in b/src/vnt_manager/requirements.in index d8f9537b471d5645604ff39ab5142c3ae0e7c9ba..f916d1f1304bc84b17913c16d5fc7ac8be776eb3 100644 --- a/src/vnt_manager/requirements.in +++ b/src/vnt_manager/requirements.in @@ -12,5 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -confluent-kafka==2.3.* +kafka-python==2.0.6 #websockets==12.0