diff --git a/src/telemetry_frontend/backend/KafkaProducerController.py b/src/telemetry_frontend/backend/KafkaProducerController.py
new file mode 100755
index 0000000000000000000000000000000000000000..8c88d5e8ebf72aeee057bb0afe257f68b8d7d672
--- /dev/null
+++ b/src/telemetry_frontend/backend/KafkaProducerController.py
@@ -0,0 +1,57 @@
+# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from NodeExporterProducer import KafkaNodeExporterProducer
+
+class KafkaProducerController:
+    """
+    Class to control Kafka producer functionality.
+    """
+    def __init__(self):
+        kafka_configs               = self.generate_kafka_configurations()
+        self.bootstrap_servers      = kafka_configs['bootstrap_servers']
+        self.node_exporter_endpoint = kafka_configs['node_exporter_endpoint']
+        self.kafka_topic            = kafka_configs['kafka_topic']
+        self.run_duration           = kafka_configs['run_duration']
+        self.fetch_interval         = kafka_configs['fetch_interval']
+
+    def generate_kafka_configurations(self):
+        """
+        Method to generate the Kafka producer configuration.
+        """
+        create_kafka_configuration = {
+            'bootstrap_servers'      : '127.0.0.1:9092',                      # Kafka broker address - replace with your Kafka broker address
+            'node_exporter_endpoint' : 'http://10.152.183.231:9100/metrics',  # Node Exporter metrics endpoint - replace with your Node Exporter endpoint
+            'kafka_topic'            : 'metric-data',                         # Kafka topic to produce to
+            'run_duration'           : 20,                                    # Total duration (seconds) to run the producer
+            'fetch_interval'         : 3                                      # Time (seconds) between two fetch requests
+        }
+        return create_kafka_configuration
+
+    def run_producer(self):
+        """
+        Method to create a KafkaNodeExporterProducer object and start producing metrics.
+        """
+        # Create KafkaNodeExporterProducer object and start producing metrics
+        producer = KafkaNodeExporterProducer(self.bootstrap_servers, self.node_exporter_endpoint,
+                                             self.kafka_topic, self.run_duration, self.fetch_interval
+                                             )
+        # producer.start_producer_thread()    # if threading is required
+        producer.produce_metrics()            # if threading is not required
+
+if __name__ == "__main__":
+
+    # Create Kafka producer controller object and run the producer
+    kafka_controller = KafkaProducerController()
+    kafka_controller.run_producer()
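
Note: the KafkaNodeExporterProducer class imported above lives in NodeExporterProducer.py, which is not part of this diff. The snippet below is a minimal, hypothetical sketch of what that module might look like, inferred only from the constructor arguments and the produce_metrics() call in run_producer(); it assumes the confluent-kafka and requests packages and is not the actual implementation.

# Hypothetical sketch of NodeExporterProducer.py (not the real module).
# Names mirror the constructor call in run_producer(); the actual class may differ.
import time
import requests
from confluent_kafka import Producer

class KafkaNodeExporterProducer:
    def __init__(self, bootstrap_servers, node_exporter_endpoint,
                 kafka_topic, run_duration, fetch_interval):
        self.producer               = Producer({'bootstrap.servers': bootstrap_servers})
        self.node_exporter_endpoint = node_exporter_endpoint
        self.kafka_topic            = kafka_topic
        self.run_duration           = run_duration
        self.fetch_interval         = fetch_interval

    def produce_metrics(self):
        """Fetch Node Exporter metrics and publish them to Kafka until run_duration expires."""
        end_time = time.time() + self.run_duration
        while time.time() < end_time:
            response = requests.get(self.node_exporter_endpoint, timeout=5)
            if response.status_code == 200:
                # Publish the raw Prometheus-format metrics payload to the topic
                self.producer.produce(self.kafka_topic, value=response.text.encode('utf-8'))
                self.producer.flush()
            time.sleep(self.fetch_interval)

With such a sketch, KafkaProducerController.run_producer() would fetch the Node Exporter metrics page every fetch_interval seconds and publish each scrape to the metric-data topic for run_duration seconds in total.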