Commit 0e7ad11c authored by Waleed Akbar's avatar Waleed Akbar
Browse files

"Kafka_listener" is added

parent 2ee9d310
Loading
Loading
Loading
Loading
+30 −55
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import ast
import time
import logging
import requests
@@ -33,31 +34,15 @@ KAFKA_SERVER_IP = '127.0.0.1:9092'

class TelemetryBackendService:
    """
    Class to fetch metrics from Exporter and produce them to Kafka.
    Class to listens for request on Kafka topic, fetches metrics and produces measured values to another Kafka topic.
    """

    def __init__(self, bootstrap_servers='127.0.0.1:9092', exporter_endpoint=None,
                  kafka_topic=None, run_duration=None, fetch_interval=None):
        """
        Constructor to initialize Kafka producer parameters.
        Args:
            bootstrap_servers (str): Kafka broker address.
            exporter_endpoint (str): Node Exporter metrics endpoint.
            kafka_topic (str): Kafka topic to produce metrics to.
            run_interval (int): Time interval in seconds to run the producer.
        """
    def __init__(self):
        LOGGER.info('Init TelemetryBackendService')
    
        self.bootstrap_servers = bootstrap_servers
        self.exporter_endpoint = exporter_endpoint
        self.kafka_topic       = kafka_topic
        self.run_duration      = run_duration
        self.fetch_interval    = fetch_interval
    
    def receive_kafka_request(self,
                        ): # type: ignore
    def kafka_listener(self):
        """
        Method to receive collector request on Kafka topic.
        listener for requests on Kafka topic.
        """
        conusmer_configs = {
            'bootstrap.servers' : KAFKA_SERVER_IP,
@@ -69,14 +54,10 @@ class TelemetryBackendService:
        consumerObj = KafkaConsumer(conusmer_configs)
        consumerObj.subscribe([topic_request])

        start_time = time.time()
        while True:
            receive_msg = consumerObj.poll(1.0)
            receive_msg = consumerObj.poll(2.0)
            if receive_msg is None:
                print ("nothing to read ...", time.time() - start_time)
                if time.time() - start_time >= 10: # type: ignore
                    print("Timeout: consumer terminated")
                    break
                print ("Telemetry backend listener is active:  Kafka Topic: ", topic_request)     # added for debugging purposes
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
@@ -84,36 +65,30 @@ class TelemetryBackendService:
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    break
            print ("Received Message: ", receive_msg.value().decode('utf-8'))
            (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8'))
            self.execute_process_kafka_request(kpi_id, duration, interval)

    def execute_receive_kafka_request(self
                                      )->Empty: # type: ignore
        threading.Thread(target=self.receive_kafka_request).start()
    def run_kafka_listener(self)->Empty: # type: ignore
        threading.Thread(target=self.kafka_listener).start()
        return True        

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore
        response        =  Tuple[str, str]
        collector_id    = str('test collector Id')
        collected_Value = str('test collected value')        # Metric to be fetched from endpoint based on Collector message
        response        = (collector_id, collected_Value)      
        return response

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer:
        (collector_id, collector_value) = request
        response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers})
        # _collector_id, _collector_id_value = request
        # write collector_id and collector_id value on the Kafka topic

        # get kafka bootstrap server and topic name
        # write to kafka topic
        
        return response

    def stop_producer(self, request: KafkaProducer) -> Empty:  # type: ignore
        # flush and close kafka producer object
        return Empty()
    def process_kafka_request(self, kpi_id, duration, interval
                        ): # type: ignore
        """
        Method to receive collector request attribues and initiates collecter backend.
        """
        start_time = time.time()
        while True:
            if time.time() - start_time >= duration: # type: ignore
                print("Timeout: consumer terminated", time.time() - start_time)
                break
            # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval)
            print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval)
            time.sleep(interval)
    
    def execute_process_kafka_request(self, kpi_id: str, duration: int, interval: int):
        threading.Thread(target=self.process_kafka_request, args=(kpi_id, duration, interval)).start()
        

# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------