Commit 9f94eb55 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

kpi_writer_to_prom added

parent 2f5b4215
Loading
Loading
Loading
Loading
+6 −7
Original line number Diff line number Diff line
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,13 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
import logging
from kpi_manager.service.KpiWriter import KpiWriter

LOGGER = logging.getLogger(__name__)
PROJECTDIR=`pwd`

def test_kpi_writer():
    LOGGER.info(' >>> test_kpi_writer START <<< ')
    KpiWriter.kpi_writer()
cd $PROJECTDIR/src

RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
    kpi_value_writer/tests/test_metric_writer_to_prom.py
 No newline at end of file
+34 −14
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

aniso8601==9.0.1
anytree==2.8.0
APScheduler==3.10.1
attrs==23.2.0
bcrypt==4.1.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==2.0.12
click==8.1.7
colorama==0.4.6
confluent-kafka==2.3.0
coverage==6.3
cryptography==36.0.2
deepdiff==6.7.1
deepmerge==1.1.1
enum34==1.1.10
Flask==2.1.3
Flask-HTTPAuth==4.5.0
Flask-RESTful==0.3.9
future-fstrings==1.2.0
googleapis-common-protos==1.63.2
greenlet==3.0.3
grpcio==1.47.5
grpcio-health-checking==1.47.5
@@ -32,10 +30,22 @@ hyperframe==6.0.1
idna==3.7
influx-line-protocol==0.1.4
iniconfig==2.0.0
ipaddress==1.0.23
itsdangerous==2.2.0
Jinja2==3.0.3
kafka-python==2.0.2
lxml==5.2.2
macaddress==2.0.2
MarkupSafe==2.1.5
multidict==6.0.5
ncclient==0.6.15
networkx==3.3
numpy==2.0.0
ordered-set==4.1.0
p4runtime==1.3.0
packaging==24.0
pandas==1.5.3
paramiko==2.9.2
pluggy==1.5.0
prettytable==3.5.0
prometheus-client==0.13.0
@@ -43,6 +53,10 @@ protobuf==3.20.3
psycopg2-binary==2.9.3
py==1.11.0
py-cpuinfo==9.0.0
pyang==2.6.0
pyangbind @ git+https://github.com/robshakir/pyangbind.git@daf530f882c14bdb1bae4dc94fb4b4ad04d1295c
pycparser==2.22
PyNaCl==1.5.0
pytest==6.2.5
pytest-benchmark==3.4.1
pytest-depends==1.0.1
@@ -50,14 +64,20 @@ python-dateutil==2.8.2
python-json-logger==2.0.2
pytz==2024.1
questdb==1.0.1
regex==2024.5.15
requests==2.27.1
requests-mock==1.9.3
six==1.16.0
SQLAlchemy==1.4.52
sqlalchemy-cockroachdb==1.4.4
SQLAlchemy-Utils==0.38.3
tabulate==0.9.0
toml==0.10.2
typing_extensions==4.12.0
tzlocal==5.2
urllib3==1.26.18
wcwidth==0.2.13
websockets==10.4
Werkzeug==2.3.7
xmltodict==0.12.0
yattag==1.15.2
+96 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# read Kafka stream from Kafka topic

import ast
import time
import threading
import logging
from prometheus_client import start_http_server, Gauge, CollectorRegistry
from common.proto.kpi_sample_types_pb2 import KpiSampleType

from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor

LOGGER         = logging.getLogger(__name__)
PROM_METRICS   = {}
PROM_REGISTERY = CollectorRegistry()

class MetricWriterToPrometheus:
    '''
    This class exposes the *cooked KPI* on the endpoint to be scraped by the Prometheus server.
    cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message)
    '''
    def __init__(self):
        # prometheus server address and configs
        self.start_prometheus_client()
        pass
    
    def start_prometheus_client(self):
        start_http_server(10808, registry=PROM_REGISTERY)
        LOGGER.debug("Prometheus client is started on port 10808")

    def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value):
            # Creating a dictionary from the kpi_descriptor's attributes
            cooked_kpi = {
                'kpi_id'         : kpi_descriptor.kpi_id.kpi_id.uuid,
                'kpi_description': kpi_descriptor.kpi_description,
                'kpi_sample_type': KpiSampleType.Name(kpi_descriptor.kpi_sample_type),
                'device_id'      : kpi_descriptor.device_id.device_uuid.uuid,
                'endpoint_id'    : kpi_descriptor.endpoint_id.endpoint_uuid.uuid,
                'service_id'     : kpi_descriptor.service_id.service_uuid.uuid,
                'slice_id'       : kpi_descriptor.slice_id.slice_uuid.uuid,
                'connection_id'  : kpi_descriptor.connection_id.connection_uuid.uuid,
                'link_id'        : kpi_descriptor.link_id.link_uuid.uuid,
                'time_stamp'      : kpi_value.timestamp.timestamp,
                'kpi_value'      : kpi_value.kpi_value_type.floatVal
            }
            # LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
            return cooked_kpi

    def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
        # merge both gRPC messages into single varible.
        cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
        tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} # extracted values will be used as metric tag
        metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]
        metric_name = cooked_kpi['kpi_sample_type']
        try:
            if metric_name not in PROM_METRICS:     # Only register the metric, when it doesn't exists
                PROM_METRICS[metric_name] = Gauge ( 
                    metric_name,
                    cooked_kpi['kpi_description'],
                    metric_tags,
                    registry=PROM_REGISTERY
                )
            LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
            PROM_METRICS[metric_name].labels(
                    kpi_id          = cooked_kpi['kpi_id'],
                    device_id       = cooked_kpi['device_id'],
                    endpoint_id     = cooked_kpi['endpoint_id'],
                    service_id      = cooked_kpi['service_id'],
                    slice_id        = cooked_kpi['slice_id'],
                    connection_id   = cooked_kpi['connection_id'],
                    link_id         = cooked_kpi['link_id'],
                    time_stamp      = cooked_kpi['time_stamp'],
                ).set(float(cooked_kpi['kpi_value']))
            LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))

        except ValueError as e:
            if 'Duplicated timeseries' in str(e):
                LOGGER.debug("Metric {:} is already registered. Skipping.".format(metric_name))
                print("Metric {:} is already registered. Skipping.".format(metric_name))
            else:
                LOGGER.error("Error while pushing metric: {}".format(e))
                raise
 No newline at end of file
+6 −2
Original line number Diff line number Diff line
@@ -19,12 +19,16 @@ from common.tools.kafka.Variables import KafkaTopic

LOGGER = logging.getLogger(__name__)


# -------- Initial Test ----------------
# def test_validate_kafka_topics():
#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
#     response = KafkaTopic.create_all_topics()
#     assert isinstance(response, bool)

def test_KafkaConsumer():
    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
    LOGGER.debug(" --->>> test_validate_kafka_topics: START <<<--- ")
    KpiValueWriter.RunKafkaConsumer()

def test_metric_composer_and_writer():
    LOGGER.debug(" --->>> test_metric_composer_and_writer: START <<<--- ")
Loading