Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import logging
import requests

Waleed Akbar
committed
from typing import Tuple
from common.proto.context_pb2 import Empty

Waleed Akbar
committed
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Metrics pool created with the labels 'Telemetry' / 'TelemetryBackend'. In the
# visible code it is only referenced by the commented-out
# @safe_and_metered_rpc_method decorators further down.
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')

Waleed Akbar
committed
# Module-level list intended to track the Kafka producers that are active.
# NOTE(review): nothing in the visible code appends to or reads this list —
# confirm whether it is still needed.
ACTIVE_KAFKA_PRODUCERS = [] # list of active kafka producers

Waleed Akbar
committed
class TelemetryBackendServiceImpl:
"""
Class to fetch metrics from Exporter and produce them to Kafka.
"""

Waleed Akbar
committed
def __init__(self, bootstrap_servers=None, exporter_endpoint=None,
             kafka_topic=None, run_duration=None, fetch_interval=None):
    """
    Store the settings used to fetch metrics and to produce them to Kafka.
    Args:
        bootstrap_servers (str): Kafka broker address.
        exporter_endpoint (str): Node Exporter metrics endpoint.
        kafka_topic (str): Kafka topic to produce metrics to.
        run_duration (int): Total time in seconds that produce_metrics() keeps running.
        fetch_interval (int): Time in seconds to wait between two consecutive fetches.
    """
    LOGGER.info('Init TelemetryBackendService')

    # Kafka-side settings.
    self.bootstrap_servers, self.kafka_topic = bootstrap_servers, kafka_topic
    # Metrics source.
    self.exporter_endpoint = exporter_endpoint
    # Timing of the produce loop.
    self.run_duration, self.fetch_interval = run_duration, fetch_interval
# @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)

Waleed Akbar
committed
def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore
    """
    Return the (collector id, collected value) pair for a Collector request.

    Placeholder implementation: `request` is not inspected yet and fixed test
    strings are returned.
    Args:
        request (Collector): Collector message (currently unused).
    Returns:
        Tuple[str, str]: Collector id followed by the collected value.
    """
    collector_id = 'test collector Id'
    # TODO: fetch the metric from the endpoint based on the Collector message.
    collected_value = 'test collected value'
    return (collector_id, collected_value)
# @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)

Waleed Akbar
committed
def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer:
    """
    Create a Kafka producer bound to the configured bootstrap servers.

    Placeholder implementation: the (collector id, collector value) pair is
    unpacked but not yet written to any topic.
    Args:
        request (Tuple[str, str]): Collector id and collector value.
    Returns:
        KafkaProducer: The newly created producer.
    """
    # Unpacking also enforces that `request` holds exactly two items.
    collector_id, collector_value = request

    producer_config = {'bootstrap.servers': self.bootstrap_servers}
    kafka_producer = KafkaProducer(producer_config)

    # TODO: write collector_id and collector_value to the Kafka topic
    #       (requires the Kafka bootstrap server and the topic name).
    return kafka_producer
def stop_producer(self, request: KafkaProducer) -> Empty: # type: ignore
    """
    Acknowledge a request to stop a Kafka producer.

    Placeholder implementation: the producer passed in is not touched.
    Args:
        request (KafkaProducer): Producer that should be stopped (currently unused).
    Returns:
        Empty: An empty message.
    """
    # TODO: flush and close the Kafka producer object.
    empty_reply = Empty()
    return empty_reply
# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
def fetch_node_exporter_metrics(self, kpi_name="node_network_receive_packets_total", timeout=10.0):
    """
    Fetch the Node Exporter metrics page and return the value of one KPI.
    Args:
        kpi_name (str): Name of the metric to look up in the exporter output.
        timeout (float): Seconds to wait for the exporter before giving up.
    Returns:
        float: Value of the KPI, or None if the request fails, the status code
               is not 200, or the KPI cannot be extracted from the response.
    """
    try:
        # requests applies no timeout by default, so an unresponsive exporter
        # would otherwise block the caller forever.
        response = requests.get(self.exporter_endpoint, timeout=timeout) # type: ignore
        if response.status_code != 200:
            print(f"Failed to fetch metrics. Status code: {response.status_code}")
            return None
        # print(f"Metrics fetched sucessfully...")
        metrics = response.text
        # Cheap substring check before parsing the page line by line.
        if kpi_name not in metrics:
            return None
        kpi_value = self.extract_metric_value(metrics, kpi_name)
        if kpi_value is not None:
            print(f"KPI value: {kpi_value}")
        return kpi_value
    except Exception as e:
        # Best effort: any failure (network, parsing) is reported and turned into None
        # so that a periodic caller can simply retry on its next cycle.
        print(f"Failed to fetch metrics: {str(e)}")
        return None
