Skip to content
Snippets Groups Projects
Variables.py 5.24 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import logging, time
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    from enum import Enum
    
    from confluent_kafka.admin import AdminClient, NewTopic
    
    from common.Settings import get_setting
    
    
    
    # Module-level logger, named after this module.
    LOGGER = logging.getLogger(__name__)

    # Template for the Kafka server address. KafkaConfig.get_kafka_address() fills the two
    # placeholders with the 'KFK_NAMESPACE' and 'KFK_SERVER_PORT' settings, in that order.
    KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Second and third positional arguments given to NewTopic() for every topic that is created.
    KAFKA_TOPIC_NUM_PARTITIONS     = 1
    KAFKA_TOPIC_REPLICATION_FACTOR = 1
    # Passed as 'timeout=' to AdminClient.list_topics().
    KAFKA_TOPIC_LIST_TIMEOUT       = 5
    # After creation, metadata is polled at most this many times...
    TOPIC_CREATE_WAIT_ITERATIONS   = 10
    # ...sleeping this many seconds (time.sleep) between polls.
    TOPIC_CREATE_WAIT_TIME         = 1
    
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    class KafkaConfig(Enum):
        """Helpers to resolve the Kafka server address and build an admin client.

        The class declares no enum members; it only groups static helper methods.
        """

        @staticmethod
        def get_kafka_address() -> str:
            """Return the Kafka server address taken from the settings.

            The 'KFK_SERVER_ADDRESS' setting is returned as-is when get_setting() yields
            something other than None for it. Otherwise the address is built by filling
            KFK_SERVER_ADDRESS_TEMPLATE with the 'KFK_NAMESPACE' and 'KFK_SERVER_PORT'
            settings.
            """
            kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
            if kafka_server_address is not None:
                return kafka_server_address

            # No explicit address configured: compose it from namespace and port.
            # NOTE(review): the template uses '{:s}', so both settings must be strings.
            kfk_namespace = get_setting('KFK_NAMESPACE')
            kfk_port      = get_setting('KFK_SERVER_PORT')
            return KFK_SERVER_ADDRESS_TEMPLATE.format(kfk_namespace, kfk_port)

        @staticmethod
        def get_admin_client():
            """Return a new AdminClient whose 'bootstrap.servers' is get_kafka_address()."""
            server_address = KafkaConfig.get_kafka_address()
            return AdminClient({'bootstrap.servers': server_address})
    
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    
    class KafkaTopic(Enum):
        """Kafka topics handled by this module; each member's value is the topic name."""

        # TODO: Later to be populated from ENV variable.
        TELEMETRY_REQUEST    = 'topic_telemetry_request'
        TELEMETRY_RESPONSE   = 'topic_telemetry_response'
        RAW                  = 'topic_raw'
        LABELED              = 'topic_labeled'
        VALUE                = 'topic_value'
        ALARMS               = 'topic_alarms'
        ANALYTICS_REQUEST    = 'topic_analytics_request'
        ANALYTICS_RESPONSE   = 'topic_analytics_response'
        VNTMANAGER_REQUEST   = 'topic_vntmanager_request'
        VNTMANAGER_RESPONSE  = 'topic_vntmanager_response'
        NBI_SOCKETIO_WORKERS = 'tfs_nbi_socketio'

        @staticmethod
        def create_all_topics() -> bool:
            """Create every Kafka topic defined as a member of this class.

            Topics already listed by the broker are skipped. The remaining ones are
            created, and then the broker metadata is polled until all desired topics
            are listed or the retry budget is exhausted.

            Returns:
                True when all topics already existed, or were created and appear in
                the metadata; False when any creation failed or some topic is still
                missing from the metadata after TOPIC_CREATE_WAIT_ITERATIONS polls.
            """
            LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address())))
            kafka_admin_client = KafkaConfig.get_admin_client()

            topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            existing_topics = set(topic_metadata.topics.keys())
            LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))

            missing_topics = [
                NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
                for topic in KafkaTopic
                if topic.value not in existing_topics
            ]
            LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))

            if not missing_topics:
                LOGGER.debug('All topics already existed.')
                return True

            create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
            LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
            failed_topic_creations = set()
            for topic, future in create_topic_future_map.items():
                try:
                    LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
                    future.result()  # Blocks until topic is created or raises an exception
                    LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
                except Exception: # pylint: disable=broad-except
                    # Keep going so that every failure is logged; KeyboardInterrupt and
                    # SystemExit are deliberately not caught here.
                    LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
                    failed_topic_creations.add(topic)

            if failed_topic_creations: return False

            LOGGER.debug('All topics created.')

            # Wait until topics appear in metadata
            desired_topics = {topic.value for topic in KafkaTopic}
            missing_topics = set()
            for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
                topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
                existing_topics = set(topic_metadata.topics.keys())
                missing_topics = desired_topics.difference(existing_topics)
                if not missing_topics: break
                MSG = 'Waiting for Topics({:s}) to appear in metadata...'
                LOGGER.debug(MSG.format(str(missing_topics)))
                time.sleep(TOPIC_CREATE_WAIT_TIME)

            if missing_topics:
                MSG = 'Something went wrong... Topics({:s}) does not appear in metadata'
                LOGGER.error(MSG.format(str(missing_topics)))
                return False

            LOGGER.debug('All topics created and available.')
            return True
    
    # TODO: create all topics after the deployments (Telemetry and Analytics)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    if __name__ == '__main__':
        # Stand-alone run: fall back to a fixed Kafka address unless the environment
        # already provides KFK_SERVER_ADDRESS, then create the topics.
        import os
        os.environ.setdefault('KFK_SERVER_ADDRESS', 'kafka-service.kafka.svc.cluster.local:9092')
        KafkaTopic.create_all_topics()