# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import time import logging import requests import threading from typing import Tuple from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import KafkaException from confluent_kafka import KafkaError from confluent_kafka.admin import AdminClient, NewTopic from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') KAFKA_SERVER_IP = '127.0.0.1:9092' class TelemetryBackendService: """ Class to fetch metrics from Exporter and produce them to Kafka. """ def __init__(self, bootstrap_servers='127.0.0.1:9092', exporter_endpoint=None, kafka_topic=None, run_duration=None, fetch_interval=None): """ Constructor to initialize Kafka producer parameters. Args: bootstrap_servers (str): Kafka broker address. exporter_endpoint (str): Node Exporter metrics endpoint. kafka_topic (str): Kafka topic to produce metrics to. run_interval (int): Time interval in seconds to run the producer. """ LOGGER.info('Init TelemetryBackendService') self.bootstrap_servers = bootstrap_servers self.exporter_endpoint = exporter_endpoint self.kafka_topic = kafka_topic self.run_duration = run_duration self.fetch_interval = fetch_interval def receive_kafka_request(self, ): # type: ignore """ Method to receive collector request on Kafka topic. """ conusmer_configs = { 'bootstrap.servers' : KAFKA_SERVER_IP, 'group.id' : 'consumer', 'auto.offset.reset' : 'earliest' } topic_request = "topic_request" consumerObj = KafkaConsumer(conusmer_configs) consumerObj.subscribe([topic_request]) start_time = time.time() while True: receive_msg = consumerObj.poll(1.0) if receive_msg is None: print ("nothing to read ...", time.time() - start_time) if time.time() - start_time >= 10: # type: ignore print("Timeout: consumer terminated") break continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(receive_msg.error())) break print ("Received Message: ", receive_msg.value().decode('utf-8')) def execute_receive_kafka_request(self )->Empty: # type: ignore threading.Thread(target=self.receive_kafka_request).start() return True # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore response = Tuple[str, str] collector_id = str('test collector Id') collected_Value = str('test collected value') # Metric to be fetched from endpoint based on Collector message response = (collector_id, collected_Value) return response # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer: (collector_id, collector_value) = request response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers}) # _collector_id, _collector_id_value = request # write collector_id and collector_id value on the Kafka topic # get kafka bootstrap server and topic name # write to kafka topic return response def stop_producer(self, request: KafkaProducer) -> Empty: # type: ignore # flush and close kafka producer object return Empty() # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- def fetch_node_exporter_metrics(self): """ Method to fetch metrics from Node Exporter. Returns: str: Metrics fetched from Node Exporter. """ KPI = "node_network_receive_packets_total" try: response = requests.get(self.exporter_endpoint) # type: ignore if response.status_code == 200: # print(f"Metrics fetched sucessfully...") metrics = response.text # Check if the desired metric is available in the response if KPI in metrics: KPI_VALUE = self.extract_metric_value(metrics, KPI) # Extract the metric value if KPI_VALUE is not None: print(f"KPI value: {KPI_VALUE}") return KPI_VALUE else: print(f"Failed to fetch metrics. Status code: {response.status_code}") return None except Exception as e: print(f"Failed to fetch metrics: {str(e)}") return None def extract_metric_value(self, metrics, metric_name): """ Method to extract the value of a metric from the metrics string. Args: metrics (str): Metrics string fetched from Node Exporter. metric_name (str): Name of the metric to extract. Returns: float: Value of the extracted metric, or None if not found. """ try: # Find the metric line containing the desired metric name metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) # Split the line to extract the metric value metric_value = float(metric_line.split()[1]) return metric_value except StopIteration: print(f"Metric '{metric_name}' not found in the metrics.") return None def delivery_callback(self, err, msg): """ Callback function to handle message delivery status. Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ if err: print(f'Message delivery failed: {err}') else: print(f'Message delivered to topic {msg.topic()}') def create_topic_if_not_exists(self, admin_client): """ Method to create Kafka topic if it does not exist. Args: admin_client (AdminClient): Kafka admin client. """ try: topic_metadata = admin_client.list_topics(timeout=5) if self.kafka_topic not in topic_metadata.topics: # If the topic does not exist, create a new topic print(f"Topic '{self.kafka_topic}' does not exist. Creating...") new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1) admin_client.create_topics([new_topic]) except KafkaException as e: print(f"Failed to create topic: {e}") def produce_metrics(self): """ Method to produce metrics to Kafka topic as per Kafka configs. """ conf = { 'bootstrap.servers': self.bootstrap_servers, } admin_client = AdminClient(conf) self.create_topic_if_not_exists(admin_client) kafka_producer = KafkaProducer(conf) try: start_time = time.time() while True: metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements if metrics: kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback) kafka_producer.flush() # print("Metrics produced to Kafka topic") # Check if the specified run duration has elapsed if time.time() - start_time >= self.run_duration: # type: ignore break # waiting time until next fetch time.sleep(self.fetch_interval) # type: ignore except KeyboardInterrupt: print("Keyboard interrupt detected. Exiting...") finally: kafka_producer.flush() # kafka_producer.close() # this command generates ERROR # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------