Loading src/telemetry_frontend/backend/service/KafkaProducerServiceImpl.py +29 −5 Original line number Diff line number Diff line Loading @@ -13,11 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. from confluent_kafka import Producer, KafkaException from confluent_kafka.admin import AdminClient, NewTopic import requests import time import grpc import logging import requests import threading from common.proto.context_pb2 import Empty from confluent_kafka import Producer, KafkaException from confluent_kafka.admin import AdminClient, NewTopic from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') class KafkaProducerServiceImpl: """ Loading @@ -33,12 +41,26 @@ class KafkaProducerServiceImpl: kafka_topic (str): Kafka topic to produce metrics to. run_interval (int): Time interval in seconds to run the producer. """ LOGGER.info('Init TelemetryBackendService') self.bootstrap_servers = bootstrap_servers self.node_exporter_endpoint = node_exporter_endpoint self.kafka_topic = kafka_topic self.run_duration = run_duration self.fetch_interval = fetch_interval # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def export_collector_value(request: CollectorId) -> str: # type: ignore response = str() response = '-1' return response # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def write_to_kafka(Collector, kpi_value) -> Empty: # type: ignore return Empty() # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- def fetch_node_exporter_metrics(self): """ Method to fetch metrics from Node Exporter. Loading Loading @@ -114,7 +136,7 @@ class KafkaProducerServiceImpl: def produce_metrics(self): """ Method to continuously produce metrics to Kafka topic. Method to produce metrics to Kafka topic as per Kafka configs. """ conf = { 'bootstrap.servers': self.bootstrap_servers, Loading Loading @@ -146,10 +168,12 @@ class KafkaProducerServiceImpl: finally: kafka_producer.flush() # kafka_producer.close() # this command generates ERROR # --- def start_producer_thread(self): """ Method to start the producer thread. """ producer_thread = threading.Thread(target=self.produce_metrics) producer_thread.start() # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- No newline at end of file Loading
src/telemetry_frontend/backend/service/KafkaProducerServiceImpl.py +29 −5 Original line number Diff line number Diff line Loading @@ -13,11 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. from confluent_kafka import Producer, KafkaException from confluent_kafka.admin import AdminClient, NewTopic import requests import time import grpc import logging import requests import threading from common.proto.context_pb2 import Empty from confluent_kafka import Producer, KafkaException from confluent_kafka.admin import AdminClient, NewTopic from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') class KafkaProducerServiceImpl: """ Loading @@ -33,12 +41,26 @@ class KafkaProducerServiceImpl: kafka_topic (str): Kafka topic to produce metrics to. run_interval (int): Time interval in seconds to run the producer. """ LOGGER.info('Init TelemetryBackendService') self.bootstrap_servers = bootstrap_servers self.node_exporter_endpoint = node_exporter_endpoint self.kafka_topic = kafka_topic self.run_duration = run_duration self.fetch_interval = fetch_interval # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def export_collector_value(request: CollectorId) -> str: # type: ignore response = str() response = '-1' return response # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def write_to_kafka(Collector, kpi_value) -> Empty: # type: ignore return Empty() # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- def fetch_node_exporter_metrics(self): """ Method to fetch metrics from Node Exporter. Loading Loading @@ -114,7 +136,7 @@ class KafkaProducerServiceImpl: def produce_metrics(self): """ Method to continuously produce metrics to Kafka topic. Method to produce metrics to Kafka topic as per Kafka configs. """ conf = { 'bootstrap.servers': self.bootstrap_servers, Loading Loading @@ -146,10 +168,12 @@ class KafkaProducerServiceImpl: finally: kafka_producer.flush() # kafka_producer.close() # this command generates ERROR # --- def start_producer_thread(self): """ Method to start the producer thread. """ producer_thread = threading.Thread(target=self.produce_metrics) producer_thread.start() # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- No newline at end of file