Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Waleed Akbar
committed
import ast
import logging
import random
import threading
import time
from typing import Any, Tuple

import requests
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from confluent_kafka import KafkaException
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka.admin import AdminClient, NewTopic

from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import Empty
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Metrics pool created with the names 'Telemetry' and 'TelemetryBackend'.
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
# Kafka bootstrap address; passed as 'bootstrap.servers' to the Kafka clients built in this module.
KAFKA_SERVER_IP = '127.0.0.1:9092'

Waleed Akbar
committed
# Shared Kafka admin client, created when this module is imported; used to list and create topics.
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
# Collector ids appended by initiate_collector_backend(); nothing in the visible code removes entries.
ACTIVE_COLLECTORS = []

Waleed Akbar
committed
class TelemetryBackendService:
    """
    Telemetry backend that listens for collector requests on a Kafka topic, obtains
    KPI values and produces the measured values to another Kafka topic.
    """

    # NOTE(review): in this view the call below sits directly in the class body, so it
    # runs once when the class is defined. The message suggests it was meant to be the
    # body of an `__init__` whose `def` line is not visible here -- confirm upstream.
    LOGGER.info('Init TelemetryBackendService')

Waleed Akbar
committed
def run_kafka_listener(self) -> bool:
    """
    Method to start `kafka_listener` on a new thread and return immediately.
    Returns:
        bool: always True once the listener thread has been started.
    """
    listener_thread = threading.Thread(target=self.kafka_listener)
    listener_thread.start()
    return True
"""
"""
def kafka_listener(self):
    """
    Method to listen for collector requests on the Kafka topic "topic_request".

    Each request carries the collector id as the message key and the string form of
    a `(kpi_id, duration, interval)` tuple as the message value. For every valid
    request a collector is started through `run_initiate_collector_backend`.
    The loop ends on the first consumer error other than end-of-partition.
    """
    consumer_configs = {
        'bootstrap.servers' : KAFKA_SERVER_IP,
        'group.id'          : 'backend',
        'auto.offset.reset' : 'latest'
    }
    topic_request = "topic_request"

    if not self.create_topic_if_not_exists([topic_request]):
        return

    consumer = KafkaConsumer(consumer_configs)
    try:
        consumer.subscribe([topic_request])
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
                print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes
                # Nothing arrived within the poll timeout: poll again instead of
                # falling through and dereferencing None.
                continue
            if receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print("Consumer error: {}".format(receive_msg.error()))
                break
            try:
                # literal_eval only parses Python literals; it does not execute code.
                (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
                collector_id = receive_msg.key().decode('utf-8')
            except (AttributeError, SyntaxError, TypeError, ValueError) as e:
                # A malformed request must not terminate the listener thread.
                print(f"Skipping malformed collector request: {e}")
                continue
            self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval)
    finally:
        # Leave the consumer group cleanly whichever way the loop ends.
        consumer.close()

Waleed Akbar
committed

Waleed Akbar
committed
def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int):
    """
    Method to start `initiate_collector_backend` on a new thread and return immediately.
    Args:
        collector_id (str): id of the collector request.
        kpi_id (str): id of the KPI to collect.
        duration (int): passed through to the collector as its run duration.
        interval (int): passed through to the collector as its fetch interval.
    """
    collector_thread = threading.Thread(
        target=self.initiate_collector_backend,
        args=(collector_id, kpi_id, duration, interval),
    )
    collector_thread.start()
def initiate_collector_backend(self, collector_id, kpi_id, duration, interval):
    """
    Method to receive collector request attributes and run the collector backend.

    Calls `extract_kpi_value` repeatedly until `duration` seconds have elapsed,
    waiting `interval` seconds between two consecutive fetches.
    Args:
        collector_id (str): id of the collector request; recorded in ACTIVE_COLLECTORS.
        kpi_id (str): id of the KPI to collect.
        duration (int | float): total run time in seconds.
        interval (int | float): pause between fetches in seconds.
    """
    # Register the collector once; appending inside the loop would add a duplicate
    # entry on every iteration and grow the list without bound.
    ACTIVE_COLLECTORS.append(collector_id)
    start_time = time.time()
    while True:
        elapsed = time.time() - start_time
        if elapsed >= duration:
            print("Requested Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", elapsed)
            break
        self.extract_kpi_value(collector_id, kpi_id)
        # Honour the requested fetch interval instead of spinning; a non-positive
        # interval means "no pause" (time.sleep rejects negative values).
        if interval > 0:
            time.sleep(interval)

