# NodeExporterProducer.py

# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from confluent_kafka import Producer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
import requests
import time
import threading

class KafkaNodeExporterProducer:
    """
    Class to fetch metrics from Node Exporter and produce them to Kafka.

    Each cycle issues one HTTP GET against the Node Exporter endpoint and
    publishes the raw response body as a single Kafka message. Cycles repeat
    every ``fetch_interval`` seconds until ``run_duration`` seconds have
    elapsed.
    """

    def __init__(self, bootstrap_servers, node_exporter_endpoint, kafka_topic, run_duration, fetch_interval,
                 request_timeout=10):
        """
        Constructor to initialize Kafka producer parameters.
        Args:
            bootstrap_servers (str): Kafka broker address.
            node_exporter_endpoint (str): Node Exporter metrics endpoint.
            kafka_topic (str): Kafka topic to produce metrics to.
            run_duration (int): Total time in seconds the producer loop keeps running.
            fetch_interval (int): Time in seconds to wait between two consecutive fetches.
            request_timeout (float): Timeout in seconds for each HTTP request to
                Node Exporter. Passing None disables the timeout.
        """
        self.bootstrap_servers = bootstrap_servers
        self.node_exporter_endpoint = node_exporter_endpoint
        self.kafka_topic = kafka_topic
        self.run_duration = run_duration
        self.fetch_interval = fetch_interval
        self.request_timeout = request_timeout

    def fetch_metrics(self):
        """
        Method to fetch metrics from Node Exporter.
        Returns:
            str: Metrics fetched from Node Exporter, or None when the request
                fails or the endpoint answers with a status other than 200.
        """
        try:
            # A timeout is mandatory here: without it an unresponsive exporter
            # would block the producer loop indefinitely.
            response = requests.get(self.node_exporter_endpoint, timeout=self.request_timeout)
            if response.status_code == 200:
                print("Metrics fetched sucessfully...")
                return response.text
            print(f"Failed to fetch metrics. Status code: {response.status_code}")
            return None
        except Exception as e:
            # Deliberately broad: a failed scrape is best-effort and must not
            # terminate the producer loop; the next cycle simply retries.
            print(f"Failed to fetch metrics: {str(e)}")
            return None

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object; falsy when delivery succeeded.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')

    def create_topic_if_not_exists(self, admin_client):
        """
        Method to create Kafka topic if it does not exist.

        Failures are reported on stdout and not re-raised.
        Args:
            admin_client (AdminClient): Kafka admin client.
        """
        try:
            topic_metadata = admin_client.list_topics(timeout=5)
            if self.kafka_topic in topic_metadata.topics:
                return

            # If the topic does not exist, create a new topic
            print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
            new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
            creation_futures = admin_client.create_topics([new_topic])

            # create_topics() is asynchronous: the outcome is only known once
            # each returned future is resolved. Waiting here surfaces creation
            # errors instead of silently discarding them.
            for creation_future in creation_futures.values():
                creation_future.result()
        except KafkaException as e:
            print(f"Failed to create topic: {e}")

    def produce_metrics(self):
        """
        Method to continuously produce metrics to Kafka topic.

        Runs fetch/produce cycles until ``run_duration`` seconds have elapsed
        or a KeyboardInterrupt is received. At least one cycle is always
        executed. Pending messages are flushed before returning.
        """
        conf = {
            'bootstrap.servers': self.bootstrap_servers,
        }

        admin_client = AdminClient(conf)
        self.create_topic_if_not_exists(admin_client)

        kafka_producer = Producer(conf)

        try:
            # Monotonic clock: the elapsed-time check must not be affected by
            # wall-clock adjustments (NTP, manual changes).
            start_time = time.monotonic()
            while True:
                metrics = self.fetch_metrics()

                if metrics:
                    try:
                        kafka_producer.produce(
                            self.kafka_topic, metrics.encode('utf-8'), callback=self.delivery_callback)
                        kafka_producer.flush()
                        print("Metrics produced to Kafka topic")
                    except (BufferError, KafkaException) as e:
                        # One failed produce must not end the whole run;
                        # report it and try again on the next cycle.
                        print(f"Failed to produce metrics: {e}")

                # Check if the specified run duration has elapsed
                if time.monotonic() - start_time >= self.run_duration:
                    break

                # waiting time until next fetch
                time.sleep(self.fetch_interval)
        except KeyboardInterrupt:
            print("Keyboard interrupt detected. Exiting...")
        finally:
            # flush() drains any messages still queued. The producer is not
            # closed explicitly: calling close() on it was observed to raise
            # an error here.
            kafka_producer.flush()

    def start_producer_thread(self):
        """
        Method to start the producer thread.
        Returns:
            threading.Thread: The started thread running produce_metrics, so
                that callers can join() it or check whether it is still alive.
        """
        producer_thread = threading.Thread(target=self.produce_metrics)
        producer_thread.start()
        return producer_thread