Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import logging
import requests
import threading
from typing import Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaException
from confluent_kafka import KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP = '127.0.0.1:9092'

Waleed Akbar
committed
class TelemetryBackendService:
"""
Class that listens for requests on a Kafka topic, fetches metrics and produces measured values to another Kafka topic.
"""

Waleed Akbar
committed
LOGGER.info('Init TelemetryBackendService')
"""
"""
conusmer_configs = {
'bootstrap.servers' : KAFKA_SERVER_IP,
'group.id' : 'consumer',
'auto.offset.reset' : 'earliest'
}
topic_request = "topic_request"
consumerObj = KafkaConsumer(conusmer_configs)
consumerObj.subscribe([topic_request])
while True:
if receive_msg is None:
print ("Telemetry backend listener is active: Kafka Topic: ", topic_request) # added for debugging purposes
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
break
(kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
self.execute_process_kafka_request(kpi_id, duration, interval)
def run_kafka_listener(self) -> bool:
    """
    Method to start the Kafka request listener in a background thread.
    Returns:
        bool: Always True, returned right after the listener thread is started.
    """
    # kafka_listener consumes in a loop, so it gets its own thread and the caller is not blocked.
    threading.Thread(target=self.kafka_listener).start()
    return True
def process_kafka_request(self, kpi_id, duration, interval): # type: ignore
    """
    Method to run the collector loop for a single KPI request.
    Prints a status line and then sleeps for `interval` seconds, repeating
    until `duration` seconds have elapsed since the call started.
    Args:
        kpi_id: Identifier of the KPI; only used in the status line.
        duration: Total running time in seconds.
        interval: Pause in seconds between two consecutive status lines.
    """
    began_at = time.time()
    # Keep reporting for as long as the requested duration has not elapsed.
    while not (time.time() - began_at >= duration): # type: ignore
        print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval)
        time.sleep(interval)
    print("Timeout: consumer terminated", time.time() - began_at)
def execute_process_kafka_request(self, kpi_id: str, duration: int, interval: int):
    """
    Method to run process_kafka_request for one KPI on a separate thread.
    Args:
        kpi_id (str): Identifier of the KPI to collect.
        duration (int): Value forwarded as the `duration` argument.
        interval (int): Value forwarded as the `interval` argument.
    """
    request_args = (kpi_id, duration, interval)
    worker = threading.Thread(target=self.process_kafka_request, args=request_args)
    worker.start()
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
def fetch_node_exporter_metrics(self, kpi_name="node_network_receive_packets_total", timeout=5.0):
    """
    Method to fetch the value of one KPI from Node Exporter.
    Args:
        kpi_name (str): Name of the metric to read from the exporter response.
        timeout (float): Seconds to wait for the exporter before giving up.
    Returns:
        float: Value of the requested metric, or None when the request fails,
               the exporter answers with a non-200 status, or the metric is
               not present in the response.
    """
    try:
        # Without a timeout an unresponsive exporter would block the caller forever.
        response = requests.get(self.exporter_endpoint, timeout=timeout) # type: ignore
        if response.status_code != 200:
            print(f"Failed to fetch metrics. Status code: {response.status_code}")
            return None
        metrics = response.text
        # Cheap substring check before scanning the response line by line.
        if kpi_name not in metrics:
            return None
        kpi_value = self.extract_metric_value(metrics, kpi_name)
        if kpi_value is not None:
            print(f"KPI value: {kpi_value}")
        return kpi_value
    except Exception as e:
        # Deliberately best-effort: any failure (network, parsing) is reported and mapped to None.
        print(f"Failed to fetch metrics: {str(e)}")
        return None
def extract_metric_value(self, metrics, metric_name):
    """
    Method to extract the value of a metric from the metrics string.
    The first sample line whose metric name is exactly `metric_name` is used;
    lines of longer metric names that merely share the prefix are ignored.
    Args:
        metrics (str): Metrics string fetched from Node Exporter.
        metric_name (str): Name of the metric to extract.
    Returns:
        float: Value of the extracted metric, or None if not found.
    Raises:
        ValueError: If the matching sample carries a non-numeric value.
    """
    for line in metrics.split('\n'):
        if not line.startswith(metric_name):
            continue
        remainder = line[len(metric_name):]
        # A following name character means this line belongs to a longer metric name.
        if remainder[:1].isalnum() or remainder[:1] in ('_', ':'):
            continue
        # Skip past the label set; label values may themselves contain spaces.
        if '}' in remainder:
            remainder = remainder.rpartition('}')[2]
        fields = remainder.split()
        if not fields:
            # Sample line without a value: keep looking instead of raising IndexError.
            continue
        # fields[0] is the sample value; an optional timestamp may follow it.
        return float(fields[0])
    print(f"Metric '{metric_name}' not found in the metrics.")
    return None
def delivery_callback(self, err, msg):
    """
    Callback reporting the outcome of a produced Kafka message.
    Args:
        err (KafkaError): Kafka error object; falsy when delivery succeeded.
        msg (Message): Kafka message object; its topic is printed on success.
    """
    # Success path first; the failure report is the fall-through.
    if not err:
        print(f'Message delivered to topic {msg.topic()}')
        return
    print(f'Message delivery failed: {err}')
def create_topic_if_not_exists(self, admin_client):
    """
    Method to create Kafka topic if it does not exist.
    Blocks until the broker has answered the creation request, so a failed
    creation is reported instead of being silently dropped.
    Args:
        admin_client (AdminClient): Kafka admin client.
    """
    try:
        topic_metadata = admin_client.list_topics(timeout=5)
        if self.kafka_topic in topic_metadata.topics:
            return
        # If the topic does not exist, create a new topic
        print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
        new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
        # create_topics() is asynchronous: failures only surface through the returned futures.
        pending = admin_client.create_topics([new_topic])
        for creation in (pending or {}).values():
            creation.result()
    except KafkaException as e:
        print(f"Failed to create topic: {e}")
def produce_metrics(self):
    """
    Method to produce metrics to Kafka topic as per Kafka configs.
    Makes sure `self.kafka_topic` exists, then fetches the KPI value every
    `self.fetch_interval` seconds and publishes it until `self.run_duration`
    seconds have elapsed or the user interrupts the run.
    """
    conf = {
        'bootstrap.servers': self.bootstrap_servers,
    }
    admin_client = AdminClient(conf)
    self.create_topic_if_not_exists(admin_client)
    kafka_producer = KafkaProducer(conf)
    try:
        start_time = time.time()
        while True:
            metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements
            # Compare against None: a KPI reading of 0.0 is a valid sample and must be published.
            if metrics is not None:
                kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
                kafka_producer.flush()
            # Check if the specified run duration has elapsed
            if time.time() - start_time >= self.run_duration: # type: ignore
                break
            # waiting time until next fetch
            time.sleep(self.fetch_interval) # type: ignore
    except KeyboardInterrupt:
        print("Keyboard interrupt detected. Exiting...")
    finally:
        # Final flush so that no queued message is lost on exit.
        kafka_producer.flush()
        # kafka_producer.close() # this command generates ERROR
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------