Commits (2)
......@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from NodeExporterProducer import KafkaNodeExporterProducer
from KafkaProducerServiceImpl import KafkaProducerServiceImpl
class KafkaProducerController:
class KafkaProducerService:
"""
Class to control Kafka producer functionality.
"""
......@@ -35,16 +35,16 @@ class KafkaProducerController:
'node_exporter_endpoint' : 'http://10.152.183.231:9100/metrics', # Node Exporter metrics endpoint - Replace with your Node Exporter endpoint
'kafka_topic' : 'metric-data', # Kafka topic to produce to
'run_duration' : 20, # Total duration to execute the producer
'fetch_interval' : 3 # Time between two fetch requests
'fetch_interval' : 4 # Time between two fetch requests
}
return create_kafka_configuration
def run_producer(self):
"""
Method to create KafkaNodeExporterProducer object and start producer thread.
Method to create KafkaProducerServiceImpl object and start producer thread.
"""
# Create NodeExporterProducer object and run start_producer_thread
producer = KafkaNodeExporterProducer(self.bootstrap_servers, self.node_exporter_endpoint,
producer = KafkaProducerServiceImpl(self.bootstrap_servers, self.node_exporter_endpoint,
self.kafka_topic, self.run_duration, self.fetch_interval
)
# producer.start_producer_thread() # if threading is required
......@@ -52,6 +52,6 @@ class KafkaProducerController:
if __name__ == "__main__":
# Create Kafka producer controller object and run producer
kafka_controller = KafkaProducerController()
# Create Kafka producer service object and run producer
kafka_controller = KafkaProducerService()
kafka_controller.run_producer()
......@@ -19,9 +19,9 @@ import requests
import time
import threading
class KafkaNodeExporterProducer:
class KafkaProducerServiceImpl:
"""
Class to fetch metrics from Node Exporter and produce them to Kafka.
Class to fetch metrics from Exporter and produce them to Kafka.
"""
def __init__(self, bootstrap_servers, node_exporter_endpoint, kafka_topic, run_duration, fetch_interval):
......@@ -39,17 +39,25 @@ class KafkaNodeExporterProducer:
self.run_duration = run_duration
self.fetch_interval = fetch_interval
def fetch_metrics(self):
def fetch_node_exporter_metrics(self):
"""
Method to fetch metrics from Node Exporter.
Returns:
str: Metrics fetched from Node Exporter.
"""
KPI = "node_network_receive_packets_total"
try:
response = requests.get(self.node_exporter_endpoint)
if response.status_code == 200:
print(f"Metrics fetched sucessfully...")
return response.text
# print(f"Metrics fetched sucessfully...")
metrics = response.text
# Check if the desired metric is available in the response
if KPI in metrics:
KPI_VALUE = self.extract_metric_value(metrics, KPI)
# Extract the metric value
if KPI_VALUE is not None:
print(f"KPI value: {KPI_VALUE}")
return KPI_VALUE
else:
print(f"Failed to fetch metrics. Status code: {response.status_code}")
return None
......@@ -57,6 +65,25 @@ class KafkaNodeExporterProducer:
print(f"Failed to fetch metrics: {str(e)}")
return None
def extract_metric_value(self, metrics, metric_name):
"""
Method to extract the value of a metric from the metrics string.
Args:
metrics (str): Metrics string fetched from Node Exporter.
metric_name (str): Name of the metric to extract.
Returns:
float: Value of the extracted metric, or None if not found.
"""
try:
# Find the metric line containing the desired metric name
metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
# Split the line to extract the metric value
metric_value = float(metric_line.split()[1])
return metric_value
except StopIteration:
print(f"Metric '{metric_name}' not found in the metrics.")
return None
def delivery_callback(self, err, msg):
"""
Callback function to handle message delivery status.
......@@ -101,12 +128,12 @@ class KafkaNodeExporterProducer:
try:
start_time = time.time()
while True:
metrics = self.fetch_metrics()
metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements
if metrics:
kafka_producer.produce(self.kafka_topic, metrics.encode('utf-8'), callback=self.delivery_callback)
kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
kafka_producer.flush()
print("Metrics produced to Kafka topic")
# print("Metrics produced to Kafka topic")
# Check if the specified run duration has elapsed
if time.time() - start_time >= self.run_duration:
......
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from telemetry_frontend.backend.service.KafkaProducerService import KafkaProducerService
kafka_controller = KafkaProducerService()
kafka_controller.run_producer()
\ No newline at end of file