diff --git a/src/telemetry/backend/service/TelemetryBackendServiceImpl.py b/src/telemetry/backend/service/TelemetryBackendServiceImpl.py deleted file mode 100755 index abcc30baf82daa7013965a7888370f05129746bc..0000000000000000000000000000000000000000 --- a/src/telemetry/backend/service/TelemetryBackendServiceImpl.py +++ /dev/null @@ -1,188 +0,0 @@ - -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time -import logging -import requests -from typing import Tuple -from common.proto.context_pb2 import Empty -from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaException -from confluent_kafka.admin import AdminClient, NewTopic -from common.proto.telemetry_frontend_pb2 import Collector, CollectorId -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method - -LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') -ACTIVE_KAFKA_PRODUCERS = [] # list of active kafka producers - -class TelemetryBackendServiceImpl: - """ - Class to fetch metrics from Exporter and produce them to Kafka. - """ - - def __init__(self, bootstrap_servers=None, exporter_endpoint=None, - kafka_topic=None, run_duration=None, fetch_interval=None): - """ - Constructor to initialize Kafka producer parameters. - Args: - bootstrap_servers (str): Kafka broker address. - exporter_endpoint (str): Node Exporter metrics endpoint. - kafka_topic (str): Kafka topic to produce metrics to. - run_interval (int): Time interval in seconds to run the producer. - """ - LOGGER.info('Init TelemetryBackendService') - - self.bootstrap_servers = bootstrap_servers - self.exporter_endpoint = exporter_endpoint - self.kafka_topic = kafka_topic - self.run_duration = run_duration - self.fetch_interval = fetch_interval - - # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def export_collector_value(self, request : Collector) -> Tuple[str, str]: # type: ignore - response = Tuple[str, str] - collector_id = str('test collector Id') - collected_Value = str('test collected value') # Metric to be fetched from endpoint based on Collector message - response = (collector_id, collected_Value) - return response - - # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def write_to_kafka(self, request: Tuple[str, str]) -> KafkaProducer: - (collector_id, collector_value) = request - response = KafkaProducer({'bootstrap.servers': self.bootstrap_servers}) - # _collector_id, _collector_id_value = request - # write collector_id and collector_id value on the Kafka topic - - # get kafka bootstrap server and topic name - # write to kafka topic - - return response - - def stop_producer(self, request: KafkaProducer) -> Empty: # type: ignore - # flush and close kafka producer object - return Empty() - -# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter ----------- - - def fetch_node_exporter_metrics(self): - """ - Method to fetch metrics from Node Exporter. - Returns: - str: Metrics fetched from Node Exporter. - """ - KPI = "node_network_receive_packets_total" - try: - response = requests.get(self.exporter_endpoint) # type: ignore - if response.status_code == 200: - # print(f"Metrics fetched sucessfully...") - metrics = response.text - # Check if the desired metric is available in the response - if KPI in metrics: - KPI_VALUE = self.extract_metric_value(metrics, KPI) - # Extract the metric value - if KPI_VALUE is not None: - print(f"KPI value: {KPI_VALUE}") - return KPI_VALUE - else: - print(f"Failed to fetch metrics. Status code: {response.status_code}") - return None - except Exception as e: - print(f"Failed to fetch metrics: {str(e)}") - return None - - def extract_metric_value(self, metrics, metric_name): - """ - Method to extract the value of a metric from the metrics string. - Args: - metrics (str): Metrics string fetched from Node Exporter. - metric_name (str): Name of the metric to extract. - Returns: - float: Value of the extracted metric, or None if not found. - """ - try: - # Find the metric line containing the desired metric name - metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name)) - # Split the line to extract the metric value - metric_value = float(metric_line.split()[1]) - return metric_value - except StopIteration: - print(f"Metric '{metric_name}' not found in the metrics.") - return None - - def delivery_callback(self, err, msg): - """ - Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ - if err: - print(f'Message delivery failed: {err}') - else: - print(f'Message delivered to topic {msg.topic()}') - - def create_topic_if_not_exists(self, admin_client): - """ - Method to create Kafka topic if it does not exist. - Args: - admin_client (AdminClient): Kafka admin client. - """ - try: - topic_metadata = admin_client.list_topics(timeout=5) - if self.kafka_topic not in topic_metadata.topics: - # If the topic does not exist, create a new topic - print(f"Topic '{self.kafka_topic}' does not exist. Creating...") - new_topic = NewTopic(self.kafka_topic, num_partitions=1, replication_factor=1) - admin_client.create_topics([new_topic]) - except KafkaException as e: - print(f"Failed to create topic: {e}") - - def produce_metrics(self): - """ - Method to produce metrics to Kafka topic as per Kafka configs. - """ - conf = { - 'bootstrap.servers': self.bootstrap_servers, - } - - admin_client = AdminClient(conf) - self.create_topic_if_not_exists(admin_client) - - kafka_producer = KafkaProducer(conf) - - try: - start_time = time.time() - while True: - metrics = self.fetch_node_exporter_metrics() # select the function name based on the provided requirements - - if metrics: - kafka_producer.produce(self.kafka_topic, str(metrics), callback=self.delivery_callback) - kafka_producer.flush() - # print("Metrics produced to Kafka topic") - - # Check if the specified run duration has elapsed - if time.time() - start_time >= self.run_duration: # type: ignore - break - - # waiting time until next fetch - time.sleep(self.fetch_interval) # type: ignore - except KeyboardInterrupt: - print("Keyboard interrupt detected. Exiting...") - finally: - kafka_producer.flush() - # kafka_producer.close() # this command generates ERROR - -# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file