# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Expose cooked KPI values (consumed from the Kafka stream) as Prometheus metrics

import logging
from prometheus_client import start_http_server, Gauge, CollectorRegistry
from common.proto.kpi_sample_types_pb2 import KpiSampleType

from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor

LOGGER         = logging.getLogger(__name__)
PROM_METRICS   = {}
PROM_REGISTRY  = CollectorRegistry()

class MetricWriterToPrometheus:
    '''
    This class exposes the *cooked KPI* on the endpoint to be scraped by the Prometheus server.
    cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message)
    '''
    def __init__(self):
        # Start the Prometheus HTTP server that exposes the metrics endpoint.
        self.start_prometheus_client()
    
    def start_prometheus_client(self):
        start_http_server(10808, registry=PROM_REGISTRY)
        LOGGER.debug("Prometheus exporter started on port 10808")

    def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value):
        # Create a dictionary from the kpi_descriptor's attributes and the kpi_value
        cooked_kpi = {
            'kpi_id'         : kpi_descriptor.kpi_id.kpi_id.uuid,
            'kpi_description': kpi_descriptor.kpi_description,
            'kpi_sample_type': KpiSampleType.Name(kpi_descriptor.kpi_sample_type),
            'device_id'      : kpi_descriptor.device_id.device_uuid.uuid,
            'endpoint_id'    : kpi_descriptor.endpoint_id.endpoint_uuid.uuid,
            'service_id'     : kpi_descriptor.service_id.service_uuid.uuid,
            'slice_id'       : kpi_descriptor.slice_id.slice_uuid.uuid,
            'connection_id'  : kpi_descriptor.connection_id.connection_uuid.uuid,
            'link_id'        : kpi_descriptor.link_id.link_uuid.uuid,
            'time_stamp'     : kpi_value.timestamp.timestamp,
            'kpi_value'      : kpi_value.kpi_value_type.floatVal
        }
        # LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
        return cooked_kpi

    def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
        # merge both gRPC messages into a single variable
        cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
        # these keys provide the metric name, help text and value; the remaining keys become metric labels
        tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
        metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]
        metric_name = cooked_kpi['kpi_sample_type']
        try:
            if metric_name not in PROM_METRICS:     # only register the metric if it doesn't exist yet
                PROM_METRICS[metric_name] = Gauge(
                    metric_name,
                    cooked_kpi['kpi_description'],
                    metric_tags,
                    registry=PROM_REGISTRY
                )
                LOGGER.debug("Metric {:} created with labels: {:}".format(metric_name, metric_tags))
            PROM_METRICS[metric_name].labels(
                    kpi_id          = cooked_kpi['kpi_id'],
                    device_id       = cooked_kpi['device_id'],
                    endpoint_id     = cooked_kpi['endpoint_id'],
                    service_id      = cooked_kpi['service_id'],
                    slice_id        = cooked_kpi['slice_id'],
                    connection_id   = cooked_kpi['connection_id'],
                    link_id         = cooked_kpi['link_id'],
                    time_stamp      = cooked_kpi['time_stamp'],
                ).set(float(cooked_kpi['kpi_value']))
            LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))

        except ValueError as e:
            if 'Duplicated timeseries' in str(e):
                LOGGER.debug("Metric {:} is already registered. Skipping.".format(metric_name))
                print("Metric {:} is already registered. Skipping.".format(metric_name))
            else:
                LOGGER.error("Error while pushing metric: {}".format(e))
                raise
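
# Hypothetical usage sketch (not part of the original module): it builds a KpiDescriptor and a
# KpiValue by hand, whereas in the monitoring pipeline these messages normally arrive from the
# Kafka stream. The KPISAMPLETYPE_PACKETS_TRANSMITTED enum value and the example UUIDs are
# assumptions for illustration; adjust them to the actual proto definitions and deployment.
if __name__ == '__main__':
    import time

    logging.basicConfig(level=logging.DEBUG)

    descriptor = KpiDescriptor()
    descriptor.kpi_id.kpi_id.uuid                 = 'kpi-0001'          # hypothetical identifiers
    descriptor.kpi_description                    = 'Packets transmitted on endpoint'
    descriptor.kpi_sample_type                    = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
    descriptor.device_id.device_uuid.uuid         = 'dev-01'
    descriptor.endpoint_id.endpoint_uuid.uuid     = 'ep-01'
    descriptor.service_id.service_uuid.uuid       = 'svc-01'
    descriptor.slice_id.slice_uuid.uuid           = 'slice-01'
    descriptor.connection_id.connection_uuid.uuid = 'conn-01'
    descriptor.link_id.link_uuid.uuid             = 'link-01'

    value = KpiValue()
    value.timestamp.timestamp     = time.time()
    value.kpi_value_type.floatVal = 1234.0

    writer = MetricWriterToPrometheus()            # starts the HTTP server on port 10808
    writer.create_and_expose_cooked_kpi(descriptor, value)

    time.sleep(60)   # keep the process alive so Prometheus can scrape http://localhost:10808/metrics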