Skip to content
Snippets Groups Projects
Variables.py 6.43 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import logging, time
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    from enum import Enum
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    #from confluent_kafka.admin import AdminClient, NewTopic
    from kafka.admin import KafkaAdminClient, NewTopic
    
    from common.Settings import get_setting
    
    
    
    LOGGER = logging.getLogger(__name__)
    
    KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    KAFKA_TOPIC_NUM_PARTITIONS         = 1
    KAFKA_TOPIC_REPLICATION_FACTOR     = 1
    #KAFKA_TOPIC_LIST_TIMEOUT           = 5
    KAFKA_TOPIC_CREATE_REQUEST_TIMEOUT = 60_000 # ms
    KAFKA_TOPIC_CREATE_WAIT_ITERATIONS = 10
    KAFKA_TOPIC_CREATE_WAIT_TIME       = 1
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    class KafkaConfig(Enum):
    
    
        @staticmethod
        def get_kafka_address() -> str:
    
            kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
            if kafka_server_address is None:
                KFK_NAMESPACE        = get_setting('KFK_NAMESPACE')
                KFK_PORT             = get_setting('KFK_SERVER_PORT')
    
                kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
    
            return kafka_server_address
    
            
        @staticmethod
        def get_admin_client():
            SERVER_ADDRESS = KafkaConfig.get_kafka_address()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            #ADMIN_CLIENT   = AdminClient({'bootstrap.servers': SERVER_ADDRESS})
            ADMIN_CLIENT   = KafkaAdminClient(bootstrap_servers=SERVER_ADDRESS)
    
            return ADMIN_CLIENT
    
    
    Waleed Akbar's avatar
    Waleed Akbar committed
    
    class KafkaTopic(Enum):
    
        # TODO: Later to be populated from ENV variable.
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        TELEMETRY_REQUEST    = 'topic_telemetry_request' 
        TELEMETRY_RESPONSE   = 'topic_telemetry_response'
        RAW                  = 'topic_raw' 
        LABELED              = 'topic_labeled'
        VALUE                = 'topic_value'
        ALARMS               = 'topic_alarms'
        ANALYTICS_REQUEST    = 'topic_analytics_request'
        ANALYTICS_RESPONSE   = 'topic_analytics_response'
        VNTMANAGER_REQUEST   = 'topic_vntmanager_request' 
        VNTMANAGER_RESPONSE  = 'topic_vntmanager_response'
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        NBI_SOCKETIO_WORKERS = 'tfs_nbi_socketio'
    
        @staticmethod
        def create_all_topics() -> bool:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            '''
    
                Method to create Kafka topics defined as class members
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            '''
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address())))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            kafka_admin_client = KafkaConfig.get_admin_client()
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            #topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
            #existing_topics = set(topic_metadata.topics.keys())
            existing_topics = set(kafka_admin_client.list_topics())
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))
    
            missing_topics = [
                NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
                for topic in KafkaTopic
                if topic.value not in existing_topics
            ]
            LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))
    
            if len(missing_topics) == 0:
                LOGGER.debug('All topics already existed.')
    
                return True
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            #create_topic_future_map = kafka_admin_client.create_topics(missing_topics, request_timeout=5*60)
            #LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
            topics_result = kafka_admin_client.create_topics(
                new_topics=missing_topics, timeout_ms=KAFKA_TOPIC_CREATE_REQUEST_TIMEOUT,
                validate_only=False
            )
            LOGGER.debug('topics_result={:s}'.format(str(topics_result)))
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            failed_topic_creations = set()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            #for topic, future in create_topic_future_map.items():
            #    try:
            #        LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
            #        future.result()  # Blocks until topic is created or raises an exception
            #        LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
            #    except: # pylint: disable=bare-except
            #        LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
            #        failed_topic_creations.add(topic)
            for topic_name, error_code, error_message in topics_result.topic_errors:
                if error_code == 0 and error_message is None:
                    MSG = 'Topic({:s}) successfully created.'
                    LOGGER.info(MSG.format(str(topic_name)))
                else:
                    MSG = 'Failed to create Topic({:s}): error_code={:s} error_message={:s}'
                    LOGGER.error(MSG.format(str(topic_name), str(error_code), str(error_message)))
                    failed_topic_creations.add(topic_name)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            if len(failed_topic_creations) > 0: return False
            LOGGER.debug('All topics created.')
    
            # Wait until topics appear in metadata
            desired_topics = {topic.value for topic in KafkaTopic}
            missing_topics = set()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            for _ in range(KAFKA_TOPIC_CREATE_WAIT_ITERATIONS):
                #topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
                #existing_topics = set(topic_metadata.topics.keys())
                existing_topics = set(kafka_admin_client.list_topics())
                LOGGER.debug('existing_topics={:s}'.format(str(existing_topics)))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                missing_topics = desired_topics.difference(existing_topics)
                if len(missing_topics) == 0: break
                MSG = 'Waiting for Topics({:s}) to appear in metadata...'
                LOGGER.debug(MSG.format(str(missing_topics)))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                time.sleep(KAFKA_TOPIC_CREATE_WAIT_TIME)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            if len(missing_topics) > 0:
                MSG = 'Something went wrong... Topics({:s}) does not appear in metadata'
                LOGGER.error(MSG.format(str(missing_topics)))
                return False
            else:
                LOGGER.debug('All topics created and available.')
                return True
    
    # TODO: create all topics after the deployments (Telemetry and Analytics)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    if __name__ == '__main__':
        import os
        if 'KFK_SERVER_ADDRESS' not in os.environ:
            os.environ['KFK_SERVER_ADDRESS'] = 'kafka-service.kafka.svc.cluster.local:9092'
        KafkaTopic.create_all_topics()