Commit 0e46347c authored by Waleed Akbar's avatar Waleed Akbar
Browse files

files rename to "KafkaProducerService" and "KafkaProducerServiceImpl"

parent 97e5fafc
Loading
Loading
Loading
Loading
+7 −7
Original line number Diff line number Diff line
@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from NodeExporterProducer import KafkaNodeExporterProducer
from KafkaProducerServiceImpl import KafkaProducerServiceImpl

class KafkaProducerController:
class KafkaProducerService:
    """
    Class to control Kafka producer functionality.
    """
@@ -35,16 +35,16 @@ class KafkaProducerController:
            'node_exporter_endpoint' : 'http://10.152.183.231:9100/metrics',  # Node Exporter metrics endpoint - Replace with your Node Exporter endpoint
            'kafka_topic'            : 'metric-data',                         # Kafka topic to produce to
            'run_duration'           :  20,                                   # Total duration to execute the producer
            'fetch_interval'         :  3                                     # Time between two fetch requests
            'fetch_interval'         :  4                                     # Time between two fetch requests
        }
        return create_kafka_configuration

    def run_producer(self):
        """
        Method to create KafkaNodeExporterProducer object and start producer thread.
        Method to create KafkaProducerServiceImpl object and start producer thread.
        """
        # Create NodeExporterProducer object and run start_producer_thread
        producer = KafkaNodeExporterProducer(self.bootstrap_servers, self.node_exporter_endpoint, 
        producer = KafkaProducerServiceImpl(self.bootstrap_servers, self.node_exporter_endpoint, 
                    self.kafka_topic, self.run_duration, self.fetch_interval
                    )
        # producer.start_producer_thread()    # if threading is required
@@ -52,6 +52,6 @@ class KafkaProducerController:

if __name__ == "__main__":

    # Create Kafka producer controller object and run producer
    kafka_controller = KafkaProducerController()
    # Create Kafka producer service object and run producer
    kafka_controller = KafkaProducerService()
    kafka_controller.run_producer()
+35 −8
Original line number Diff line number Diff line
@@ -19,9 +19,9 @@ import requests
import time
import threading

class KafkaNodeExporterProducer:
class KafkaProducerServiceImpl:
    """
    Class to fetch metrics from Node Exporter and produce them to Kafka.
    Class to fetch metrics from Exporter and produce them to Kafka.
    """

    def __init__(self, bootstrap_servers, node_exporter_endpoint, kafka_topic, run_duration, fetch_interval):
@@ -39,17 +39,25 @@ class KafkaNodeExporterProducer:
        self.run_duration           = run_duration
        self.fetch_interval         = fetch_interval

    def fetch_metrics(self):
    def fetch_node_exporter_metrics(self):
        """
        Method to fetch metrics from Node Exporter.
        Returns:
            str: Metrics fetched from Node Exporter.
        """
        KPI = "node_network_receive_packets_total"
        try:
            response = requests.get(self.node_exporter_endpoint)
            if response.status_code == 200:
                print(f"Metrics fetched sucessfully...")
                return response.text
                # print(f"Metrics fetched sucessfully...")
                metrics = response.text
                # Check if the desired metric is available in the response
                if KPI in metrics:
                    KPI_VALUE = self.extract_metric_value(metrics, KPI)
                    # Extract the metric value
                    if KPI_VALUE is not None:
                        print(f"KPI value: {KPI_VALUE}")
                        return KPI_VALUE
            else:
                print(f"Failed to fetch metrics. Status code: {response.status_code}")
                return None
@@ -57,6 +65,25 @@ class KafkaNodeExporterProducer:
            print(f"Failed to fetch metrics: {str(e)}")
            return None

    def extract_metric_value(self, metrics, metric_name):
        """
        Method to extract the value of a metric from the metrics string.
        Args:
            metrics (str): Metrics string fetched from Node Exporter.
            metric_name (str): Name of the metric to extract.
        Returns:
            float: Value of the extracted metric, or None if not found.
        """
        try:
            # Find the metric line containing the desired metric name
            metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
            # Split the line to extract the metric value
            metric_value = float(metric_line.split()[1])
            return metric_value
        except StopIteration:
            print(f"Metric '{metric_name}' not found in the metrics.")
            return None

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
@@ -101,12 +128,12 @@ class KafkaNodeExporterProducer:
        try:
            start_time = time.time()
            while True:
                metrics = self.fetch_metrics()
                metrics = self.fetch_node_exporter_metrics()  # select the function name based on the provided requirements

                if metrics:
                    kafka_producer.produce(self.kafka_topic, metrics.encode('utf-8'), callback=self.delivery_callback)
                    kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
                    kafka_producer.flush()
                    print("Metrics produced to Kafka topic")
                    # print("Metrics produced to Kafka topic")

                # Check if the specified run duration has elapsed
                if time.time() - start_time >= self.run_duration: