# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, time
from enum import Enum
from confluent_kafka.admin import AdminClient, NewTopic
from common.Settings import get_setting


LOGGER = logging.getLogger(__name__)

# Address template filled with (namespace, port) when 'KFK_SERVER_ADDRESS' is not set.
# Both placeholders use '{:s}', so both arguments must be strings.
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'

# Arguments given to NewTopic() for every topic that has to be created.
KAFKA_TOPIC_NUM_PARTITIONS     = 1
KAFKA_TOPIC_REPLICATION_FACTOR = 1

# Value passed as 'timeout' to AdminClient.list_topics().
KAFKA_TOPIC_LIST_TIMEOUT       = 5

# After creation, topic metadata is polled at most TOPIC_CREATE_WAIT_ITERATIONS times,
# sleeping TOPIC_CREATE_WAIT_TIME seconds (time.sleep) between attempts.
TOPIC_CREATE_WAIT_ITERATIONS   = 10
TOPIC_CREATE_WAIT_TIME         = 1

class KafkaConfig(Enum):
    '''
        Static helpers to resolve the Kafka bootstrap address and to build an admin client.
        The class declares no enum members; it is only used as a namespace.
    '''

    @staticmethod
    def get_kafka_address() -> str:
        '''
            Method to resolve the Kafka bootstrap server address.
            The 'KFK_SERVER_ADDRESS' setting is returned when it is defined. Otherwise the
            address is built by filling KFK_SERVER_ADDRESS_TEMPLATE with the 'KFK_NAMESPACE'
            and 'KFK_SERVER_PORT' settings.
        '''
        kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
        if kafka_server_address is None:
            # No explicit address configured: derive it from namespace and port.
            KFK_NAMESPACE        = get_setting('KFK_NAMESPACE')
            KFK_PORT             = get_setting('KFK_SERVER_PORT')
            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
        return kafka_server_address

    @staticmethod
    def get_admin_client():
        '''
            Method to build a new confluent_kafka AdminClient whose 'bootstrap.servers'
            is the address returned by get_kafka_address().
            A new client instance is created on every call.
        '''
        SERVER_ADDRESS = KafkaConfig.get_kafka_address()
        ADMIN_CLIENT   = AdminClient({'bootstrap.servers': SERVER_ADDRESS})
        return ADMIN_CLIENT


class KafkaTopic(Enum):
    '''
        Kafka topics managed by this module.
        The value of each member is the topic name given to NewTopic().
    '''
    # TODO: Later to be populated from ENV variable.
    TELEMETRY_REQUEST    = 'topic_telemetry_request'
    TELEMETRY_RESPONSE   = 'topic_telemetry_response'
    RAW                  = 'topic_raw'
    LABELED              = 'topic_labeled'
    VALUE                = 'topic_value'
    ALARMS               = 'topic_alarms'
    ANALYTICS_REQUEST    = 'topic_analytics_request'
    ANALYTICS_RESPONSE   = 'topic_analytics_response'
    VNTMANAGER_REQUEST   = 'topic_vntmanager_request'
    VNTMANAGER_RESPONSE  = 'topic_vntmanager_response'
    NBI_SOCKETIO_WORKERS = 'tfs-nbi-socketio'

    @staticmethod
    def create_all_topics() -> bool:
        '''
            Method to create Kafka topics defined as class members.
            Topics already listed in the broker metadata are skipped. For the others, a
            creation request is issued and each returned future is awaited.
            Returns True when every topic defined in this class is present in the
            metadata at the end; returns False when at least one creation failed or when
            a topic still does not appear in the metadata after
            TOPIC_CREATE_WAIT_ITERATIONS checks.
            Exceptions raised by list_topics() are not caught here.
        '''
        LOGGER.debug('Kafka server address: {:s} '.format(str(KafkaConfig.get_kafka_address())))
        kafka_admin_client = KafkaConfig.get_admin_client()

        topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
        existing_topics = set(topic_metadata.topics.keys())
        LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))

        missing_topics = [
            NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
            for topic in KafkaTopic
            if topic.value not in existing_topics
        ]
        LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))

        if not missing_topics:
            LOGGER.debug('All topics already existed.')
            return True

        create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
        LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
        failed_topic_creations = set()
        for topic, future in create_topic_future_map.items():
            try:
                LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
                future.result()  # Blocks until topic is created or raises an exception
                LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
            except Exception: # pylint: disable=broad-except
                # Keep going so every failure gets logged; KeyboardInterrupt and
                # SystemExit are intentionally not intercepted.
                LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
                failed_topic_creations.add(topic)

        if failed_topic_creations: return False

        LOGGER.debug('All topics created.')

        # Wait until topics appear in metadata
        desired_topics = {topic.value for topic in KafkaTopic}
        missing_topics = set()
        for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
            topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            existing_topics = set(topic_metadata.topics.keys())
            missing_topics = desired_topics.difference(existing_topics)
            if not missing_topics: break
            MSG = 'Waiting for Topics({:s}) to appear in metadata...'
            LOGGER.debug(MSG.format(str(missing_topics)))
            time.sleep(TOPIC_CREATE_WAIT_TIME)

        if missing_topics:
            MSG = 'Something went wrong... Topics({:s}) does not appear in metadata'
            LOGGER.error(MSG.format(str(missing_topics)))
            return False

        LOGGER.debug('All topics created and available.')
        return True
# TODO: create all topics after the deployments (Telemetry and Analytics)


if __name__ == '__main__':
    # Script entry point: create every topic, falling back to the address below
    # only when 'KFK_SERVER_ADDRESS' is not already present in the environment.
    import os
    os.environ.setdefault('KFK_SERVER_ADDRESS', 'kafka-service.kafka.svc.cluster.local:9092')
    KafkaTopic.create_all_topics()