Waleed Akbar
committed

Waleed Akbar
committed
def extract_kpi_value(self, collector_id: str, kpi_id: str):
    """
    Method to extract kpi value.

    The value is a placeholder: a random integer between 1 and 100 (inclusive), not
    a real measurement. It is forwarded to `generate_kafka_response`.
    Args:
        collector_id (str): id of the collector request the value belongs to.
        kpi_id (str): id of the KPI the value belongs to.
    """
    measured_kpi_value = random.randint(1, 100)
    self.generate_kafka_response(collector_id, kpi_id, measured_kpi_value)

Waleed Akbar
committed

Waleed Akbar
committed
def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
    """
    Method to write response on Kafka topic "topic_response".

    The message key is the collector id and the message value is the string form
    of the tuple `(kpi_id, kpi_value)`. Delivery is reported via `delivery_callback`.
    Args:
        collector_id (str): id of the collector request; used as the message key.
        kpi_id (str): id of the measured KPI.
        kpi_value (Any): measured value to publish.
    """
    producer_configs = {
        'bootstrap.servers': KAFKA_SERVER_IP,
    }
    topic_response = "topic_response"
    msg_value: Tuple[str, Any] = (kpi_id, kpi_value)
    # NOTE(review): a new producer is built for every message; consider sharing one
    # producer per service instance if this path becomes hot.
    producer = KafkaProducer(producer_configs)
    producer.produce(topic_response, key=collector_id, value=str(msg_value), callback=self.delivery_callback)
    # flush() blocks until the message has been delivered or has failed.
    producer.flush()

Waleed Akbar
committed

Waleed Akbar
committed
def create_topic_if_not_exists(self, new_topics: list):
    """
    Method to create Kafka topics that do not exist yet.

    Every name in `new_topics` is checked against the cluster metadata; missing
    topics are requested with one partition and a replication factor of one.
    Args:
        new_topics (list): names of the topics that must exist.
    Returns:
        bool: True when every topic was checked (and creation requested where
              needed); False when a KafkaException was raised.
    """
    try:
        existing_topics = ADMIN_KAFKA_CLIENT.list_topics(timeout=5).topics
        missing_topics = []
        # dict.fromkeys de-duplicates the names while keeping their order.
        for topic in dict.fromkeys(new_topics):
            if topic in existing_topics:
                continue
            print(f"Topic '{topic}' does not exist. Creating...")
            missing_topics.append(NewTopic(topic, num_partitions=1, replication_factor=1))
        if missing_topics:
            # NOTE(review): create_topics() is asynchronous and its result is not
            # awaited here, so creation failures reported later are not detected.
            ADMIN_KAFKA_CLIENT.create_topics(missing_topics)
    except KafkaException as e:
        print(f"Failed to create topic: {e}")
        return False
    # Print the cluster's topic list once all requested topics have been handled.
    self.verify_required_kafka_topics()
    return True

Waleed Akbar
committed
def delivery_callback(self, err, msg):
    """
    Callback function to handle message delivery status.
    Args:
        err (KafkaError): Kafka error object; falsy when delivery succeeded.
        msg (Message): Kafka message object; only read when `err` is falsy.
    """
    if err:
        print(f'Message delivery failed: {err}')
        return
    print(f'Message delivered to topic {msg.topic()}')

Waleed Akbar
committed
# Function to create a list of topics

Waleed Akbar
committed

Waleed Akbar
committed
# Function to list all topics in the Kafka cluster
def verify_required_kafka_topics(self) -> list:
    """
    Method to list all topics in the Kafka cluster.

    The topic names are printed and returned. Any failure is reported on stdout
    and turned into an empty list, so this method never raises.
    Returns:
        list: names of the topics known to the cluster, or [] on failure.
    """
    try:
        # Fetch metadata from the broker
        metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=10)
        topics = list(metadata.topics.keys())
    except Exception as e:
        print(f"Failed to list topics: {e}")
        return []
    print("Topics in the cluster:", topics)
    return topics
# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
def fetch_node_exporter_metrics(self):
    """
    Method to fetch one KPI value from Node Exporter.

    Downloads the metrics page and extracts the value of
    "node_network_receive_packets_total" through `extract_metric_value`.
    Returns:
        float | None: the KPI value, or None when the request fails, the status
                      code is not 200, or the KPI is not present in the response.
    """
    kpi_name = "node_network_receive_packets_total"
    exporter_endpoint = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics"
    try:
        # Without a timeout an unreachable exporter would block this call forever.
        response = requests.get(exporter_endpoint, timeout=10)
        if response.status_code != 200:
            print(f"Failed to fetch metrics. Status code: {response.status_code}")
            return None
        metrics = response.text
        # Check if the desired metric is available in the response
        if kpi_name not in metrics:
            return None
        kpi_value = self.extract_metric_value(metrics, kpi_name)
        if kpi_value is not None:
            print(f"KPI value: {kpi_value}")
        return kpi_value
    except Exception as e:
        # Best-effort fetch: report the problem and let the caller retry later.
        print(f"Failed to fetch metrics: {str(e)}")
        return None
def extract_metric_value(self, metrics, metric_name):
    """
    Method to extract the value of a metric from the metrics string.

    The first sample line whose metric name is exactly `metric_name` is used. A
    sample line has the form `name[{labels}] value [timestamp]`; comment lines
    (`# HELP`, `# TYPE`) and metrics that merely share the prefix are skipped.
    Args:
        metrics (str): Metrics string fetched from Node Exporter.
        metric_name (str): Name of the metric to extract.
    Returns:
        float: Value of the extracted metric, or None if the metric is not found
               or its value is not numeric.
    """
    for line in metrics.splitlines():
        if not line.startswith(metric_name):
            continue
        remainder = line[len(metric_name):]
        # The name must end here; otherwise this is a longer metric name that
        # only shares the prefix (e.g. "foo_total_extra" for "foo_total").
        if remainder[:1] not in ("{", " ", "\t"):
            continue
        if remainder.startswith("{"):
            # Drop the label set; label values may themselves contain spaces.
            _, closing_brace, remainder = remainder.rpartition("}")
            if not closing_brace:
                continue
        fields = remainder.split()
        if not fields:
            continue
        try:
            return float(fields[0])
        except ValueError:
            print(f"Metric '{metric_name}' has a non-numeric value: {fields[0]!r}")
            return None
    print(f"Metric '{metric_name}' not found in the metrics.")
    return None
def produce_metrics(self):
    """
    Method to produce metrics to Kafka topic "topic_raw" as per Kafka configs.

    Fetches a KPI value through `fetch_node_exporter_metrics`, publishes it, and
    repeats every `self.fetch_interval` seconds until `self.run_duration` seconds
    have elapsed or a KeyboardInterrupt is received.

    NOTE(review): `self.run_duration` and `self.fetch_interval` are not assigned
    anywhere in the visible code; the caller must set them before calling this.
    """
    conf = {
        'bootstrap.servers': KAFKA_SERVER_IP,
    }
    topic_raw = "topic_raw"
    # create_topic_if_not_exists expects a list of topic names, not an admin client.
    self.create_topic_if_not_exists([topic_raw])
    kafka_producer = KafkaProducer(conf)
    try:
        start_time = time.time()
        while True:
            metrics = self.fetch_node_exporter_metrics()
            # Compare against None so that a legitimate sample of 0.0 is still published.
            if metrics is not None:
                kafka_producer.produce(topic_raw, str(metrics), callback=self.delivery_callback)
                kafka_producer.flush()
            # Check if the specified run duration has elapsed
            if time.time() - start_time >= self.run_duration:
                break
            # waiting time until next fetch
            time.sleep(self.fetch_interval)
    except KeyboardInterrupt:
        print("Keyboard interrupt detected. Exiting...")
    finally:
        # flush() is the last step; the original author noted that calling close()
        # on the producer raised an error.
        kafka_producer.flush()
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------