# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ast
import time
import random
import logging
import requests
import threading
from typing import Any, Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaException
from confluent_kafka import KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method

LOGGER             = logging.getLogger(__name__)
METRICS_POOL       = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP    = '127.0.0.1:9092'
# KAFKA_SERVER_IP    = '10.152.183.175:30092'
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS       = {'request' : 'topic_request', 'response': 'topic_response',
                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labeled'}
EXPORTER_ENDPOINT  = "http://10.152.183.2:9100/metrics"
PRODUCER_CONFIG    = {'bootstrap.servers': KAFKA_SERVER_IP,}
# Upper bound (seconds) for a single HTTP request to the exporter; without it
# requests.get() may block forever on an unresponsive endpoint.
EXPORTER_REQUEST_TIMEOUT = 5.0


class TelemetryBackendService:
    """
    Listens for collector requests on a Kafka topic, produces measured KPI
    values on the response topic, and offers helpers to read metrics from the
    exporter endpoint.
    """

    # Shared producer, created lazily on first use (see _get_producer).
    # Class-level defaults keep the helper usable even if __init__ was bypassed.
    _producer      = None
    _producer_lock = threading.Lock()

    def __init__(self):
        LOGGER.info('Init TelemetryBackendService')
        # collector_id -> (worker thread, stop event) for every started collector.
        self.running_threads = {}

    def _get_producer(self):
        """
        Return the Kafka producer of this instance, creating it on first use.
        The lock only protects the lazy creation against concurrent worker threads.
        """
        with self._producer_lock:
            if self._producer is None:
                self._producer = KafkaProducer(PRODUCER_CONFIG)
            return self._producer

    def run_kafka_listener(self)->bool:
        """
        Start kafka_listener() in a new thread.
        Returns:
            bool: always True once the thread has been started.
        """
        threading.Thread(target=self.kafka_listener).start()
        return True

    def kafka_listener(self):
        """
        Listener for requests on the Kafka request topic.
        Each message value is parsed as a "(kpi_id, duration, interval)" literal and
        the message key is used as collector id. A request with duration == -1 and
        interval == -1 stops the collector; any other request starts one.
        The loop ends on the first consumer error other than end-of-partition.
        """
        consumer_configs = {
            'bootstrap.servers' : KAFKA_SERVER_IP,
            'group.id'          : 'backend',
            'auto.offset.reset' : 'latest'
        }
        consumer = KafkaConsumer(consumer_configs)
        consumer.subscribe([KAFKA_TOPICS['request']])
        try:
            while True:
                message = consumer.poll(2.0)
                if message is None:
                    continue
                if message.error():
                    if message.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    print("Consumer error: {}".format(message.error()))
                    break
                try:
                    (kpi_id, duration, interval) = ast.literal_eval(message.value().decode('utf-8'))
                    collector_id = message.key().decode('utf-8')
                except (AttributeError, SyntaxError, TypeError, ValueError) as e:
                    # A single malformed request (bad literal, wrong arity, missing
                    # key/value) must not bring the whole listener thread down.
                    LOGGER.warning("Discarding malformed collector request: {:}".format(e))
                    continue
                if duration == -1 and interval == -1:
                    self.terminate_collector_backend(collector_id)
                else:
                    self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval)
        finally:
            # Always leave the consumer group cleanly, whatever ended the loop.
            consumer.close()

    def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int):
        """
        Start initiate_collector_backend() in a new thread and register the thread
        together with its stop event in running_threads under collector_id
        (an existing entry for the same collector_id is overwritten).
        """
        stop_event = threading.Event()
        thread = threading.Thread(target=self.initiate_collector_backend,
                                  args=(collector_id, kpi_id, duration, interval, stop_event))
        self.running_threads[collector_id] = (thread, stop_event)
        thread.start()

    def initiate_collector_backend(self, collector_id, kpi_id, duration, interval, stop_event): # type: ignore
        """
        Worker loop of one collector: produce a KPI value every `interval` seconds
        until `duration` seconds have elapsed or `stop_event` is set.
        When the duration expires, a ("-1", -1) response is produced as termination
        confirmation.
        Raises:
            ValueError: if interval is negative (raised after the first sample,
                        where time.sleep() used to reject it).
        """
        print("Initiating backend for collector: ", collector_id)
        start_time = time.time()
        while not stop_event.is_set():
            if time.time() - start_time >= duration:            # condition to terminate backend
                print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
                self.generate_kafka_response(collector_id, "-1", -1)   # termination confirmation
                break
            self.extract_kpi_value(collector_id, kpi_id)
            if interval < 0:
                # Event.wait() returns immediately for a negative timeout, which would
                # turn this loop into a busy spin; fail like time.sleep() did instead.
                raise ValueError("sleep length must be non-negative")
            # Wait on the event instead of sleeping so that a stop request wakes the
            # worker immediately rather than after a full interval.
            stop_event.wait(interval)

    def extract_kpi_value(self, collector_id: str, kpi_id: str):
        """
        Produce one measured value for kpi_id on the response topic.
        The value is currently a random integer in [1, 100].
        """
        measured_kpi_value = random.randint(1,100)                  # Should be extracted from exporter/stream
        # measured_kpi_value = self.fetch_node_exporter_metrics()     # exporter extracted metric value against default KPI
        self.generate_kafka_response(collector_id, kpi_id , measured_kpi_value)

    def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
        """
        Write "(kpi_id, kpi_value)" to the Kafka response topic, keyed by
        collector_id, and block until the message has been flushed.
        """
        msg_value : Tuple [str, Any] = (kpi_id, kpi_value)
        msg_key    = collector_id
        # Reuse one producer instead of building a new client for every message.
        producer = self._get_producer()
        producer.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value),
                         callback=TelemetryBackendService.delivery_callback)
        producer.flush()

    def terminate_collector_backend(self, collector_id):
        """
        Stop the collector registered under collector_id: set its stop event, wait
        for the worker thread to exit, drop the registry entry and produce a
        ("-1", -1) termination response. Unknown collector ids are ignored.
        """
        if collector_id in self.running_threads:
            thread, stop_event = self.running_threads[collector_id]
            stop_event.set()
            thread.join()
            print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
            del self.running_threads[collector_id]
            self.generate_kafka_response(collector_id, "-1", -1)

    def create_topic_if_not_exists(self, new_topics: list) -> bool:
        """
        Create every Kafka topic of new_topics that does not exist yet.
        Args:
            new_topics (list): names of the topics that must exist.
        Returns:
            bool: True if all topics exist or were created, False if listing or
                  creating topics failed.
        """
        if not new_topics:
            return True
        try:
            # Topic metadata does not change between iterations: fetch it once.
            topic_metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=5)
            missing_topics = []
            for topic in new_topics:
                if topic in topic_metadata.topics or topic in missing_topics:
                    continue
                print(f"Topic '{topic}' does not exist. Creating...")
                LOGGER.warning("Topic {:} does not exist. Creating...".format(topic))
                missing_topics.append(topic)
            if not missing_topics:
                return True
            creation_futures = ADMIN_KAFKA_CLIENT.create_topics(
                [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in missing_topics])
        except KafkaException as e:
            print(f"Failed to create topic: {e}")
            return False

        # create_topics() is asynchronous: failures only surface through the futures.
        for future in creation_futures.values():
            try:
                future.result()
            except KafkaException as e:
                # Someone else created the topic in the meantime: it exists, which is the goal.
                if e.args and e.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
                    continue
                print(f"Failed to create topic: {e}")
                return False
        return True

    @staticmethod
    def delivery_callback( err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')

# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
    @staticmethod
    def fetch_single_node_exporter_metric():
        """
        Fetch the metrics page from the exporter endpoint and extract the value of
        "node_network_receive_packets_total".
        Returns:
            float: value of the metric, or None if the request failed, returned a
                   non-200 status, or the metric could not be extracted.
        """
        KPI = "node_network_receive_packets_total"
        try:
            response = requests.get(EXPORTER_ENDPOINT, timeout=EXPORTER_REQUEST_TIMEOUT) # type: ignore
            LOGGER.info("Request status {:}".format(response))
            if response.status_code != 200:
                LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
                return None
            metrics = response.text
            # Check if the desired metric is available in the response
            if KPI in metrics:
                KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
                if KPI_VALUE is not None:
                    LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
                    print(f"Extracted value of {KPI} is: {KPI_VALUE}")
                    return KPI_VALUE
            return None
        except Exception as e:
            # Best-effort helper: any failure is reported as "no value".
            LOGGER.info("Failed to fetch metrics: {:}".format(e))
            return None

    @staticmethod
    def extract_metric_value(metrics, metric_name):
        """
        Extract the value of a metric from the metrics string.
        The first line whose metric name is exactly metric_name (optionally followed
        by a "{...}" label set) is used.
        Args:
            metrics (str): Metrics string fetched from Exporter.
            metric_name (str): Name of the metric to extract.
        Returns:
            float: Value of the extracted metric, or None if not found or malformed.
        """
        for line in metrics.split('\n'):
            if not line.startswith(metric_name):
                continue
            remainder = line[len(metric_name):]
            # Skip longer metric names that merely share the prefix
            # (e.g. "<metric_name>_created") and lines without any value.
            if remainder[:1] not in ('{', ' ', '\t'):
                continue
            if remainder.startswith('{'):
                # Label values may contain spaces: drop the whole label set first.
                _, closing_brace, remainder = remainder.rpartition('}')
                if not closing_brace:
                    LOGGER.warning("Malformed sample line for metric {:}: {:}".format(metric_name, line))
                    return None
            fields = remainder.split()
            try:
                return float(fields[0])
            except (IndexError, ValueError):
                LOGGER.warning("Malformed sample line for metric {:}: {:}".format(metric_name, line))
                return None
        print(f"Metric '{metric_name}' not found in the metrics.")
        return None

    @staticmethod
    def stream_node_export_metrics_to_raw_topic():
        """
        Every 15 seconds, fetch the metrics page from the exporter endpoint and
        produce its text to the Kafka raw topic under key "raw".
        The loop ends (returning None) on the first non-200 response or on any
        error while fetching or producing.
        """
        producer = None     # created on the first successful fetch, then reused
        try:
            while True:
                response = requests.get(EXPORTER_ENDPOINT, timeout=EXPORTER_REQUEST_TIMEOUT)
                try:
                    if response.status_code == 200:
                        if producer is None:
                            producer = KafkaProducer(PRODUCER_CONFIG)
                        producer.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text),
                                         callback=TelemetryBackendService.delivery_callback)
                        producer.flush()
                        LOGGER.info("Produce to topic")
                    else:
                        LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
                        print(f"Didn't received expected response. Status code: {response.status_code}")
                        return None
                    time.sleep(15)
                except Exception as e:
                    LOGGER.info("Failed to process response: {:}".format(e))
                    return None
        except Exception as e:
            LOGGER.info("Failed to fetch metrics: {:}".format(e))
            print(f"Failed to fetch metrics: {str(e)}")
            return None
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------