# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ast
import time
import random
import logging
import requests
import threading
from typing import Any, Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaException
from confluent_kafka import KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method

LOGGER             = logging.getLogger(__name__)
METRICS_POOL       = MetricsPool('Telemetry', 'TelemetryBackend')
KAFKA_SERVER_IP    = '127.0.0.1:9092'
# KAFKA_SERVER_IP    = '10.152.183.175:30092'
ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS       = {'request' : 'topic_request', 'response': 'topic_response',
                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labeled'}
EXPORTER_ENDPOINT  = "http://10.152.183.2:9100/metrics"
PRODUCER_CONFIG    = {'bootstrap.servers': KAFKA_SERVER_IP,}
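
# Message format exchanged over Kafka (as encoded/decoded by the methods below):
#   topic_request : key = collector_id, value = str((kpi_id, duration, interval))
#   topic_response: key = collector_id, value = str((kpi_id, kpi_value))
# A request with duration == -1 and interval == -1 asks the backend to terminate
# the collector identified by the message key.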

class TelemetryBackendService:
    """
    Class to listen for requests on a Kafka topic, fetch metrics, and produce measured values to another Kafka topic.
    """

    def __init__(self):
        LOGGER.info('Init TelemetryBackendService')
        self.running_threads = {}   # collector_id -> (thread, stop_event)

    def run_kafka_listener(self) -> bool:
        threading.Thread(target=self.kafka_listener).start()
        return True
    
    def kafka_listener(self):
        """
        Listener for requests on the Kafka request topic.
        """
        consumer_configs = {
            'bootstrap.servers' : KAFKA_SERVER_IP,
            'group.id'          : 'backend',
            'auto.offset.reset' : 'latest'
        }
        consumerObj = KafkaConsumer(consumer_configs)
        consumerObj.subscribe([KAFKA_TOPICS['request']])
        while True:
            receive_msg = consumerObj.poll(2.0)
            if receive_msg is None:
                # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request'])     # added for debugging purposes
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    break
            (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
            collector_id = receive_msg.key().decode('utf-8')
            if duration == -1 and interval == -1:
                self.terminate_collector_backend(collector_id)
                # threading.Thread(target=self.terminate_collector_backend, args=(collector_id))
            else:
                self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval)
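
    # Example request a frontend might produce (hypothetical; it mirrors the decoding
    # in kafka_listener above):
    #   producer = KafkaProducer(PRODUCER_CONFIG)
    #   producer.produce(KAFKA_TOPICS['request'], key='collector-1',
    #                    value=str(('kpi-1', 60, 5)))   # collect kpi-1 for 60s, every 5s
    #   producer.flush()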

    def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int):
        stop_event = threading.Event()
        thread = threading.Thread(target=self.initiate_collector_backend, 
                                  args=(collector_id, kpi_id, duration, interval, stop_event))
        self.running_threads[collector_id] = (thread, stop_event)
        thread.start()

    def initiate_collector_backend(self, collector_id, kpi_id, duration, interval, stop_event
                        ): # type: ignore
        """
        Method to receive collector request attributes and initiate the collector backend.
        """
        print("Initiating backend for collector: ", collector_id)
        start_time = time.time()
        while not stop_event.is_set():
            if time.time() - start_time >= duration:            # condition to terminate backend
                print("Execution duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
                self.generate_kafka_response(collector_id, "-1", -1)
                # write to Kafka to send the termination confirmation.
                break
            self.extract_kpi_value(collector_id, kpi_id)
            time.sleep(interval)

    def extract_kpi_value(self, collector_id: str, kpi_id: str):
        """
        Method to extract (currently: simulate) a KPI value and produce it on the response topic.
        """
        measured_kpi_value = random.randint(1, 100)             # Should be extracted from exporter/stream
        # measured_kpi_value = TelemetryBackendService.fetch_single_node_exporter_metric()  # exporter-extracted metric value for the default KPI
        self.generate_kafka_response(collector_id, kpi_id, measured_kpi_value)

    def generate_kafka_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
        """
        Method to produce a measured KPI value on the Kafka response topic.
        """
        msg_value : Tuple[str, Any] = (kpi_id, kpi_value)
        msg_key   = collector_id
        producerObj = KafkaProducer(PRODUCER_CONFIG)
        producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value=str(msg_value), callback=TelemetryBackendService.delivery_callback)
        producerObj.flush()     # ensure delivery before the short-lived producer goes out of scope

    def terminate_collector_backend(self, collector_id):
        if collector_id in self.running_threads:
            thread, stop_event = self.running_threads[collector_id]
            stop_event.set()
            thread.join()
            print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
            del self.running_threads[collector_id]
            self.generate_kafka_response(collector_id, "-1", -1)

    def create_topic_if_not_exists(self, new_topics: list) -> bool:
        """
        Method to create the given Kafka topics if they do not exist.
        Args:
            new_topics (list): Names of the topics to create.
        Returns:
            bool: True if all topics exist or were created, False otherwise.
        """
        for topic in new_topics:
            try:
                topic_metadata = ADMIN_KAFKA_CLIENT.list_topics(timeout=5)
                if topic not in topic_metadata.topics:
                    # If the topic does not exist, create a new topic
                    print(f"Topic '{topic}' does not exist. Creating...")
                    LOGGER.warning("Topic {:} does not exist. Creating...".format(topic))
                    new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
                    ADMIN_KAFKA_CLIENT.create_topics([new_topic])
            except KafkaException as e:
                print(f"Failed to create topic: {e}")
                return False
        return True

    @staticmethod
    def delivery_callback(err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')

# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
    @staticmethod
    def fetch_single_node_exporter_metric():
        """
        Method to fetch a single metric from Node Exporter.
        Returns:
            float: Value of the metric fetched from Node Exporter, or None on failure.
        """
        KPI = "node_network_receive_packets_total"
        try:
            response = requests.get(EXPORTER_ENDPOINT) # type: ignore
            LOGGER.info("Request status {:}".format(response))
            if response.status_code == 200:
                # print(f"Metrics fetched successfully...")
                metrics = response.text
                # Check if the desired metric is available in the response
                if KPI in metrics:
                    # Extract the metric value
                    KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
                    if KPI_VALUE is not None:
                        LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
                        print(f"Extracted value of {KPI} is: {KPI_VALUE}")
                    return KPI_VALUE
                return None
            else:
                LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
                # print(f"Failed to fetch metrics. Status code: {response.status_code}")
                return None
        except Exception as e:
            LOGGER.info("Failed to fetch metrics: {:}".format(e))
            # print(f"Failed to fetch metrics: {str(e)}")
            return None
    @staticmethod
    def extract_metric_value(metrics, metric_name):
        """
        Method to extract the value of a metric from the metrics string.
        Args:
            metrics (str): Metrics string fetched from Exporter.
            metric_name (str): Name of the metric to extract.
        Returns:
            float: Value of the extracted metric, or None if not found.
        """
        try:
            # Find the metric line containing the desired metric name
            metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
            # Split the line to extract the metric value
            metric_value = float(metric_line.split()[1])
            return metric_value
        except StopIteration:
            print(f"Metric '{metric_name}' not found in the metrics.")
            return None
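
    # Example exposition line (standard Prometheus text format) handled by the parser
    # above; startswith() also matches labeled series such as:
    #   node_network_receive_packets_total{device="eth0"} 1.234e+06
    # and split()[1] then picks the sample value.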

    @staticmethod
    def stream_node_export_metrics_to_raw_topic():
        """
        Method to periodically fetch the full Node Exporter metrics page and stream it to the raw Kafka topic.
        """
        try:
            while True:
                response = requests.get(EXPORTER_ENDPOINT)
                # print("Response Status {:} ".format(response))
                # LOGGER.info("Response Status {:} ".format(response))
                try:
                    if response.status_code == 200:
                        producerObj = KafkaProducer(PRODUCER_CONFIG)
                        producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value=str(response.text), callback=TelemetryBackendService.delivery_callback)
                        producerObj.flush()
                        LOGGER.info("Produced metrics to raw topic")
                    else:
                        LOGGER.info("Didn't receive the expected response. Status code: {:}".format(response.status_code))
                        print(f"Didn't receive the expected response. Status code: {response.status_code}")
                        return None
                    time.sleep(15)
                except Exception as e:
                    LOGGER.info("Failed to process response: {:}".format(e))
                    return None
        except Exception as e:
            LOGGER.info("Failed to fetch metrics: {:}".format(e))
            print(f"Failed to fetch metrics: {str(e)}")
            return None
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
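
# Minimal usage sketch (assumes a reachable Kafka broker at KAFKA_SERVER_IP; the
# startup order shown here is illustrative, not part of the service API):
if __name__ == '__main__':
    service = TelemetryBackendService()
    service.create_topic_if_not_exists(list(KAFKA_TOPICS.values()))
    service.run_kafka_listener()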