diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 4c76917c8883911fabb6af77e1ed31f45d0ccdd9..2e8478db1e7771009c9ab7833f1f886f7abda580 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import ast import time import logging import requests @@ -33,31 +34,15 @@ KAFKA_SERVER_IP = '127.0.0.1:9092' class TelemetryBackendService: """ - Class to fetch metrics from Exporter and produce them to Kafka. + Class to listens for request on Kafka topic, fetches metrics and produces measured values to another Kafka topic. """ - def __init__(self, bootstrap_servers='127.0.0.1:9092', exporter_endpoint=None, - kafka_topic=None, run_duration=None, fetch_interval=None): - """ - Constructor to initialize Kafka producer parameters. - Args: - bootstrap_servers (str): Kafka broker address. - exporter_endpoint (str): Node Exporter metrics endpoint. - kafka_topic (str): Kafka topic to produce metrics to. - run_interval (int): Time interval in seconds to run the producer. - """ + def __init__(self): LOGGER.info('Init TelemetryBackendService') - - self.bootstrap_servers = bootstrap_servers - self.exporter_endpoint = exporter_endpoint - self.kafka_topic = kafka_topic - self.run_duration = run_duration - self.fetch_interval = fetch_interval - def receive_kafka_request(self, - ): # type: ignore + def kafka_listener(self): """ - Method to receive collector request on Kafka topic. + listener for requests on Kafka topic. """ conusmer_configs = { 'bootstrap.servers' : KAFKA_SERVER_IP, @@ -69,14 +54,10 @@ class TelemetryBackendService: consumerObj = KafkaConsumer(conusmer_configs) consumerObj.subscribe([topic_request]) - start_time = time.time() while True: - receive_msg = consumerObj.poll(1.0) + receive_msg = consumerObj.poll(2.0) if receive_msg is None: - print ("nothing to read ...", time.time() - start_time) - if time.time() - start_time >= 10: # type: ignore - print("Timeout: consumer terminated") - break + print ("Telemetry backend listener is active: Kafka Topic: ", topic_request) # added for debugging purposes continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: @@ -84,36 +65,30 @@ class TelemetryBackendService: else: print("Consumer error: {}".format(receive_msg.error())) break - print ("Received Message: ", receive_msg.value().decode('utf-8')) - - def execute_receive_kafka_request(self - )->Empty: # type: ignore - threading.Thread(target=self.receive_kafka_request).start() - return True - - # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore - response = Tuple[str, str] - collector_id = str('test collector Id') - collected_Value = str('test collected value') # Metric to be fetched from endpoint based on Collector message - response = (collector_id, collected_Value) - return response - - # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer: - (collector_id, collector_value) = request - response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers}) - # _collector_id, _collector_id_value = request - # write collector_id and collector_id value on the Kafka topic - - # get kafka bootstrap server and topic name - # write to kafka topic - - return response + (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8')) + self.execute_process_kafka_request(kpi_id, duration, interval) - def stop_producer(self, request: KafkaProducer) -> Empty: # type: ignore - # flush and close kafka producer object - return Empty() + def run_kafka_listener(self)->Empty: # type: ignore + threading.Thread(target=self.kafka_listener).start() + return True + + def process_kafka_request(self, kpi_id, duration, interval + ): # type: ignore + """ + Method to receive collector request attribues and initiates collecter backend. + """ + start_time = time.time() + while True: + if time.time() - start_time >= duration: # type: ignore + print("Timeout: consumer terminated", time.time() - start_time) + break + # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval) + print ("Telemetry Backend running for KPI: ", kpi_id, "after FETCH INTERVAL: ", interval) + time.sleep(interval) + + def execute_process_kafka_request(self, kpi_id: str, duration: int, interval: int): + threading.Thread(target=self.process_kafka_request, args=(kpi_id, duration, interval)).start() + # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------