diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 8e6fb243ea324b9eb572c165a43a8bbaf22466f3..4c76917c8883911fabb6af77e1ed31f45d0ccdd9 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -1,3 +1,4 @@ + # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,26 +13,216 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time +import logging +import requests +import threading +from typing import Tuple +from common.proto.context_pb2 import Empty +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import KafkaException +from confluent_kafka import KafkaError +from confluent_kafka.admin import AdminClient, NewTopic +from common.proto.telemetry_frontend_pb2 import Collector, CollectorId +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') +KAFKA_SERVER_IP = '127.0.0.1:9092' + class TelemetryBackendService: """ - Class to control Kafka producer functionality. + Class to fetch metrics from Exporter and produce them to Kafka. """ - def __init__(self): - pass + def __init__(self, bootstrap_servers='127.0.0.1:9092', exporter_endpoint=None, + kafka_topic=None, run_duration=None, fetch_interval=None): + """ + Constructor to initialize Kafka producer parameters. + Args: + bootstrap_servers (str): Kafka broker address. + exporter_endpoint (str): Node Exporter metrics endpoint. + kafka_topic (str): Kafka topic to produce metrics to. + run_interval (int): Time interval in seconds to run the producer. + """ + LOGGER.info('Init TelemetryBackendService') - def generate_kafka_configs(self): + self.bootstrap_servers = bootstrap_servers + self.exporter_endpoint = exporter_endpoint + self.kafka_topic = kafka_topic + self.run_duration = run_duration + self.fetch_interval = fetch_interval + + def receive_kafka_request(self, + ): # type: ignore """ - Method to generate Kafka configurations + Method to receive collector request on Kafka topic. """ - create_kafka_configs = { - 'bootstrap_servers' : "test_server", # Kafka broker address - Replace with your Kafka broker address - 'exporter_endpoint' : "test_exporter", # Node Exporter metrics endpoint - Replace with your Node Exporter endpoint - 'kafka_topic' : "test_kafka_topic", # Kafka topic to produce to - 'run_duration' : 10, # Total duration to execute the producer - 'fetch_interval' : 2 # Time between two fetch requests + conusmer_configs = { + 'bootstrap.servers' : KAFKA_SERVER_IP, + 'group.id' : 'consumer', + 'auto.offset.reset' : 'earliest' } - return create_kafka_configs + topic_request = "topic_request" + + consumerObj = KafkaConsumer(conusmer_configs) + consumerObj.subscribe([topic_request]) + + start_time = time.time() + while True: + receive_msg = consumerObj.poll(1.0) + if receive_msg is None: + print ("nothing to read ...", time.time() - start_time) + if time.time() - start_time >= 10: # type: ignore + print("Timeout: consumer terminated") + break + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print("Consumer error: {}".format(receive_msg.error())) + break + print ("Received Message: ", receive_msg.value().decode('utf-8')) + + def execute_receive_kafka_request(self + )->Empty: # type: ignore + threading.Thread(target=self.receive_kafka_request).start() + return True + + # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore + response = Tuple[str, str] + collector_id = str('test collector Id') + collected_Value = str('test collected value') # Metric to be fetched from endpoint based on Collector message + response = (collector_id, collected_Value) + return response + + # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer: + (collector_id, collector_value) = request + response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers}) + # _collector_id, _collector_id_value = request + # write collector_id and collector_id value on the Kafka topic + + # get kafka bootstrap server and topic name + # write to kafka topic + + return response + + def stop_producer(self, request: KafkaProducer) -> Empty: # type: ignore + # flush and close kafka producer object + return Empty() + +# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- + + def fetch_node_exporter_metrics(self): + """ + Method to fetch metrics from Node Exporter. + Returns: + str: Metrics fetched from Node Exporter. + """ + KPI = "node_network_receive_packets_total" + try: + response = requests.get(self.exporter_endpoint) # type: ignore + if response.status_code == 200: + # print(f"Metrics fetched sucessfully...") + metrics = response.text + # Check if the desired metric is available in the response + if KPI in metrics: + KPI_VALUE = self.extract_metric_value(metrics, KPI) + # Extract the metric value + if KPI_VALUE is not None: + print(f"KPI value: {KPI_VALUE}") + return KPI_VALUE + else: + print(f"Failed to fetch metrics. Status code: {response.status_code}") + return None + except Exception as e: + print(f"Failed to fetch metrics: {str(e)}") + return None + + def extract_metric_value(self, metrics, metric_name): + """ + Method to extract the value of a metric from the metrics string. + Args: + metrics (str): Metrics string fetched from Node Exporter. + metric_name (str): Name of the metric to extract. + Returns: + float: Value of the extracted metric, or None if not found. + """ + try: + # Find the metric line containing the desired metric name + metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) + # Split the line to extract the metric value + metric_value = float(metric_line.split()[1]) + return metric_value + except StopIteration: + print(f"Metric '{metric_name}' not found in the metrics.") + return None + + def delivery_callback(self, err, msg): + """ + Callback function to handle message delivery status. + Args: + err (KafkaError): Kafka error object. + msg (Message): Kafka message object. + """ + if err: + print(f'Message delivery failed: {err}') + else: + print(f'Message delivered to topic {msg.topic()}') + + def create_topic_if_not_exists(self, admin_client): + """ + Method to create Kafka topic if it does not exist. + Args: + admin_client (AdminClient): Kafka admin client. + """ + try: + topic_metadata = admin_client.list_topics(timeout=5) + if self.kafka_topic not in topic_metadata.topics: + # If the topic does not exist, create a new topic + print(f"Topic '{self.kafka_topic}' does not exist. Creating...") + new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1) + admin_client.create_topics([new_topic]) + except KafkaException as e: + print(f"Failed to create topic: {e}") + + def produce_metrics(self): + """ + Method to produce metrics to Kafka topic as per Kafka configs. + """ + conf = { + 'bootstrap.servers': self.bootstrap_servers, + } + + admin_client = AdminClient(conf) + self.create_topic_if_not_exists(admin_client) + + kafka_producer = KafkaProducer(conf) + + try: + start_time = time.time() + while True: + metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements + + if metrics: + kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback) + kafka_producer.flush() + # print("Metrics produced to Kafka topic") + # Check if the specified run duration has elapsed + if time.time() - start_time >= self.run_duration: # type: ignore + break + # waiting time until next fetch + time.sleep(self.fetch_interval) # type: ignore + except KeyboardInterrupt: + print("Keyboard interrupt detected. Exiting...") + finally: + kafka_producer.flush() + # kafka_producer.close() # this command generates ERROR +# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file