Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import logging
import requests
import threading
from typing import Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaException
from confluent_kafka import KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP = '127.0.0.1:9092'

Waleed Akbar
committed
class TelemetryBackendService:
"""
Class to fetch metrics from Exporter and produce them to Kafka.
"""

Waleed Akbar
committed
def __init__(self, bootstrap_servers='127.0.0.1:9092', exporter_endpoint=None,
kafka_topic=None, run_duration=None, fetch_interval=None):
"""
Constructor to initialize Kafka producer parameters.
Args:
bootstrap_servers (str): Kafka broker address.
exporter_endpoint (str): Node Exporter metrics endpoint.
kafka_topic (str): Kafka topic to produce metrics to.
run_interval (int): Time interval in seconds to run the producer.
"""
LOGGER.info('Init TelemetryBackendService')

Waleed Akbar
committed
self.bootstrap_servers = bootstrap_servers
self.exporter_endpoint = exporter_endpoint
self.kafka_topic = kafka_topic
self.run_duration = run_duration
self.fetch_interval = fetch_interval
def receive_kafka_request(self,
): # type: ignore
"""
Method to receive collector request on Kafka topic.
"""
conusmer_configs = {
'bootstrap.servers' : KAFKA_SERVER_IP,
'group.id' : 'consumer',
'auto.offset.reset' : 'earliest'
}
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
topic_request = "topic_request"
consumerObj = KafkaConsumer(conusmer_configs)
consumerObj.subscribe([topic_request])
start_time = time.time()
while True:
receive_msg = consumerObj.poll(1.0)
if receive_msg is None:
print ("nothing to read ...", time.time() - start_time)
if time.time() - start_time >= 10: # type: ignore
print("Timeout: consumer terminated")
break
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
break
print ("Received Message: ", receive_msg.value().decode('utf-8'))
def execute_receive_kafka_request(self
)->Empty: # type: ignore
threading.Thread(target=self.receive_kafka_request).start()
return True
# @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore
response = Tuple[str, str]
collector_id = str('test collector Id')
collected_Value = str('test collected value') # Metric to be fetched from endpoint based on Collector message
response = (collector_id, collected_Value)
return response
# @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer:
(collector_id, collector_value) = request
response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers})
# _collector_id, _collector_id_value = request
# write collector_id and collector_id value on the Kafka topic
# get kafka bootstrap server and topic name
# write to kafka topic
return response
def stop_producer(self, request: KafkaProducer) -> Empty: # type: ignore
# flush and close kafka producer object
return Empty()
# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
def fetch_node_exporter_metrics(self):
"""
Method to fetch metrics from Node Exporter.
Returns:
str: Metrics fetched from Node Exporter.
"""
KPI = "node_network_receive_packets_total"
try:
response = requests.get(self.exporter_endpoint) # type: ignore
if response.status_code == 200:
# print(f"Metrics fetched sucessfully...")
metrics = response.text
# Check if the desired metric is available in the response
if KPI in metrics:
KPI_VALUE = self.extract_metric_value(metrics, KPI)
# Extract the metric value
if KPI_VALUE is not None:
print(f"KPI value: {KPI_VALUE}")
return KPI_VALUE
else:
print(f"Failed to fetch metrics. Status code: {response.status_code}")
return None
except Exception as e:
print(f"Failed to fetch metrics: {str(e)}")
return None
def extract_metric_value(self, metrics, metric_name):
"""
Method to extract the value of a metric from the metrics string.
Args:
metrics (str): Metrics string fetched from Node Exporter.
metric_name (str): Name of the metric to extract.
Returns:
float: Value of the extracted metric, or None if not found.
"""
try:
# Find the metric line containing the desired metric name
metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
# Split the line to extract the metric value
metric_value = float(metric_line.split()[1])
return metric_value
except StopIteration:
print(f"Metric '{metric_name}' not found in the metrics.")
return None
def delivery_callback(self, err, msg):
"""
Callback function to handle message delivery status.
Args:
err (KafkaError): Kafka error object.
msg (Message): Kafka message object.
"""
if err:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to topic {msg.topic()}')
def create_topic_if_not_exists(self, admin_client):
"""
Method to create Kafka topic if it does not exist.
Args:
admin_client (AdminClient): Kafka admin client.
"""
try:
topic_metadata = admin_client.list_topics(timeout=5)
if self.kafka_topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic
print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
admin_client.create_topics([new_topic])
except KafkaException as e:
print(f"Failed to create topic: {e}")
def produce_metrics(self):
"""
Method to produce metrics to Kafka topic as per Kafka configs.
"""
conf = {
'bootstrap.servers': self.bootstrap_servers,
}
admin_client = AdminClient(conf)
self.create_topic_if_not_exists(admin_client)
kafka_producer = KafkaProducer(conf)
try:
start_time = time.time()
while True:
metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements
if metrics:
kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
kafka_producer.flush()
# print("Metrics produced to Kafka topic")
# Check if the specified run duration has elapsed
if time.time() - start_time >= self.run_duration: # type: ignore
break
# waiting time until next fetch
time.sleep(self.fetch_interval) # type: ignore
except KeyboardInterrupt:
print("Keyboard interrupt detected. Exiting...")
finally:
kafka_producer.flush()
# kafka_producer.close() # this command generates ERROR
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------