Newer
Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# read Kafka stream from Kafka topic
import ast
import time
import threading
import logging
from prometheus_client import start_http_server, Gauge, CollectorRegistry
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor
LOGGER = logging.getLogger(__name__)
PROM_METRICS = {}
PROM_REGISTERY = CollectorRegistry()
class MetricWriterToPrometheus:
'''
This class exposes the *cooked KPI* on the endpoint to be scraped by the Prometheus server.
cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message)
'''
def __init__(self):
# prometheus server address and configs
self.start_prometheus_client()
pass
def start_prometheus_client(self):
start_http_server(10808, registry=PROM_REGISTERY)
LOGGER.debug("Prometheus client is started on port 10808")
def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value):
# Creating a dictionary from the kpi_descriptor's attributes
cooked_kpi = {
'kpi_id' : kpi_descriptor.kpi_id.kpi_id.uuid,
'kpi_description': kpi_descriptor.kpi_description,
'kpi_sample_type': KpiSampleType.Name(kpi_descriptor.kpi_sample_type),
'device_id' : kpi_descriptor.device_id.device_uuid.uuid,
'endpoint_id' : kpi_descriptor.endpoint_id.endpoint_uuid.uuid,
'service_id' : kpi_descriptor.service_id.service_uuid.uuid,
'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid,
'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
'link_id' : kpi_descriptor.link_id.link_uuid.uuid,
'time_stamp' : kpi_value.timestamp.timestamp,
'kpi_value' : kpi_value.kpi_value_type.floatVal
}
# LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
return cooked_kpi
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
# merge both gRPC messages into single varible.
cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} # extracted values will be used as metric tag
metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]
metric_name = cooked_kpi['kpi_sample_type']
try:
if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists
PROM_METRICS[metric_name] = Gauge (
metric_name,
cooked_kpi['kpi_description'],
metric_tags,
registry=PROM_REGISTERY
)
LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
PROM_METRICS[metric_name].labels(
kpi_id = cooked_kpi['kpi_id'],
device_id = cooked_kpi['device_id'],
endpoint_id = cooked_kpi['endpoint_id'],
service_id = cooked_kpi['service_id'],
slice_id = cooked_kpi['slice_id'],
connection_id = cooked_kpi['connection_id'],
link_id = cooked_kpi['link_id'],
time_stamp = cooked_kpi['time_stamp'],
).set(float(cooked_kpi['kpi_value']))
LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))
except ValueError as e:
if 'Duplicated timeseries' in str(e):
LOGGER.debug("Metric {:} is already registered. Skipping.".format(metric_name))
print("Metric {:} is already registered. Skipping.".format(metric_name))
else:
LOGGER.error("Error while pushing metric: {}".format(e))
raise