Skip to content
Snippets Groups Projects
TelemetryBackendService.py 9.09 KiB
Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import logging
import requests
import threading
from typing import Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaException
from confluent_kafka import KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method

LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP = '127.0.0.1:9092'

    Class to fetch metrics from Exporter and produce them to Kafka.
    def __init__(self, bootstrap_servers='127.0.0.1:9092', exporter_endpoint=None,
                  kafka_topic=None, run_duration=None, fetch_interval=None):
        """
        Constructor to initialize Kafka producer parameters.
        Args:
            bootstrap_servers (str): Kafka broker address.
            exporter_endpoint (str): Node Exporter metrics endpoint.
            kafka_topic (str): Kafka topic to produce metrics to.
            run_interval (int): Time interval in seconds to run the producer.
        """
        LOGGER.info('Init TelemetryBackendService')
        self.bootstrap_servers = bootstrap_servers
        self.exporter_endpoint = exporter_endpoint
        self.kafka_topic       = kafka_topic
        self.run_duration      = run_duration
        self.fetch_interval    = fetch_interval
    
    def receive_kafka_request(self,
                        ): # type: ignore
        Method to receive collector request on Kafka topic.
        conusmer_configs = {
            'bootstrap.servers' : KAFKA_SERVER_IP,
            'group.id'          : 'consumer',
            'auto.offset.reset' : 'earliest'
        topic_request = "topic_request"

        consumerObj = KafkaConsumer(conusmer_configs)
        consumerObj.subscribe([topic_request])

        start_time = time.time()
        while True:
            receive_msg = consumerObj.poll(1.0)
            if receive_msg is None:
                print ("nothing to read ...", time.time() - start_time)
                if time.time() - start_time >= 10: # type: ignore
                    print("Timeout: consumer terminated")
                    break
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    break
            print ("Received Message: ", receive_msg.value().decode('utf-8'))

    def execute_receive_kafka_request(self
                                      )->Empty: # type: ignore
        threading.Thread(target=self.receive_kafka_request).start()
        return True

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore
        response        =  Tuple[str, str]
        collector_id    = str('test collector Id')
        collected_Value = str('test collected value')        # Metric to be fetched from endpoint based on Collector message
        response        = (collector_id, collected_Value)      
        return response

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer:
        (collector_id, collector_value) = request
        response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers})
        # _collector_id, _collector_id_value = request
        # write collector_id and collector_id value on the Kafka topic

        # get kafka bootstrap server and topic name
        # write to kafka topic
        
        return response

    def stop_producer(self, request: KafkaProducer) -> Empty:  # type: ignore
        # flush and close kafka producer object
        return Empty()

# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------

    def fetch_node_exporter_metrics(self):
        """
        Method to fetch metrics from Node Exporter.
        Returns:
            str: Metrics fetched from Node Exporter.
        """
        KPI = "node_network_receive_packets_total"
        try:
            response = requests.get(self.exporter_endpoint) # type: ignore
            if response.status_code == 200:
                # print(f"Metrics fetched sucessfully...")
                metrics = response.text
                # Check if the desired metric is available in the response
                if KPI in metrics:
                    KPI_VALUE = self.extract_metric_value(metrics, KPI)
                    # Extract the metric value
                    if KPI_VALUE is not None:
                        print(f"KPI value: {KPI_VALUE}")
                        return KPI_VALUE
            else:
                print(f"Failed to fetch metrics. Status code: {response.status_code}")
                return None
        except Exception as e:
            print(f"Failed to fetch metrics: {str(e)}")
            return None

    def extract_metric_value(self, metrics, metric_name):
        """
        Method to extract the value of a metric from the metrics string.
        Args:
            metrics (str): Metrics string fetched from Node Exporter.
            metric_name (str): Name of the metric to extract.
        Returns:
            float: Value of the extracted metric, or None if not found.
        """
        try:
            # Find the metric line containing the desired metric name
            metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
            # Split the line to extract the metric value
            metric_value = float(metric_line.split()[1])
            return metric_value
        except StopIteration:
            print(f"Metric '{metric_name}' not found in the metrics.")
            return None

    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')

    def create_topic_if_not_exists(self, admin_client):
        """
        Method to create Kafka topic if it does not exist.
        Args:
            admin_client (AdminClient): Kafka admin client.
        """
        try:
            topic_metadata = admin_client.list_topics(timeout=5)
            if self.kafka_topic not in topic_metadata.topics:
                # If the topic does not exist, create a new topic
                print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
                new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
                admin_client.create_topics([new_topic])
        except KafkaException as e:
            print(f"Failed to create topic: {e}")

    def produce_metrics(self):
        """
        Method to produce metrics to Kafka topic as per Kafka configs.
        """
        conf = {
            'bootstrap.servers': self.bootstrap_servers,
        }

        admin_client = AdminClient(conf)
        self.create_topic_if_not_exists(admin_client)

        kafka_producer = KafkaProducer(conf)

        try:
            start_time = time.time()
            while True:
                metrics = self.fetch_node_exporter_metrics()  # select the function name based on the provided requirements

                if metrics:
                    kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
                    kafka_producer.flush()
                    # print("Metrics produced to Kafka topic")
                # Check if the specified run duration has elapsed
                if time.time() - start_time >= self.run_duration: # type: ignore
                    break
                # waiting time until next fetch 
                time.sleep(self.fetch_interval) # type: ignore
        except KeyboardInterrupt:
            print("Keyboard interrupt detected. Exiting...")
        finally:
            kafka_producer.flush()
            # kafka_producer.close()        # this command generates ERROR
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------