def extract_metric_value(self, metrics, metric_name):
    """
    Extract the value of the first sample of a metric from exporter output.

    Each line is expected to look like `name value` or `name{labels} value`,
    optionally followed by a timestamp.
    Args:
        metrics (str): Metrics string fetched from Node Exporter.
        metric_name (str): Exact name of the metric to extract.
    Returns:
        float: Value of the first well-formed sample of the metric, or None if
               no such sample is found.
    """
    for raw_line in metrics.splitlines():
        line = raw_line.strip()
        if not line.startswith(metric_name):
            continue
        remainder = line[len(metric_name):]
        if remainder.startswith('{'):
            # Skip the label set. Label values may contain spaces, so the value
            # is located after the last closing brace instead of by splitting.
            label_end = remainder.rfind('}')
            if label_end == -1:
                continue
            remainder = remainder[label_end + 1:]
        elif remainder and not remainder[0].isspace():
            # A longer metric name that merely shares this prefix.
            continue
        fields = remainder.split()
        if not fields:
            continue
        try:
            # The first field after the name/labels is the value; a timestamp may follow.
            return float(fields[0])
        except ValueError:
            continue
    print(f"Metric '{metric_name}' not found in the metrics.")
    return None
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def delivery_callback(self, err, msg):
    """
    Print the delivery outcome of a produced Kafka message.
    Args:
        err (KafkaError): Kafka error object; falsy when delivery succeeded.
        msg (Message): Kafka message object; its topic() is read only on success.
    """
    if not err:
        print(f'Message delivered to topic {msg.topic()}')
        return
    print(f'Message delivery failed: {err}')
def create_topic_if_not_exists(self, admin_client):
    """
    Create the configured Kafka topic when the broker does not list it yet.

    The topic is created with one partition and a replication factor of one.
    Kafka errors are reported on stdout and not re-raised.
    Args:
        admin_client (AdminClient): Kafka admin client.
    """
    try:
        topic_metadata = admin_client.list_topics(timeout=5)
        if self.kafka_topic in topic_metadata.topics:
            return
        # If the topic does not exist, create a new topic
        print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
        new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
        # create_topics() is asynchronous and returns one future per topic.
        # Waiting on the futures makes creation failures reach the handler
        # below and ensures the request has completed before returning.
        creation_futures = admin_client.create_topics([new_topic])
        for creation_future in creation_futures.values():
            creation_future.result()
    except KafkaException as e:
        print(f"Failed to create topic: {e}")
def produce_metrics(self):
    """
    Periodically fetch the KPI and produce it to the configured Kafka topic.

    The topic is created first if it does not exist. The loop then fetches a
    value, produces it, and sleeps for `fetch_interval` seconds until
    `run_duration` seconds have elapsed or a KeyboardInterrupt is received.
    Both attributes must be numbers when this method is called.
    """
    conf = {
        'bootstrap.servers': self.bootstrap_servers,
    }
    admin_client = AdminClient(conf)
    self.create_topic_if_not_exists(admin_client)

    kafka_producer = KafkaProducer(conf)
    try:
        # A monotonic clock keeps the elapsed-time check immune to system clock adjustments.
        start_time = time.monotonic()
        while True:
            metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements
            # Compare against None explicitly: a KPI value of 0.0 is falsy
            # but is still a valid sample that must be produced.
            if metrics is not None:
                kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
                kafka_producer.flush()
                # print("Metrics produced to Kafka topic")
            # Check if the specified run duration has elapsed
            if time.monotonic() - start_time >= self.run_duration: # type: ignore
                break
            # waiting time until next fetch
            time.sleep(self.fetch_interval) # type: ignore
    except KeyboardInterrupt:
        print("Keyboard interrupt detected. Exiting...")
    finally:
        # Deliver anything still queued before returning.
        kafka_producer.flush()
        # kafka_producer.close() # this command generates ERROR
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------