Newer
Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Waleed Akbar
committed
import random
import logging
import threading
# from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.method_wrappers.Decorator import MetricsPool
from common.tools.service.GenericGrpcService import GenericGrpcService
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
# EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics"
"""
Class listens for request on Kafka topic, fetches requested metrics from device.
Produces metrics on both RESPONSE and VALUE kafka topics.
"""
def __init__(self, cls_name : str = __name__) -> None:
LOGGER.info('Init TelemetryBackendService')
port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
super().__init__(port, cls_name=cls_name)
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'backend',
'auto.offset.reset' : 'latest'})
self.running_threads = {}
def install_servicers(self):
threading.Thread(target=self.RequestListener).start()
def RequestListener(self):
"""
"""
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.REQUEST.value])
while True:
if receive_msg is None:
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
break
collector = json.loads(receive_msg.value().decode('utf-8'))
collector_id = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
print('Recevied Collector: {:} - {:}'.format(collector_id, collector))
if collector['duration'] == -1 and collector['interval'] == -1:
self.TerminateCollectorBackend(collector_id)
self.RunInitiateCollectorBackend(collector_id, collector)
def TerminateCollectorBackend(self, collector_id):
if collector_id in self.running_threads:
thread, stop_event = self.running_threads[collector_id]
stop_event.set()
thread.join()
print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
del self.running_threads[collector_id]
self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend.
else:
print ('Backend collector {:} not found'.format(collector_id))

Waleed Akbar
committed
def RunInitiateCollectorBackend(self, collector_id: str, collector: str):
stop_event = threading.Event()
thread = threading.Thread(target=self.InitiateCollectorBackend,
args=(collector_id, collector, stop_event))
self.running_threads[collector_id] = (thread, stop_event)
thread.start()

Waleed Akbar
committed
def InitiateCollectorBackend(self, collector_id, collector, stop_event):
Method receives collector request and initiates collecter backend.
print("Initiating backend for collector: ", collector_id)
while not stop_event.is_set():
if time.time() - start_time >= collector['duration']: # condition to terminate backend
print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
self.GenerateCollectorResponse(collector_id, "-1", -1) # Termination confirmation to frontend.
self.ExtractKpiValue(collector_id, collector['kpi_id'])
time.sleep(collector['interval'])

Waleed Akbar
committed
def ExtractKpiValue(self, collector_id: str, kpi_id: str):

Waleed Akbar
committed
"""
Method to extract kpi value.
"""
measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device
print ("Measured Kpi value: {:}".format(measured_kpi_value))
# measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI
self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value)

Waleed Akbar
committed
def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):

Waleed Akbar
committed
"""
Method to write kpi value on RESPONSE Kafka topic

Waleed Akbar
committed
"""
producer = self.kafka_producer
kpi_value : Dict = {
"kpi_id" : kpi_id,
"kpi_value" : measured_kpi_value
}
producer.produce(
KafkaTopic.RESPONSE.value,
key = collector_id,
value = json.dumps(kpi_value),
callback = self.delivery_callback
)
producer.flush()

Waleed Akbar
committed
def GenerateRawMetric(self, metrics: Any):
"""
Method writes raw metrics on VALUE Kafka topic
"""
producer = self.kafka_producer
some_metric : Dict = {
"some_id" : metrics
}
producer.produce(
KafkaTopic.VALUE.value,
key = 'raw',
value = json.dumps(some_metric),
callback = self.delivery_callback
)
producer.flush()

Waleed Akbar
committed
"""
Callback function to handle message delivery status.
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
Args: err (KafkaError): Kafka error object.
msg (Message): Kafka message object.
"""
if err: print(f'Message delivery failed: {err}')
# else: print(f'Message delivered to topic {msg.topic()}')
# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
# @staticmethod
# def fetch_single_node_exporter_metric():
# """
# Method to fetch metrics from Node Exporter.
# Returns:
# str: Metrics fetched from Node Exporter.
# """
# KPI = "node_network_receive_packets_total"
# try:
# response = requests.get(EXPORTER_ENDPOINT) # type: ignore
# LOGGER.info("Request status {:}".format(response))
# if response.status_code == 200:
# # print(f"Metrics fetched sucessfully...")
# metrics = response.text
# # Check if the desired metric is available in the response
# if KPI in metrics:
# KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
# # Extract the metric value
# if KPI_VALUE is not None:
# LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
# print(f"Extracted value of {KPI} is: {KPI_VALUE}")
# return KPI_VALUE
# else:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
# # print(f"Failed to fetch metrics. Status code: {response.status_code}")
# return None
# except Exception as e:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
# # print(f"Failed to fetch metrics: {str(e)}")
# return None
# @staticmethod
# def extract_metric_value(metrics, metric_name):
# """
# Method to extract the value of a metric from the metrics string.
# Args:
# metrics (str): Metrics string fetched from Exporter.
# metric_name (str): Name of the metric to extract.
# Returns:
# float: Value of the extracted metric, or None if not found.
# """
# try:
# # Find the metric line containing the desired metric name
# metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
# # Split the line to extract the metric value
# metric_value = float(metric_line.split()[1])
# return metric_value
# except StopIteration:
# print(f"Metric '{metric_name}' not found in the metrics.")
# return None
# @staticmethod
# def stream_node_export_metrics_to_raw_topic():
# try:
# while True:
# response = requests.get(EXPORTER_ENDPOINT)
# # print("Response Status {:} ".format(response))
# # LOGGER.info("Response Status {:} ".format(response))
# try:
# if response.status_code == 200:
# producerObj = KafkaProducer(PRODUCER_CONFIG)
# producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback)
# producerObj.flush()
# LOGGER.info("Produce to topic")
# else:
# LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
# print(f"Didn't received expected response. Status code: {response.status_code}")
# return None
# time.sleep(15)
# except Exception as e:
# LOGGER.info("Failed to process response. Status code: {:}".format(e))
# return None
# except Exception as e:
# LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
# print(f"Failed to fetch metrics: {str(e)}")
# return None
# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------