Commit eb47527c authored by Waleed Akbar's avatar Waleed Akbar
Browse files

update methods "export_collector_value" and "write_to_kafka" parameters and...

update methods "export_collector_value" and "write_to_kafka" parameters and return type of "export_collector_value".
parent 1c03799f
Loading
Loading
Loading
Loading
+16 −13
Original line number Diff line number Diff line
@@ -14,10 +14,10 @@
# limitations under the License.

import time
import grpc
import logging
import requests
import threading
from typing import Tuple
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
@@ -32,31 +32,34 @@ class KafkaProducerServiceImpl:
    Class to fetch metrics from Exporter and produce them to Kafka.
    """

    def __init__(self, bootstrap_servers, node_exporter_endpoint, kafka_topic, run_duration, fetch_interval):
    def __init__(self, bootstrap_servers=None, exporter_endpoint=None,
                  kafka_topic=None, run_duration=None, fetch_interval=None):
        """
        Constructor to initialize Kafka producer parameters.
        Args:
            bootstrap_servers (str): Kafka broker address.
            node_exporter_endpoint (str): Node Exporter metrics endpoint.
            exporter_endpoint (str): Node Exporter metrics endpoint.
            kafka_topic (str): Kafka topic to produce metrics to.
            run_interval (int): Time interval in seconds to run the producer.
        """
        LOGGER.info('Init TelemetryBackendService')

        self.bootstrap_servers = bootstrap_servers
        self.node_exporter_endpoint = node_exporter_endpoint
        self.exporter_endpoint = exporter_endpoint
        self.kafka_topic       = kafka_topic
        self.run_duration      = run_duration
        self.fetch_interval    = fetch_interval
    
    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def export_collector_value(request: CollectorId) -> str: # type: ignore
        response = str()
        response = '-1'
    def export_collector_value(request : Collector) -> Tuple[str, str]: # type: ignore
        response =  Tuple[str, str]
        response = ('test collector Id', 'test collected value')      # Metric to be fetched from endpoint based on Collector message
        return response

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def write_to_kafka(Collector, kpi_value) -> Empty: # type: ignore
    def write_to_kafka(request: Tuple[str, str]) -> Empty: # type: ignore
        # _collector_id, _collector_id_value = request
        # write collector_id and collector_id value on the Kafka topic
        return Empty()
        
# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
@@ -69,7 +72,7 @@ class KafkaProducerServiceImpl:
        """
        KPI = "node_network_receive_packets_total"
        try:
            response = requests.get(self.node_exporter_endpoint)
            response = requests.get(self.exporter_endpoint)
            if response.status_code == 200:
                # print(f"Metrics fetched sucessfully...")
                metrics = response.text