Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Waleed Akbar
committed
import random
import logging
import requests
import threading

Waleed Akbar
committed
from typing import Any, Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaException
from confluent_kafka import KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP = '127.0.0.1:9092'

Waleed Akbar
committed
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS = {'request' : 'topic_request',
'response': 'topic_response'}
EXPORTER_ENDPOINT = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics"

Waleed Akbar
committed
class TelemetryBackendService:
"""
Class to listens for request on Kafka topic, fetches metrics and produces measured values to another Kafka topic.
"""

Waleed Akbar
committed
LOGGER.info('Init TelemetryBackendService')
self.running_threads = {}
def run_kafka_listener(self)->bool:

Waleed Akbar
committed
threading.Thread(target=self.kafka_listener).start()
return True
"""
"""
conusmer_configs = {
'bootstrap.servers' : KAFKA_SERVER_IP,

Waleed Akbar
committed
'group.id' : 'backend',
'auto.offset.reset' : 'latest'
}
# topic_request = "topic_request"
consumerObj = KafkaConsumer(conusmer_configs)
# consumerObj.subscribe([topic_request])
consumerObj.subscribe([KAFKA_TOPICS['request']])

Waleed Akbar
committed
while True:
receive_msg = consumerObj.poll(2.0)
if receive_msg is None:
# print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes
# print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request']) # added for debugging purposes
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
break
(kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
collector_id = receive_msg.key().decode('utf-8')
if duration == -1 and interval == -1:
self.terminate_collector_backend(collector_id)
# threading.Thread(target=self.terminate_collector_backend, args=(collector_id))
else:
self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval)

Waleed Akbar
committed
def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int):
stop_event = threading.Event()
thread = threading.Thread(target=self.initiate_collector_backend,
args=(collector_id, kpi_id, duration, interval, stop_event))
self.running_threads[collector_id] = (thread, stop_event)
thread.start()

Waleed Akbar
committed
def initiate_collector_backend(self, collector_id, kpi_id, duration, interval, stop_event
): # type: ignore
"""
Method to receive collector request attribues and initiates collecter backend.
"""
print("Initiating backend for collector: ", collector_id)
while not stop_event.is_set():
if time.time() - start_time >= duration: # condition to terminate backend
print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
self.generate_kafka_response(collector_id, "-1", -1)
# write to Kafka to send the termination confirmation.
break
# print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval)

Waleed Akbar
committed
self.extract_kpi_value(collector_id, kpi_id)

Waleed Akbar
committed
# print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval)

Waleed Akbar
committed

Waleed Akbar
committed
def extract_kpi_value(self, collector_id: str, kpi_id: str):

Waleed Akbar
committed
"""
Method to extract kpi value.
"""
measured_kpi_value = random.randint(1,100) # Should be extracted from exporter/stream
# measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI

Waleed Akbar
committed
self.generate_kafka_response(collector_id, kpi_id , measured_kpi_value)

Waleed Akbar
committed

Waleed Akbar
committed
def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any):

Waleed Akbar
committed
"""
Method to write response on Kafka topic
"""
producer_configs = {
'bootstrap.servers': KAFKA_SERVER_IP,
}
# topic_response = "topic_response"

Waleed Akbar
committed
msg_value : Tuple [str, Any] = (kpi_id, kpi_value)
msg_key = collector_id
producerObj = KafkaProducer(producer_configs)
# producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=self.delivery_callback)

Waleed Akbar
committed
producerObj.flush()

Waleed Akbar
committed
def terminate_collector_backend(self, collector_id):
if collector_id in self.running_threads:
thread, stop_event = self.running_threads[collector_id]
stop_event.set()
thread.join()
print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
del self.running_threads[collector_id]
self.generate_kafka_response(collector_id, "-1", -1)
def create_topic_if_not_exists(self, new_topics: list) -> bool:

Waleed Akbar
committed
"""
Method to create Kafka topic if it does not exist.
Args:
admin_client (AdminClient): Kafka admin client.
"""

Waleed Akbar
committed
for topic in new_topics:
try:
topic_metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=5)
if topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic
print(f"Topic '{topic}' does not exist. Creating...")
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
ADMIN_KAFKA_CLIENT.create_topics([new_topic])
except KafkaException as e:
print(f"Failed to create topic: {e}")
return False
return True

Waleed Akbar
committed
def delivery_callback(self, err, msg):
"""
Callback function to handle message delivery status.
Args:
err (KafkaError): Kafka error object.
msg (Message): Kafka message object.
"""
if err:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to topic {msg.topic()}')
# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
def fetch_node_exporter_metrics(self):
"""
Method to fetch metrics from Node Exporter.
Returns:
str: Metrics fetched from Node Exporter.
"""
KPI = "node_network_receive_packets_total"
try:

Waleed Akbar
committed
response = requests.get(EXPORTER_ENDPOINT) # type: ignore
if response.status_code == 200:
# print(f"Metrics fetched sucessfully...")
metrics = response.text
# Check if the desired metric is available in the response
if KPI in metrics:
KPI_VALUE = self.extract_metric_value(metrics, KPI)
# Extract the metric value
if KPI_VALUE is not None:
print(f"KPI value: {KPI_VALUE}")
return KPI_VALUE
else:
print(f"Failed to fetch metrics. Status code: {response.status_code}")
return None
except Exception as e:
print(f"Failed to fetch metrics: {str(e)}")
return None
def extract_metric_value(self, metrics, metric_name):
"""
Method to extract the value of a metric from the metrics string.
Args:
metrics (str): Metrics string fetched from Exporter.
metric_name (str): Name of the metric to extract.
Returns:
float: Value of the extracted metric, or None if not found.
"""
try:
# Find the metric line containing the desired metric name
metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
# Split the line to extract the metric value
metric_value = float(metric_line.split()[1])
return metric_value
except StopIteration:
print(f"Metric '{metric_name}' not found in the metrics.")
return None
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------