Skip to content
Snippets Groups Projects
KafkaProducerServiceImpl.py 7.22 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
    # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import logging
    import threading
    import time
    from typing import Tuple

    import requests
    from confluent_kafka import Producer, KafkaException
    from confluent_kafka.admin import AdminClient, NewTopic

    from common.proto.context_pb2 import Empty
    from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
    from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
    
    # Metrics pool handed to the (currently commented-out) RPC decorators below.
    METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
    # Module-wide logger, named after this module.
    LOGGER = logging.getLogger(__name__)
    
    class KafkaProducerServiceImpl:
        """
        Class to fetch metrics from Exporter and produce them to Kafka.
        """

        # Seconds to wait for the exporter to answer a single HTTP request.
        # Without a timeout a stalled exporter would block the producer loop forever.
        EXPORTER_TIMEOUT = 5

        def __init__(self, bootstrap_servers=None, exporter_endpoint=None,
                      kafka_topic=None, run_duration=None, fetch_interval=None):
            """
            Constructor to initialize Kafka producer parameters.
            Args:
                bootstrap_servers (str): Kafka broker address.
                exporter_endpoint (str): Node Exporter metrics endpoint.
                kafka_topic (str): Kafka topic to produce metrics to.
                run_duration (int): Total time in seconds the producer loop keeps running.
                fetch_interval (int): Time in seconds to sleep between two consecutive fetches.
            """
            LOGGER.info('Init TelemetryBackendService')

            self.bootstrap_servers = bootstrap_servers
            self.exporter_endpoint = exporter_endpoint
            self.kafka_topic       = kafka_topic
            self.run_duration      = run_duration
            self.fetch_interval    = fetch_interval

        # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        @staticmethod
        def export_collector_value(request: "Collector") -> Tuple[str, str]:  # type: ignore
            """
            Placeholder that returns a fixed (collector id, collected value) pair.
            Args:
                request (Collector): Collector message; not inspected yet.
            Returns:
                Tuple[str, str]: Hard-coded test collector id and test value.
            """
            # Metric to be fetched from endpoint based on Collector message
            return ('test collector Id', 'test collected value')

        # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        @staticmethod
        def write_to_kafka(request: Tuple[str, str]) -> "Empty":  # type: ignore
            """
            Placeholder for writing a collected value to Kafka; currently does nothing.
            Args:
                request (Tuple[str, str]): (collector id, collected value) pair; not used yet.
            Returns:
                Empty: An empty protobuf message.
            """
            # _collector_id, _collector_id_value = request
            # write collector_id and collector_id value on the Kafka topic
            return Empty()

    # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------

        def fetch_node_exporter_metrics(self):
            """
            Method to fetch the monitored KPI from Node Exporter.
            Returns:
                float: Value of the KPI, or None if the request fails, the exporter
                    answers with a status other than 200, or the KPI cannot be
                    extracted from the response.
            """
            KPI = "node_network_receive_packets_total"

            # Best-effort boundary: any failure is reported and turned into None so
            # that the producer loop keeps running.
            try:
                response = requests.get(self.exporter_endpoint, timeout=self.EXPORTER_TIMEOUT)
                if response.status_code != 200:
                    print(f"Failed to fetch metrics. Status code: {response.status_code}")
                    return None

                metrics = response.text
                # Check if the desired metric is available in the response
                if KPI not in metrics:
                    return None

                KPI_VALUE = self.extract_metric_value(metrics, KPI)
                if KPI_VALUE is not None:
                    print(f"KPI value: {KPI_VALUE}")
                return KPI_VALUE
            except Exception as e:
                print(f"Failed to fetch metrics: {str(e)}")
                return None

        def extract_metric_value(self, metrics, metric_name):
            """
            Method to extract the value of a metric from the metrics string.
            The first sample line whose metric name is exactly `metric_name` is used.
            Args:
                metrics (str): Metrics string fetched from Node Exporter.
                metric_name (str): Name of the metric to extract.
            Returns:
                float: Value of the extracted metric, or None if the metric is not
                    found or its sample line carries no parsable value.
            """
            for line in metrics.splitlines():
                if not line.startswith(metric_name):
                    continue

                remainder = line[len(metric_name):]
                if remainder[:1] == "{":
                    # Skip the label set; label values may themselves contain
                    # spaces or '}', so cut at the last closing brace.
                    remainder = remainder.rpartition("}")[2]
                elif remainder[:1] not in (" ", "\t"):
                    # A longer metric name that merely shares the prefix.
                    continue

                try:
                    # First field after the name/labels is the value; an optional
                    # timestamp may follow and is ignored.
                    return float(remainder.split()[0])
                except (IndexError, ValueError):
                    print(f"Metric '{metric_name}' has no parsable value: {line!r}")
                    return None

            print(f"Metric '{metric_name}' not found in the metrics.")
            return None

        def delivery_callback(self, err, msg):
            """
            Callback function to handle message delivery status.
            Args:
                err (KafkaError): Kafka error object.
                msg (Message): Kafka message object.
            """
            if err:
                print(f'Message delivery failed: {err}')
            else:
                print(f'Message delivered to topic {msg.topic()}')

        def create_topic_if_not_exists(self, admin_client):
            """
            Method to create Kafka topic if it does not exist.
            Failures are reported on stdout and not propagated.
            Args:
                admin_client (AdminClient): Kafka admin client.
            """
            try:
                topic_metadata = admin_client.list_topics(timeout=5)
                if self.kafka_topic in topic_metadata.topics:
                    return

                # If the topic does not exist, create a new topic
                print(f"Topic '{self.kafka_topic}' does not exist. Creating...")
                new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1)
                # create_topics() only returns futures; wait on them so that a
                # failed creation actually reaches the except clause below.
                for future in admin_client.create_topics([new_topic]).values():
                    future.result()
            except KafkaException as e:
                print(f"Failed to create topic: {e}")

        def produce_metrics(self):
            """
            Method to produce metrics to Kafka topic as per Kafka configs.

            Fetches the KPI every `fetch_interval` seconds and publishes it until
            `run_duration` seconds have elapsed. Pending messages are flushed on exit.
            """
            conf = {
                'bootstrap.servers': self.bootstrap_servers,
            }

            admin_client = AdminClient(conf)
            self.create_topic_if_not_exists(admin_client)

            kafka_producer = Producer(conf)

            try:
                start_time = time.time()
                while True:
                    metrics = self.fetch_node_exporter_metrics()  # select the function name based on the provided requirements

                    # Compare against None so that a legitimate reading of 0.0 is still published.
                    if metrics is not None:
                        kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback)
                        # Serve delivery callbacks as we go instead of only at the final flush.
                        kafka_producer.poll(0)

                    # Check if the specified run duration has elapsed
                    if time.time() - start_time >= self.run_duration:
                        break

                    # waiting time until next fetch
                    time.sleep(self.fetch_interval)
            except KeyboardInterrupt:
                # Only reachable when this method runs in the main thread.
                print("Keyboard interrupt detected. Exiting...")
            finally:
                kafka_producer.flush()
                # kafka_producer.close()        # this command generates ERROR

        def start_producer_thread(self):
            """
            Method to start the producer thread.
            Returns:
                threading.Thread: The started thread running `produce_metrics`,
                    so that callers can join it if they wish.
            """
            producer_thread = threading.Thread(target=self.produce_metrics)
            producer_thread.start()
            return producer_thread
    
    
    # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------