Loading src/kpi_manager/service/KpiWriter.py +31 −17 Original line number Diff line number Diff line Loading @@ -14,9 +14,11 @@ # read Kafka stream from Kafka topic import ast import time import threading from confluent_kafka import KafkaError from prometheus_client import start_http_server, Gauge from prometheus_client import start_http_server, Gauge, CollectorRegistry from confluent_kafka import Consumer as KafkaConsumer KAFKA_SERVER_IP = '127.0.0.1:9092' Loading @@ -30,7 +32,7 @@ KPIs_TO_SEARCH = ["node_network_receive_packets_total", "node_network_transmit_bytes_total", "process_open_fds"] PROM_METRICS = {} KAFKA_REGISTERY = CollectorRegistry() class KpiWriter: def __init__(self) -> None: Loading @@ -38,6 +40,9 @@ class KpiWriter: @staticmethod def kpi_writer(): # Start up the server to expose the metrics at port number mention below. start_http_server(8101) KpiWriter.create_prom_metrics_name() threading.Thread(target=KpiWriter.kafka_listener, args=()).start() @staticmethod Loading Loading @@ -67,21 +72,30 @@ class KpiWriter: print(f"Error to consume event from topic: {KAFKA_TOPICS['labeled']}. Error detail: {str(e)}") continue @staticmethod # send metric to Prometheus @staticmethod def write_metric_to_promtheus(event): print("New recevied event: {:}".format(event)) # # create Prometheus metrics # for metric_key in KPIs_TO_SEARCH: # metric_name = metric_key # metric_description = "description of " + str(metric_key) # metric_tags = "tags of " + str(metric_key) # PROM_METRICS[metric_key] = Gauge( metric_name, metric_description,metric_tags ) # NN_REC_PKTS_TOTAL = PROM_METRICS["node_network_receive_packets_total"] # NN_REC_BYTS_TOTAL = PROM_METRICS["node_network_receive_bytes_total"] # NN_TRSMT_BYTS_TOTAL = PROM_METRICS["node_network_transmit_bytes_total"] # PROC_OPEN_FDs = PROM_METRICS["process_open_fds"] event = ast.literal_eval(event) # converted into dict print("New recevied event: {:}".format(event['kpi_description'])) event_kpi_name = event['kpi_description'] if event_kpi_name in KPIs_TO_SEARCH: PROM_METRICS[event_kpi_name].labels( tag1 = "test tag value", tag2 = "test tag value" ).set(event['kpi_value']) time.sleep(0.05) @staticmethod def create_prom_metrics_name(): metric_tags = ["tag1", "tag2"] for metric_key in KPIs_TO_SEARCH: metric_name = metric_key metric_description = "description of " + str(metric_key) try: PROM_METRICS[metric_key] = Gauge ( metric_name, metric_description, metric_tags, registry=KAFKA_REGISTERY ) print("Metric pushed to Prometheus: {:}".format(PROM_METRICS[metric_key])) except ValueError as e: if 'Duplicated timeseries' in str(e): print("Metric {:} is already registered. Skipping.".format(metric_name)) Loading
src/kpi_manager/service/KpiWriter.py +31 −17 Original line number Diff line number Diff line Loading @@ -14,9 +14,11 @@ # read Kafka stream from Kafka topic import ast import time import threading from confluent_kafka import KafkaError from prometheus_client import start_http_server, Gauge from prometheus_client import start_http_server, Gauge, CollectorRegistry from confluent_kafka import Consumer as KafkaConsumer KAFKA_SERVER_IP = '127.0.0.1:9092' Loading @@ -30,7 +32,7 @@ KPIs_TO_SEARCH = ["node_network_receive_packets_total", "node_network_transmit_bytes_total", "process_open_fds"] PROM_METRICS = {} KAFKA_REGISTERY = CollectorRegistry() class KpiWriter: def __init__(self) -> None: Loading @@ -38,6 +40,9 @@ class KpiWriter: @staticmethod def kpi_writer(): # Start up the server to expose the metrics at port number mention below. start_http_server(8101) KpiWriter.create_prom_metrics_name() threading.Thread(target=KpiWriter.kafka_listener, args=()).start() @staticmethod Loading Loading @@ -67,21 +72,30 @@ class KpiWriter: print(f"Error to consume event from topic: {KAFKA_TOPICS['labeled']}. Error detail: {str(e)}") continue @staticmethod # send metric to Prometheus @staticmethod def write_metric_to_promtheus(event): print("New recevied event: {:}".format(event)) # # create Prometheus metrics # for metric_key in KPIs_TO_SEARCH: # metric_name = metric_key # metric_description = "description of " + str(metric_key) # metric_tags = "tags of " + str(metric_key) # PROM_METRICS[metric_key] = Gauge( metric_name, metric_description,metric_tags ) # NN_REC_PKTS_TOTAL = PROM_METRICS["node_network_receive_packets_total"] # NN_REC_BYTS_TOTAL = PROM_METRICS["node_network_receive_bytes_total"] # NN_TRSMT_BYTS_TOTAL = PROM_METRICS["node_network_transmit_bytes_total"] # PROC_OPEN_FDs = PROM_METRICS["process_open_fds"] event = ast.literal_eval(event) # converted into dict print("New recevied event: {:}".format(event['kpi_description'])) event_kpi_name = event['kpi_description'] if event_kpi_name in KPIs_TO_SEARCH: PROM_METRICS[event_kpi_name].labels( tag1 = "test tag value", tag2 = "test tag value" ).set(event['kpi_value']) time.sleep(0.05) @staticmethod def create_prom_metrics_name(): metric_tags = ["tag1", "tag2"] for metric_key in KPIs_TO_SEARCH: metric_name = metric_key metric_description = "description of " + str(metric_key) try: PROM_METRICS[metric_key] = Gauge ( metric_name, metric_description, metric_tags, registry=KAFKA_REGISTERY ) print("Metric pushed to Prometheus: {:}".format(PROM_METRICS[metric_key])) except ValueError as e: if 'Duplicated timeseries' in str(e): print("Metric {:} is already registered. Skipping.".format(metric_name))