Commit 3b8e35ee authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Modification made for tests (fetch_node_exporter_metrics and...

Modification made for tests (fetch_node_exporter_metrics and stream_node_export_metrics_to_raw_topic)
parent 2f9bdc38
Loading
Loading
Loading
Loading
+45 −16
Original line number Diff line number Diff line
@@ -33,9 +33,11 @@ LOGGER = logging.getLogger(__name__)
METRICS_POOL       = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP    = '127.0.0.1:9092'
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS       = {'request' : 'topic_request', 
                      'response': 'topic_response'}
EXPORTER_ENDPOINT  = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics"
KAFKA_TOPICS       = {'request' : 'topic_request', 'response': 'topic_response',
                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labled'}
EXPORTER_ENDPOINT  = "http://127.0.0.1:9100/metrics"
PRODUCER_CONFIG    = {'bootstrap.servers': KAFKA_SERVER_IP,}


class TelemetryBackendService:
    """
@@ -122,15 +124,12 @@ class TelemetryBackendService:
        """
        Method to write response on Kafka topic
        """
        producer_configs = {
            'bootstrap.servers': KAFKA_SERVER_IP,
        }
        # topic_response = "topic_response"
        msg_value : Tuple [str, Any] = (kpi_id, kpi_value)
        msg_key    = collector_id
        producerObj = KafkaProducer(producer_configs)
        producerObj = KafkaProducer(PRODUCER_CONFIG)
        # producerObj.produce(topic_response, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=self.delivery_callback)
        producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=TelemetryBackendService.delivery_callback)
        producerObj.flush()

    def terminate_collector_backend(self, collector_id):
@@ -161,7 +160,8 @@ class TelemetryBackendService:
                return False
        return True

    def delivery_callback(self, err, msg):
    @staticmethod
    def delivery_callback( err, msg):
        """
        Callback function to handle message delivery status.
        Args:
@@ -174,8 +174,8 @@ class TelemetryBackendService:
            print(f'Message delivered to topic {msg.topic()}')

# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------

    def fetch_node_exporter_metrics(self):
    @staticmethod
    def fetch_single_node_exporter_metric():
        """
        Method to fetch metrics from Node Exporter.
        Returns:
@@ -184,24 +184,29 @@ class TelemetryBackendService:
        KPI = "node_network_receive_packets_total"
        try:
            response = requests.get(EXPORTER_ENDPOINT) # type: ignore
            LOGGER.info("Request status {:}".format(response))
            if response.status_code == 200:
                # print(f"Metrics fetched sucessfully...")
                metrics = response.text
                # Check if the desired metric is available in the response
                if KPI in metrics:
                    KPI_VALUE = self.extract_metric_value(metrics, KPI)
                    KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
                    # Extract the metric value
                    if KPI_VALUE is not None:
                        print(f"KPI value: {KPI_VALUE}")
                        LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
                        print(f"Extracted value of {KPI} is: {KPI_VALUE}")
                        return KPI_VALUE
            else:
                print(f"Failed to fetch metrics. Status code: {response.status_code}")
                LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
                # print(f"Failed to fetch metrics. Status code: {response.status_code}")
                return None
        except Exception as e:
            print(f"Failed to fetch metrics: {str(e)}")
            LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
            # print(f"Failed to fetch metrics: {str(e)}")
            return None

    def extract_metric_value(self, metrics, metric_name):
    @staticmethod
    def extract_metric_value(metrics, metric_name):
        """
        Method to extract the value of a metric from the metrics string.
        Args:
@@ -220,4 +225,28 @@ class TelemetryBackendService:
            print(f"Metric '{metric_name}' not found in the metrics.")
            return None

    @staticmethod
    def stream_node_export_metrics_to_raw_topic():
        try:
            while True:
                response = requests.get(EXPORTER_ENDPOINT)
                LOGGER.info("Response Status {:} ".format(response))
                try: 
                    if response.status_code == 200:
                        producerObj = KafkaProducer(PRODUCER_CONFIG)
                        producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback)
                        producerObj.flush()
                        LOGGER.info("Produce to topic")
                    else:
                        LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
                        print(f"Didn't received expected response. Status code: {response.status_code}")
                        return None
                    time.sleep(5)
                except Exception as e:
                    LOGGER.info("Failed to process response. Status code: {:}".format(e))
                    return None
        except Exception as e:
            LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
            print(f"Failed to fetch metrics: {str(e)}")
            return None
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
 No newline at end of file
+25 −14
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
import sys
print (sys.path)
sys.path.append('/home/tfs/tfs-ctrl')
import threading
import logging
from typing import Tuple
from common.proto.context_pb2 import Empty
@@ -27,17 +28,27 @@ LOGGER = logging.getLogger(__name__)
# Tests Implementation of Telemetry Backend
###########################

def test_verify_kafka_topics():
    LOGGER.info('test_verify_kafka_topics requesting')
    TelemetryBackendServiceObj = TelemetryBackendService()
    KafkaTopics = ['topic_request', 'topic_response']
    response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
    LOGGER.debug(str(response))
    assert isinstance(response, bool)

def test_run_kafka_listener():
    LOGGER.info('test_receive_kafka_request requesting')
    TelemetryBackendServiceObj = TelemetryBackendService()
    response = TelemetryBackendServiceObj.run_kafka_listener()
    LOGGER.debug(str(response))
    assert isinstance(response, bool)
# def test_verify_kafka_topics():
#     LOGGER.info('test_verify_kafka_topics requesting')
#     TelemetryBackendServiceObj = TelemetryBackendService()
#     KafkaTopics = ['topic_request', 'topic_response']
#     response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
#     LOGGER.debug(str(response))
#     assert isinstance(response, bool)

# def test_run_kafka_listener():
#     LOGGER.info('test_receive_kafka_request requesting')
#     TelemetryBackendServiceObj = TelemetryBackendService()
#     response = TelemetryBackendServiceObj.run_kafka_listener()
#     LOGGER.debug(str(response))
#     assert isinstance(response, bool)


def test_fetch_node_exporter_metrics():
    LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ')
    TelemetryBackendService.fetch_single_node_exporter_metric()

def test_stream_node_export_metrics_to_raw_topic():
    LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ')
    threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start()