# Newer / Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import logging
import requests
import threading

# Waleed Akbar committed
from typing import Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Metrics pool created with the labels 'Telemetry' / 'TelemetryBackend'. In the
# code visible here it is only referenced by commented-out
# safe_and_metered_rpc_method decorators.
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
class KafkaProducerServiceImpl:
    """
    Class to fetch metrics from Exporter and produce them to Kafka.
    """

    # Upper bound (seconds) for one HTTP request to the exporter, so that a
    # stalled endpoint cannot block the producer loop forever.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, bootstrap_servers=None, exporter_endpoint=None,
                 kafka_topic=None, run_duration=None, fetch_interval=None):
        """
        Constructor to initialize Kafka producer parameters.
        Args:
            bootstrap_servers (str): Kafka broker address.
            exporter_endpoint (str): Node Exporter metrics endpoint.
            kafka_topic (str): Kafka topic to produce metrics to.
            run_duration (int): Time in seconds after which produce_metrics() stops.
                It is compared against the elapsed time, so it must be a number
                before produce_metrics() is called.
            fetch_interval (int): Time in seconds to sleep between two fetches.
                It is passed to time.sleep(), so it must be a number before
                produce_metrics() is called.
        """
        LOGGER.info('Init TelemetryBackendService')

        self.bootstrap_servers = bootstrap_servers
        self.exporter_endpoint = exporter_endpoint
        self.kafka_topic = kafka_topic
        self.run_duration = run_duration
        self.fetch_interval = fetch_interval

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    @staticmethod
    def export_collector_value(request: "Collector") -> Tuple[str, str]:
        """
        Placeholder that returns a fixed (collector id, collected value) pair.
        Declared static because it takes no instance argument; it can be called
        on the class or on an instance.
        Args:
            request (Collector): Collector message (currently unused).
        Returns:
            Tuple[str, str]: Hard-coded test collector id and test value.
        """
        # Metric to be fetched from endpoint based on Collector message
        return ('test collector Id', 'test collected value')

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    @staticmethod
    def write_to_kafka(request: Tuple[str, str]) -> "Empty":
        """
        Placeholder for writing a collector id/value pair to Kafka.
        Declared static because it takes no instance argument.
        Args:
            request (Tuple[str, str]): (collector id, collector value) pair (currently unused).
        Returns:
            Empty: An empty message; nothing is written yet.
        """
        # _collector_id, _collector_id_value = request
        # write collector_id and collector_id value on the Kafka topic
        return Empty()

    # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------

    def fetch_node_exporter_metrics(self):
        """
        Method to fetch metrics from Node Exporter.
        Returns:
            float: Value of the 'node_network_receive_packets_total' KPI, or None
            when the request fails, the status code is not 200, or the KPI is
            not present in the response.
        """
        KPI = "node_network_receive_packets_total"
        try:
            # The timeout keeps a hung exporter from blocking the caller indefinitely.
            response = requests.get(self.exporter_endpoint, timeout=self.REQUEST_TIMEOUT_SECONDS)
            if response.status_code != 200:
                print(f"Failed to fetch metrics. Status code: {response.status_code}")
                return None
            metrics = response.text
            # Check if the desired metric is available in the response
            if KPI not in metrics:
                return None
            # Extract the metric value
            KPI_VALUE = self.extract_metric_value(metrics, KPI)
            if KPI_VALUE is not None:
                print(f"KPI value: {KPI_VALUE}")
            return KPI_VALUE
        except Exception as e:
            # Best effort: any failure is reported and turned into "no value",
            # so the periodic producer loop keeps running.
            print(f"Failed to fetch metrics: {str(e)}")
            return None

    def extract_metric_value(self, metrics, metric_name):
        """
        Method to extract the value of a metric from the metrics string.
        The first sample line whose metric name is exactly `metric_name`
        (optionally followed by a `{...}` label set) and whose value parses as
        a float is used.
        Args:
            metrics (str): Metrics string fetched from Node Exporter.
            metric_name (str): Name of the metric to extract.
        Returns:
            float: Value of the extracted metric, or None if not found.
        """
        for line in metrics.splitlines():
            if not line.startswith(metric_name):
                continue
            remainder = line[len(metric_name):]
            # Require an exact name match: the name must be followed by a label
            # set or whitespace, otherwise this is a longer, different metric.
            if not remainder or not (remainder[0] == '{' or remainder[0].isspace()):
                continue
            if remainder[0] == '{':
                # Skip the label set as a whole; label values may contain spaces.
                _, closing_brace, remainder = remainder.rpartition('}')
                if not closing_brace:
                    continue
            fields = remainder.split()
            if not fields:
                continue
            try:
                # The first field is the value; an optional timestamp may follow.
                return float(fields[0])
            except ValueError:
                continue
        print(f"Metric '{metric_name}' not found in the metrics.")
        return None

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')

    def create_topic_if_not_exists(self, admin_client):
        """
        Method to create Kafka topic if it does not exist.
        Waits for the creation request to finish so that failures are reported
        instead of being silently dropped.
        Args:
            admin_client (AdminClient): Kafka admin client.
        """
        try:
            topic_metadata = admin_client.list_topics(timeout=5)
            if self.kafka_topic not in topic_metadata.topics:
                # If the topic does not exist, create a new topic
                print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
                new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
                futures = admin_client.create_topics([new_topic])
                # create_topics() is asynchronous; result() raises on failure.
                for future in futures.values():
                    future.result()
        except KafkaException as e:
            print(f"Failed to create topic: {e}")

    def produce_metrics(self):
        """
        Method to produce metrics to Kafka topic as per Kafka configs.
        Fetches the KPI every `fetch_interval` seconds and publishes it until
        `run_duration` seconds have elapsed.
        """
        conf = {
            'bootstrap.servers': self.bootstrap_servers,
        }
        admin_client = AdminClient(conf)
        self.create_topic_if_not_exists(admin_client)
        kafka_producer = Producer(conf)
        try:
            # Monotonic clock: the elapsed time is not affected by wall-clock adjustments.
            start_time = time.monotonic()
            while True:
                metrics = self.fetch_node_exporter_metrics()  # select the function name based on the provided requirements
                # Compare against None explicitly: a KPI value of 0.0 is valid and must be published.
                if metrics is not None:
                    kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
                    kafka_producer.flush()
                # Check if the specified run duration has elapsed
                if time.monotonic() - start_time >= self.run_duration:
                    break
                # waiting time until next fetch
                time.sleep(self.fetch_interval)
        except KeyboardInterrupt:
            print("Keyboard interrupt detected. Exiting...")
        finally:
            # Deliver anything still queued. The original author noted that
            # calling kafka_producer.close() here generates an error, so it is
            # not called.
            kafka_producer.flush()

    def start_producer_thread(self):
        """
        Method to start the producer thread.
        Returns:
            threading.Thread: The started thread running produce_metrics(), so
            that callers may join() it.
        """
        producer_thread = threading.Thread(target=self.produce_metrics)
        producer_thread.start()
        return producer_thread

    # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------