Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Variables.py 5.24 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import logging, time
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    from enum import Enum
    
    from confluent_kafka.admin import AdminClient, NewTopic
    
    from common.Settings import get_setting
    
    
    
    LOGGER = logging.getLogger(__name__)
    
    KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    KAFKA_TOPIC_NUM_PARTITIONS     = 1
    KAFKA_TOPIC_REPLICATION_FACTOR = 1
    KAFKA_TOPIC_LIST_TIMEOUT       = 5
    TOPIC_CREATE_WAIT_ITERATIONS   = 10
    TOPIC_CREATE_WAIT_TIME         = 1
    
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    class KafkaConfig(Enum):
    
    
        @staticmethod
        def get_kafka_address() -> str:
    
            kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
            if kafka_server_address is None:
                KFK_NAMESPACE        = get_setting('KFK_NAMESPACE')
                KFK_PORT             = get_setting('KFK_SERVER_PORT')
    
                kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
    
            return kafka_server_address
    
            
        @staticmethod
        def get_admin_client():
            SERVER_ADDRESS = KafkaConfig.get_kafka_address()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            ADMIN_CLIENT   = AdminClient({'bootstrap.servers': SERVER_ADDRESS})
    
            return ADMIN_CLIENT
    
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    
    class KafkaTopic(Enum):
    
        # TODO: Later to be populated from ENV variable.
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        TELEMETRY_REQUEST    = 'topic_telemetry_request' 
        TELEMETRY_RESPONSE   = 'topic_telemetry_response'
        RAW                  = 'topic_raw' 
        LABELED              = 'topic_labeled'
        VALUE                = 'topic_value'
        ALARMS               = 'topic_alarms'
        ANALYTICS_REQUEST    = 'topic_analytics_request'
        ANALYTICS_RESPONSE   = 'topic_analytics_response'
        VNTMANAGER_REQUEST   = 'topic_vntmanager_request' 
        VNTMANAGER_RESPONSE  = 'topic_vntmanager_response'
        NBI_SOCKETIO_WORKERS = 'tfs-nbi-socketio'
    
        @staticmethod
        def create_all_topics() -> bool:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            '''
    
                Method to create Kafka topics defined as class members
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            '''
            LOGGER.debug('Kafka server address: {:s} '.format(str(KafkaConfig.get_kafka_address())))
            kafka_admin_client = KafkaConfig.get_admin_client()
    
            topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            existing_topics = set(topic_metadata.topics.keys())
            LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))
    
            missing_topics = [
                NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
                for topic in KafkaTopic
                if topic.value not in existing_topics
            ]
            LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))
    
            if len(missing_topics) == 0:
                LOGGER.debug('All topics already existed.')
    
                return True
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
            LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
            failed_topic_creations = set()
            for topic, future in create_topic_future_map.items():
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                    LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
                    future.result()  # Blocks until topic is created or raises an exception
                    LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
                except: # pylint: disable=bare-except
                    LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
                    failed_topic_creations.add(topic)
    
            if len(failed_topic_creations) > 0: return False
    
            LOGGER.debug('All topics created.')
    
            # Wait until topics appear in metadata
            desired_topics = {topic.value for topic in KafkaTopic}
            missing_topics = set()
            for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
                topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
                existing_topics = set(topic_metadata.topics.keys())
                missing_topics = desired_topics.difference(existing_topics)
                if len(missing_topics) == 0: break
                MSG = 'Waiting for Topics({:s}) to appear in metadata...'
                LOGGER.debug(MSG.format(str(missing_topics)))
                time.sleep(TOPIC_CREATE_WAIT_TIME)
    
            if len(missing_topics) > 0:
                MSG = 'Something went wrong... Topics({:s}) does not appear in metadata'
                LOGGER.error(MSG.format(str(missing_topics)))
                return False
            else:
                LOGGER.debug('All topics created and available.')
                return True
    
    # TODO: create all topics after the deployments (Telemetry and Analytics)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    if __name__ == '__main__':
        import os
        if 'KFK_SERVER_ADDRESS' not in os.environ:
            os.environ['KFK_SERVER_ADDRESS'] = 'kafka-service.kafka.svc.cluster.local:9092'
        KafkaTopic.create_all_topics()