# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from confluent_kafka import Producer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
import requests
import time
import threading

class KafkaNodeExporterProducer:
    """
    Class to fetch metrics from Node Exporter and produce them to Kafka.
    """

    def __init__(self, bootstrap_servers, node_exporter_endpoint, kafka_topic, run_duration, fetch_interval):
        """
        Constructor to initialize Kafka producer parameters.
        Args:
            bootstrap_servers (str): Kafka broker address.
            node_exporter_endpoint (str): Node Exporter metrics endpoint.
            kafka_topic (str): Kafka topic to produce metrics to.
            run_duration (int): Total time in seconds to run the producer.
            fetch_interval (int): Time in seconds to wait between fetches.
        """
        self.bootstrap_servers = bootstrap_servers
        self.node_exporter_endpoint = node_exporter_endpoint
        self.kafka_topic = kafka_topic
        self.run_duration = run_duration
        self.fetch_interval = fetch_interval

    def fetch_metrics(self):
        """
        Method to fetch metrics from Node Exporter.
        Returns:
            str: Metrics fetched from Node Exporter, or None on failure.
        """
        try:
            response = requests.get(self.node_exporter_endpoint)
            if response.status_code == 200:
                print("Metrics fetched successfully...")
                return response.text
            else:
                print(f"Failed to fetch metrics. Status code: {response.status_code}")
                return None
        except Exception as e:
            print(f"Failed to fetch metrics: {str(e)}")
            return None

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')

    def create_topic_if_not_exists(self, admin_client):
        """
        Method to create the Kafka topic if it does not exist.
        Args:
            admin_client (AdminClient): Kafka admin client.
        """
        try:
            topic_metadata = admin_client.list_topics(timeout=5)
            if self.kafka_topic not in topic_metadata.topics:
                # If the topic does not exist, create a new topic
                print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
                new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
                # create_topics() is asynchronous; the returned futures are not awaited here
                admin_client.create_topics([new_topic])
        except KafkaException as e:
            print(f"Failed to create topic: {e}")

    def produce_metrics(self):
        """
        Method to continuously produce metrics to the Kafka topic.
        """
        conf = {
            'bootstrap.servers': self.bootstrap_servers,
        }

        admin_client = AdminClient(conf)
        self.create_topic_if_not_exists(admin_client)

        kafka_producer = Producer(conf)

        try:
            start_time = time.time()
            while True:
                metrics = self.fetch_metrics()

                if metrics:
                    kafka_producer.produce(self.kafka_topic, metrics.encode('utf-8'), callback=self.delivery_callback)
                    kafka_producer.flush()
                    print("Metrics produced to Kafka topic")

                # Check if the specified run duration has elapsed
                if time.time() - start_time >= self.run_duration:
                    break

                # Waiting time until the next fetch
                time.sleep(self.fetch_interval)
        except KeyboardInterrupt:
            print("Keyboard interrupt detected. Exiting...")
        finally:
            kafka_producer.flush()
            # kafka_producer.close()  # not available: confluent_kafka.Producer has no close() method

    def start_producer_thread(self):
        """
        Method to start the producer thread.
        """
        producer_thread = threading.Thread(target=self.produce_metrics)
        producer_thread.start()