diff --git a/src/telemetry_frontend/backend/service/KafkaProducerServiceImpl.py b/src/telemetry_frontend/backend/service/KafkaProducerServiceImpl.py index b6b55f913cde14f3d3e6c517678dfd1490ec0ead..da55131700f366416304407579afe2d7f7aab00e 100755 --- a/src/telemetry_frontend/backend/service/KafkaProducerServiceImpl.py +++ b/src/telemetry_frontend/backend/service/KafkaProducerServiceImpl.py @@ -14,10 +14,10 @@ # limitations under the License. import time -import grpc import logging import requests import threading +from typing import Tuple from common.proto.context_pb2 import Empty from confluent_kafka import Producer, KafkaException from confluent_kafka.admin import AdminClient, NewTopic @@ -32,31 +32,34 @@ class KafkaProducerServiceImpl: Class to fetch metrics from Exporter and produce them to Kafka. """ - def __init__(self, bootstrap_servers, node_exporter_endpoint, kafka_topic, run_duration, fetch_interval): + def __init__(self, bootstrap_servers=None, exporter_endpoint=None, + kafka_topic=None, run_duration=None, fetch_interval=None): """ Constructor to initialize Kafka producer parameters. Args: bootstrap_servers (str): Kafka broker address. - node_exporter_endpoint (str): Node Exporter metrics endpoint. + exporter_endpoint (str): Node Exporter metrics endpoint. kafka_topic (str): Kafka topic to produce metrics to. run_interval (int): Time interval in seconds to run the producer. """ LOGGER.info('Init TelemetryBackendService') - self.bootstrap_servers = bootstrap_servers - self.node_exporter_endpoint = node_exporter_endpoint - self.kafka_topic = kafka_topic - self.run_duration = run_duration - self.fetch_interval = fetch_interval + self.bootstrap_servers = bootstrap_servers + self.exporter_endpoint = exporter_endpoint + self.kafka_topic = kafka_topic + self.run_duration = run_duration + self.fetch_interval = fetch_interval # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def export_collector_value(request: CollectorId) -> str: # type: ignore - response = str() - response = '-1' + def export_collector_value(request : Collector) -> Tuple[str, str]: # type: ignore + response = Tuple[str, str] + response = ('test collector Id', 'test collected value') # Metric to be fetched from endpoint based on Collector message return response # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def write_to_kafka(Collector, kpi_value) -> Empty: # type: ignore + def write_to_kafka(request: Tuple[str, str]) -> Empty: # type: ignore + # _collector_id, _collector_id_value = request + # write collector_id and collector_id value on the Kafka topic return Empty() # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- @@ -69,7 +72,7 @@ class KafkaProducerServiceImpl: """ KPI = "node_network_receive_packets_total" try: - response = requests.get(self.node_exporter_endpoint) + response = requests.get(self.exporter_endpoint) if response.status_code == 200: # print(f"Metrics fetched sucessfully...") metrics = response.text