# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from enum import Enum

from confluent_kafka.admin import AdminClient, NewTopic

from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)

# Address used when no explicit 'KFK_SERVER_ADDRESS' setting is provided;
# the two placeholders are filled with the namespace and the port.
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'

# Parameters applied to every topic created by this module.
KAFKA_TOPIC_NUM_PARTITIONS     = 1
KAFKA_TOPIC_REPLICATION_FACTOR = 1

# Timeout handed to AdminClient.list_topics() when fetching cluster metadata.
KAFKA_TOPIC_LIST_TIMEOUT = 5

# After creation, metadata is polled up to TOPIC_CREATE_WAIT_ITERATIONS times,
# sleeping TOPIC_CREATE_WAIT_TIME seconds between polls.
TOPIC_CREATE_WAIT_ITERATIONS = 10
TOPIC_CREATE_WAIT_TIME       = 1
class KafkaConfig:
    """Namespace for helpers that locate the Kafka broker and connect to it."""

    @staticmethod
    def get_kafka_address() -> str:
        """Return the Kafka bootstrap server address.

        The 'KFK_SERVER_ADDRESS' setting is used when it is defined.
        Otherwise the address is built from the 'KFK_NAMESPACE' and
        'KFK_SERVER_PORT' settings using KFK_SERVER_ADDRESS_TEMPLATE.

        Returns:
            The address string to use as 'bootstrap.servers'.
        """
        kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
        if kafka_server_address is None:
            # NOTE(review): the template uses '{:s}', so both settings are
            # expected to be strings -- confirm against get_setting().
            kfk_namespace = get_setting('KFK_NAMESPACE')
            kfk_port = get_setting('KFK_SERVER_PORT')
            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(kfk_namespace, kfk_port)
        return kafka_server_address

    @staticmethod
    def get_admin_client() -> AdminClient:
        """Return a new AdminClient bound to the address from get_kafka_address()."""
        server_address = KafkaConfig.get_kafka_address()
        return AdminClient({'bootstrap.servers': server_address})
class KafkaTopic(Enum):
    """Kafka topics used by the components; each member's value is the topic name."""

    # TODO: Later to be populated from ENV variable.
    TELEMETRY_REQUEST   = 'topic_telemetry_request'
    TELEMETRY_RESPONSE  = 'topic_telemetry_response'
    RAW                 = 'topic_raw'
    LABELED             = 'topic_labeled'
    VALUE               = 'topic_value'
    ALARMS              = 'topic_alarms'
    ANALYTICS_REQUEST   = 'topic_analytics_request'
    ANALYTICS_RESPONSE  = 'topic_analytics_response'
    VNTMANAGER_REQUEST  = 'topic_vntmanager_request'
    VNTMANAGER_RESPONSE = 'topic_vntmanager_response'

    @staticmethod
    def create_all_topics() -> bool:
        """Create every Kafka topic defined as a member of this class.

        Topics already listed in the broker metadata are skipped. After
        requesting creation of the missing ones, the metadata is polled until
        all topics are listed or the retry budget is exhausted.

        Returns:
            True when all topics already existed, or were created and are
            listed in the metadata. False when any creation fails or a topic
            is still not listed after TOPIC_CREATE_WAIT_ITERATIONS polls.
        """
        LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address())))
        kafka_admin_client = KafkaConfig.get_admin_client()

        topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
        existing_topics = set(topic_metadata.topics.keys())
        LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))

        missing_topics = [
            NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
            for topic in KafkaTopic
            if topic.value not in existing_topics
        ]
        LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))

        if not missing_topics:
            # Nothing to create; stop here so create_topics() is never called
            # with an empty list.
            LOGGER.debug('All topics already existed.')
            return True

        create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
        LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))

        failed_topic_creations = set()
        for topic, future in create_topic_future_map.items():
            try:
                LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
                future.result()  # Blocks until topic is created or raises an exception
                LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
            except Exception:  # pylint: disable=broad-except
                # Keep going so that every failure is logged, but let
                # KeyboardInterrupt/SystemExit propagate.
                LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
                failed_topic_creations.add(topic)

        if failed_topic_creations:
            return False

        LOGGER.debug('All topics created.')

        # Wait until topics appear in metadata
        desired_topics = {topic.value for topic in KafkaTopic}
        missing_topics = set()
        for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
            topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            existing_topics = set(topic_metadata.topics.keys())
            missing_topics = desired_topics.difference(existing_topics)
            if not missing_topics:
                break
            MSG = 'Waiting for Topics({:s}) to appear in metadata...'
            LOGGER.debug(MSG.format(str(missing_topics)))
            time.sleep(TOPIC_CREATE_WAIT_TIME)

        if missing_topics:
            MSG = 'Something went wrong... Topics({:s}) does not appear in metadata'
            LOGGER.error(MSG.format(str(missing_topics)))
            return False

        LOGGER.debug('All topics created and available.')
        return True
# TODO: create all topics after the deployments (Telemetry and Analytics)

if __name__ == '__main__':
    import os

    # When run as a script without an explicit broker address, fall back to
    # this default address before creating the topics.
    os.environ.setdefault('KFK_SERVER_ADDRESS', 'kafka-service.kafka.svc.cluster.local:9092')
    KafkaTopic.create_all_topics()