# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .KafkaProducerServiceImpl import KafkaProducerServiceImpl


class KafkaProducerService:
    """
    Front-end that holds the Kafka producer settings and launches the producer.

    The settings come from generate_kafka_configs() and are copied onto the
    instance as plain attributes; run_producer() forwards them to
    KafkaProducerServiceImpl.
    """

    def __init__(self):
        # Fetch the settings once, then expose each entry as an attribute.
        settings = self.generate_kafka_configs()
        self.bootstrap_servers = settings['bootstrap_servers']
        self.node_exporter_endpoint = settings['node_exporter_endpoint']
        self.kafka_topic = settings['kafka_topic']
        self.run_duration = settings['run_duration']
        self.fetch_interval = settings['fetch_interval']

    def generate_kafka_configs(self):
        """
        Return the producer settings as a dictionary.

        The values are hard-coded placeholders; the broker address and the
        Node Exporter endpoint are meant to be replaced with the ones of the
        target deployment.
        """
        return {
            # Kafka broker address
            'bootstrap_servers': '127.0.0.1:9092',
            # Node Exporter metrics endpoint
            'node_exporter_endpoint': 'http://10.152.183.231:9100/metrics',
            # Kafka topic to produce to
            'kafka_topic': 'metric-data',
            # Total duration to execute the producer
            'run_duration': 20,
            # Time between two fetch requests
            'fetch_interval': 4,
        }

    def run_producer(self):
        """
        Build a KafkaProducerServiceImpl from the stored settings and run it.
        """
        # Positional order expected by the implementation's constructor call.
        impl_args = (
            self.bootstrap_servers,
            self.node_exporter_endpoint,
            self.kafka_topic,
            self.run_duration,
            self.fetch_interval,
        )
        worker = KafkaProducerServiceImpl(*impl_args)
        # Non-threaded path. The original author's note names
        # start_producer_thread() as the call to use if threading is required.
        worker.produce_metrics()


if __name__ == "__main__":
    # Script entry point: create the service object and start producing.
    kafka_controller = KafkaProducerService()
    kafka_controller.run_producer()