# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, time
from enum import Enum
from confluent_kafka.admin import AdminClient, NewTopic
from common.Settings import get_setting


# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Template for the Kafka bootstrap address. The two '{:s}' placeholders are
# filled, in order, with the namespace and the port (both as strings).
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'

# Partition count and replication factor given to every topic created by
# KafkaTopic.create_all_topics().
KAFKA_TOPIC_NUM_PARTITIONS     = 1
KAFKA_TOPIC_REPLICATION_FACTOR = 1

# Value passed as 'timeout' to AdminClient.list_topics().
KAFKA_TOPIC_LIST_TIMEOUT       = 5

# After creating topics, the metadata is polled at most
# TOPIC_CREATE_WAIT_ITERATIONS times, sleeping TOPIC_CREATE_WAIT_TIME seconds
# (argument of time.sleep) between polls.
TOPIC_CREATE_WAIT_ITERATIONS   = 10
TOPIC_CREATE_WAIT_TIME         = 1


class KafkaConfig(Enum):
    '''
        Groups the helpers used to locate and connect to the Kafka broker.
        The enumeration declares no members; it only hosts static methods.
    '''

    @staticmethod
    def get_kafka_address() -> str:
        '''
            Method to resolve the Kafka bootstrap server address.
            The 'KFK_SERVER_ADDRESS' setting is returned when it is defined;
            otherwise the address is composed from the 'KFK_NAMESPACE' and
            'KFK_SERVER_PORT' settings using KFK_SERVER_ADDRESS_TEMPLATE.
        '''
        kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
        if kafka_server_address is not None:
            return kafka_server_address

        kfk_namespace = get_setting('KFK_NAMESPACE')
        kfk_port      = get_setting('KFK_SERVER_PORT')
        # The template uses '{:s}', which rejects non-string values (e.g. an
        # integer port); coerce explicitly so formatting cannot fail on type.
        return KFK_SERVER_ADDRESS_TEMPLATE.format(str(kfk_namespace), str(kfk_port))

    @staticmethod
    def get_admin_client() -> 'AdminClient':
        '''
            Method to build a new AdminClient whose 'bootstrap.servers' is the
            address returned by get_kafka_address().
        '''
        server_address = KafkaConfig.get_kafka_address()
        return AdminClient({'bootstrap.servers': server_address})


class KafkaTopic(Enum):
    '''
        Kafka topics known to this module; the value of each member is the
        topic name used on the broker.
    '''
    # TODO: Later to be populated from ENV variable.
    TELEMETRY_REQUEST    = 'topic_telemetry_request'
    TELEMETRY_RESPONSE   = 'topic_telemetry_response'
    RAW                  = 'topic_raw'
    LABELED              = 'topic_labeled'
    VALUE                = 'topic_value'
    ALARMS               = 'topic_alarms'
    ANALYTICS_REQUEST    = 'topic_analytics_request'
    ANALYTICS_RESPONSE   = 'topic_analytics_response'
    VNTMANAGER_REQUEST   = 'topic_vntmanager_request'
    VNTMANAGER_RESPONSE  = 'topic_vntmanager_response'
    NBI_SOCKETIO_WORKERS = 'tfs_nbi_socketio'

    @staticmethod
    def create_all_topics() -> bool:
        '''
            Method to create Kafka topics defined as class members.

            Topics already listed by the broker are skipped. Returns True when
            every topic already existed, or when all missing topics were
            created and then showed up in the broker metadata. Returns False
            when at least one creation failed, or when some topic is still
            absent from the metadata after TOPIC_CREATE_WAIT_ITERATIONS polls.
            Errors raised while listing topics or submitting the creation
            request are not caught here.
        '''
        LOGGER.debug('Kafka server address: %s', KafkaConfig.get_kafka_address())
        kafka_admin_client = KafkaConfig.get_admin_client()

        topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
        existing_topics = set(topic_metadata.topics)
        LOGGER.debug('Existing Kafka topics: %s', existing_topics)

        missing_topics = [
            NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
            for topic in KafkaTopic
            if topic.value not in existing_topics
        ]
        LOGGER.debug('Missing Kafka topics: %s', missing_topics)

        if not missing_topics:
            LOGGER.debug('All topics already existed.')
            return True

        create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
        LOGGER.debug('create_topic_future_map: %s', create_topic_future_map)
        failed_topic_creations = set()
        for topic, future in create_topic_future_map.items():
            try:
                LOGGER.info('Waiting for Topic(%s)...', topic)
                future.result()  # Blocks until topic is created or raises an exception
                LOGGER.info('Topic(%s) successfully created.', topic)
            except Exception: # pylint: disable=broad-except
                # Record the failure and keep going so that every topic gets
                # its chance; KeyboardInterrupt/SystemExit are left to propagate.
                LOGGER.exception('Failed to create Topic(%s)', topic)
                failed_topic_creations.add(topic)

        if failed_topic_creations:
            return False

        LOGGER.debug('All topics created.')

        # Wait until topics appear in metadata
        desired_topics = {topic.value for topic in KafkaTopic}
        missing_topics = set()
        for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
            topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            existing_topics = set(topic_metadata.topics)
            missing_topics = desired_topics.difference(existing_topics)
            if not missing_topics:
                break
            LOGGER.debug('Waiting for Topics(%s) to appear in metadata...', missing_topics)
            time.sleep(TOPIC_CREATE_WAIT_TIME)

        if missing_topics:
            LOGGER.error(
                'Something went wrong... Topics(%s) does not appear in metadata', missing_topics
            )
            return False

        LOGGER.debug('All topics created and available.')
        return True
# TODO: Create all topics after the Telemetry and Analytics deployments.

if __name__ == '__main__':
    # When run as a script: make sure a broker address is configured (an
    # already-set 'KFK_SERVER_ADDRESS' is left untouched), then create the
    # topics declared in KafkaTopic.
    import os
    os.environ.setdefault('KFK_SERVER_ADDRESS', 'kafka-service.kafka.svc.cluster.local:9092')
    KafkaTopic.create_all_topics()