Skip to content
Snippets Groups Projects
Commit d83d0b4e authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/177-multiple-fixes-on-kpi-management' into 'develop'

Resolve "Multiple fixes on KPI Management"

See merge request !254
parents ed989581 c5e19713
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!254Resolve "Multiple fixes on KPI Management"
Showing with 41 additions and 143 deletions
...@@ -45,6 +45,9 @@ include: ...@@ -45,6 +45,9 @@ include:
#- local: '/src/dlt/.gitlab-ci.yml' #- local: '/src/dlt/.gitlab-ci.yml'
- local: '/src/load_generator/.gitlab-ci.yml' - local: '/src/load_generator/.gitlab-ci.yml'
- local: '/src/bgpls_speaker/.gitlab-ci.yml' - local: '/src/bgpls_speaker/.gitlab-ci.yml'
- local: '/src/kpi_manager/.gitlab-ci.yml'
- local: '/src/kpi_value_api/.gitlab-ci.yml'
- local: '/src/kpi_value_writer/.gitlab-ci.yml'
# This should be last one: end-to-end integration tests # This should be last one: end-to-end integration tests
- local: '/src/tests/.gitlab-ci.yml' - local: '/src/tests/.gitlab-ci.yml'
...@@ -340,7 +340,7 @@ for COMPONENT in $TFS_COMPONENTS; do ...@@ -340,7 +340,7 @@ for COMPONENT in $TFS_COMPONENTS; do
echo " Deploying '$COMPONENT' component to Kubernetes..." echo " Deploying '$COMPONENT' component to Kubernetes..."
DEPLOY_LOG="$TMP_LOGS_FOLDER/deploy_${COMPONENT}.log" DEPLOY_LOG="$TMP_LOGS_FOLDER/deploy_${COMPONENT}.log"
kubectl --namespace $TFS_K8S_NAMESPACE apply -f "$MANIFEST" > "$DEPLOY_LOG" kubectl --namespace $TFS_K8S_NAMESPACE apply -f "$MANIFEST" > "$DEPLOY_LOG"
COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/") COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/g")
#kubectl --namespace $TFS_K8S_NAMESPACE scale deployment --replicas=0 ${COMPONENT_OBJNAME}service >> "$DEPLOY_LOG" #kubectl --namespace $TFS_K8S_NAMESPACE scale deployment --replicas=0 ${COMPONENT_OBJNAME}service >> "$DEPLOY_LOG"
#kubectl --namespace $TFS_K8S_NAMESPACE scale deployment --replicas=1 ${COMPONENT_OBJNAME}service >> "$DEPLOY_LOG" #kubectl --namespace $TFS_K8S_NAMESPACE scale deployment --replicas=1 ${COMPONENT_OBJNAME}service >> "$DEPLOY_LOG"
...@@ -391,7 +391,7 @@ printf "\n" ...@@ -391,7 +391,7 @@ printf "\n"
for COMPONENT in $TFS_COMPONENTS; do for COMPONENT in $TFS_COMPONENTS; do
echo "Waiting for '$COMPONENT' component..." echo "Waiting for '$COMPONENT' component..."
COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/") COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/g")
kubectl wait --namespace $TFS_K8S_NAMESPACE \ kubectl wait --namespace $TFS_K8S_NAMESPACE \
--for='condition=available' --timeout=90s deployment/${COMPONENT_OBJNAME}service --for='condition=available' --timeout=90s deployment/${COMPONENT_OBJNAME}service
WAIT_EXIT_CODE=$? WAIT_EXIT_CODE=$?
......
...@@ -24,5 +24,7 @@ cd $PROJECTDIR/src ...@@ -24,5 +24,7 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py # python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_manager/tests/test_kpi_db.py kpi_manager/tests/test_kpi_db.py
...@@ -24,5 +24,7 @@ cd $PROJECTDIR/src ...@@ -24,5 +24,7 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py # python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_manager/tests/test_kpi_manager.py kpi_manager/tests/test_kpi_manager.py
...@@ -19,5 +19,7 @@ PROJECTDIR=`pwd` ...@@ -19,5 +19,7 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_value_writer/tests/test_metric_writer_to_prom.py kpi_value_writer/tests/test_metric_writer_to_prom.py
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# Build, tag, and push the Docker image to the GitLab Docker registry # Build, tag, and push the Docker image to the GitLab Docker registry
build kpi-manager: build kpi-manager:
variables: variables:
IMAGE_NAME: 'kpi-manager' # name of the microservice IMAGE_NAME: 'kpi_manager' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build stage: build
before_script: before_script:
...@@ -41,7 +41,7 @@ build kpi-manager: ...@@ -41,7 +41,7 @@ build kpi-manager:
# Apply unit test to the component # Apply unit test to the component
unit_test kpi-manager: unit_test kpi-manager:
variables: variables:
IMAGE_NAME: 'kpi-manager' # name of the microservice IMAGE_NAME: 'kpi_manager' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test stage: unit_test
needs: needs:
...@@ -68,8 +68,6 @@ unit_test kpi-manager: ...@@ -68,8 +68,6 @@ unit_test kpi-manager:
- docker ps -a - docker ps -a
- CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") - CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $CRDB_ADDRESS - echo $CRDB_ADDRESS
- NATS_ADDRESS=$(docker inspect nats --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $NATS_ADDRESS
- > - >
docker run --name $IMAGE_NAME -d -p 30010:30010 docker run --name $IMAGE_NAME -d -p 30010:30010
--env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require" --env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require"
......
...@@ -27,11 +27,11 @@ class KpiEngine: ...@@ -27,11 +27,11 @@ class KpiEngine:
if crdb_uri is None: if crdb_uri is None:
CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT')
CRDB_DATABASE = get_setting('CRDB_DATABASE') CRDB_DATABASE = 'tfs_kpi_mgmt' # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
CRDB_USERNAME = get_setting('CRDB_USERNAME') CRDB_USERNAME = get_setting('CRDB_USERNAME')
CRDB_PASSWORD = get_setting('CRDB_PASSWORD') CRDB_PASSWORD = get_setting('CRDB_PASSWORD')
CRDB_SSLMODE = get_setting('CRDB_SSLMODE') CRDB_SSLMODE = get_setting('CRDB_SSLMODE')
crdb_uri = CRDB_URI_TEMPLATE.format( crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
# crdb_uri = CRDB_URI_TEMPLATE.format( # crdb_uri = CRDB_URI_TEMPLATE.format(
# CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) # CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
......
...@@ -18,10 +18,10 @@ from sqlalchemy.orm import sessionmaker ...@@ -18,10 +18,10 @@ from sqlalchemy.orm import sessionmaker
from kpi_manager.database.KpiEngine import KpiEngine from kpi_manager.database.KpiEngine import KpiEngine
from kpi_manager.database.KpiModel import Kpi as KpiModel from kpi_manager.database.KpiModel import Kpi as KpiModel
from common.method_wrappers.ServiceExceptions import ( from common.method_wrappers.ServiceExceptions import (
AlreadyExistsException, OperationFailedException) AlreadyExistsException, OperationFailedException , NotFoundException)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
DB_NAME = "kpi" DB_NAME = "tfs_kpi_mgmt"
class KpiDB: class KpiDB:
def __init__(self): def __init__(self):
...@@ -86,7 +86,7 @@ class KpiDB: ...@@ -86,7 +86,7 @@ class KpiDB:
return entity return entity
else: else:
LOGGER.debug(f"{model.__name__} ID not found: {str(id_to_search)}") LOGGER.debug(f"{model.__name__} ID not found: {str(id_to_search)}")
return None raise NotFoundException (model.__name__, id_to_search, extra_details=["Row not found with ID"] )
except Exception as e: except Exception as e:
session.rollback() session.rollback()
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
......
...@@ -52,13 +52,8 @@ class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): ...@@ -52,13 +52,8 @@ class KpiManagerServiceServicerImpl(KpiManagerServiceServicer):
try: try:
kpi_id_to_search = request.kpi_id.uuid kpi_id_to_search = request.kpi_id.uuid
row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search)
if row is not None: response = KpiModel.convert_row_to_KpiDescriptor(row)
response = KpiModel.convert_row_to_KpiDescriptor(row) return response
return response
if row is None:
print ('No matching row found for kpi id: {:}'.format(kpi_id_to_search))
LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search))
return Empty()
except Exception as e: except Exception as e:
print ('Unable to search kpi id. {:}'.format(e)) print ('Unable to search kpi id. {:}'.format(e))
LOGGER.info('Unable to search kpi id. {:}'.format(e)) LOGGER.info('Unable to search kpi id. {:}'.format(e))
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# Build, tag, and push the Docker image to the GitLab Docker registry # Build, tag, and push the Docker image to the GitLab Docker registry
build kpi-value-api: build kpi-value-api:
variables: variables:
IMAGE_NAME: 'kpi-value-api' # name of the microservice IMAGE_NAME: 'kpi_value_api' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build stage: build
before_script: before_script:
...@@ -41,7 +41,7 @@ build kpi-value-api: ...@@ -41,7 +41,7 @@ build kpi-value-api:
# Apply unit test to the component # Apply unit test to the component
unit_test kpi-value-api: unit_test kpi-value-api:
variables: variables:
IMAGE_NAME: 'kpi-value-api' # name of the microservice IMAGE_NAME: 'kpi_value_api' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test stage: unit_test
needs: needs:
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
# limitations under the License. # limitations under the License.
import logging, signal, sys, threading import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level from common.Settings import get_log_level
from .KpiValueApiService import KpiValueApiService from .KpiValueApiService import KpiValueApiService
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# Build, tag, and push the Docker image to the GitLab Docker registry # Build, tag, and push the Docker image to the GitLab Docker registry
build kpi-value-writer: build kpi-value-writer:
variables: variables:
IMAGE_NAME: 'kpi-value-writer' # name of the microservice IMAGE_NAME: 'kpi_value_writer' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build stage: build
before_script: before_script:
...@@ -41,7 +41,7 @@ build kpi-value-writer: ...@@ -41,7 +41,7 @@ build kpi-value-writer:
# Apply unit test to the component # Apply unit test to the component
unit_test kpi-value-writer: unit_test kpi-value-writer:
variables: variables:
IMAGE_NAME: 'kpi-value-writer' # name of the microservice IMAGE_NAME: 'kpi_value_writer' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test stage: unit_test
needs: needs:
......
...@@ -17,20 +17,29 @@ import threading ...@@ -17,20 +17,29 @@ import threading
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.kpi_value_api_pb2 import KpiValue from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.Settings import get_service_port_grpc
from common.Constants import ServiceNameEnum
from common.tools.service.GenericGrpcService import GenericGrpcService
from confluent_kafka import KafkaError from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Consumer as KafkaConsumer
from kpi_manager.client.KpiManagerClient import KpiManagerClient from kpi_manager.client.KpiManagerClient import KpiManagerClient
# -- test import -- # -- test import --
from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request # from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request
from .MetricWriterToPrometheus import MetricWriterToPrometheus from .MetricWriterToPrometheus import MetricWriterToPrometheus
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
ACTIVE_CONSUMERS = [] ACTIVE_CONSUMERS = []
METRIC_WRITER = MetricWriterToPrometheus()
class KpiValueWriter(GenericGrpcService):
def __init__(self, cls_name : str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER)
super().__init__(port, cls_name=cls_name)
class KpiValueWriter:
@staticmethod @staticmethod
def RunKafkaConsumer(): def RunKafkaConsumer():
thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=()) thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=())
...@@ -44,11 +53,7 @@ class KpiValueWriter: ...@@ -44,11 +53,7 @@ class KpiValueWriter:
'group.id' : __class__, 'group.id' : __class__,
'auto.offset.reset' : 'latest'} 'auto.offset.reset' : 'latest'}
) )
metric_writer_to_prom = MetricWriterToPrometheus()
kpi_manager_client = KpiManagerClient() kpi_manager_client = KpiManagerClient()
print("Kpi manger client created: {:}".format(kpi_manager_client))
kafka_consumer.subscribe([KafkaTopic.VALUE.value]) kafka_consumer.subscribe([KafkaTopic.VALUE.value])
LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
...@@ -84,15 +89,15 @@ class KpiValueWriter: ...@@ -84,15 +89,15 @@ class KpiValueWriter:
try: try:
kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
print("kpi descriptor received: {:}".format(kpi_descriptor_object)) # print("kpi descriptor received: {:}".format(kpi_descriptor_object))
if isinstance (kpi_descriptor_object, KpiDescriptor): # if isinstance (kpi_descriptor_object, KpiDescriptor):
LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
MetricWriterToPrometheus.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) METRIC_WRITER.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
else: else:
LOGGER.info("Error in extracting KpiDescriptor {:}".format(kpi_descriptor_object)) LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
print("Error in extracting KpiDescriptor {:}".format(kpi_descriptor_object)) print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
except Exception as e: except Exception as e:
LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
print ("Unable to get KpiDescriptor. Error: {:}".format(e)) print ("Unable to get KpiDescriptor. Error: {:}".format(e))
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# read Kafka stream from Kafka topic
import ast
import time
import threading
from confluent_kafka import KafkaError
from prometheus_client import start_http_server, Gauge, CollectorRegistry
from confluent_kafka import Consumer as KafkaConsumer
# Kafka broker address (hard-coded for a local/standalone deployment).
KAFKA_SERVER_IP = '127.0.0.1:9092'
# Logical topic names used by the KPI pipeline; this writer consumes 'labeled'.
KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response',
'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'}
# confluent_kafka Consumer configuration; 'latest' skips messages produced before startup.
CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP,
'group.id' : 'kpi_writer',
'auto.offset.reset' : 'latest'}
# Only events whose 'kpi_description' matches one of these names are exported to Prometheus.
KPIs_TO_SEARCH = ["node_network_receive_packets_total",
"node_network_receive_bytes_total",
"node_network_transmit_bytes_total",
"process_open_fds"]
# Gauge objects created by KpiWriter.create_prom_metrics_name(), keyed by KPI name.
PROM_METRICS = {}
# Dedicated Prometheus registry, served over HTTP by KpiWriter.kafka_listener().
# NOTE(review): name is misspelled ('REGISTERY'); kept as-is because the class references it.
KAFKA_REGISTERY = CollectorRegistry()
class KpiWriter:
    """Consume labeled KPI events from Kafka and expose them as Prometheus Gauges."""

    def __init__(self) -> None:
        pass

    @staticmethod
    def kpi_writer():
        """Entry point: register the Prometheus metrics, then start the Kafka listener thread."""
        KpiWriter.create_prom_metrics_name()
        threading.Thread(target=KpiWriter.kafka_listener, args=()).start()

    @staticmethod
    def kafka_listener():
        """
        Listen for events on the 'labeled' Kafka topic and push each one to Prometheus.

        Blocks forever; intended to run in its own thread (see kpi_writer()).
        """
        # Expose the metrics registry over HTTP so Prometheus can scrape it.
        start_http_server(8101, registry=KAFKA_REGISTERY)
        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
        kafka_consumer.subscribe([KAFKA_TOPICS['labeled']])
        while True:
            receive_msg = kafka_consumer.poll(2.0)
            if receive_msg is None:
                continue
            elif receive_msg.error():
                # Reaching the end of a partition is not an error; report anything else.
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    continue
            try:
                new_event = receive_msg.value().decode('utf-8')
                KpiWriter.write_metric_to_promtheus(new_event)
            except Exception as e:
                # Best-effort consumer: log and keep listening rather than die on one bad event.
                print(f"Error to consume event from topic: {KAFKA_TOPICS['labeled']}. Error detail: {str(e)}")
                continue

    # send metric to Prometheus
    @staticmethod
    def write_metric_to_promtheus(event):
        """
        Parse one serialized event (a Python dict literal) and set the matching Gauge.

        Events whose 'kpi_description' is not listed in KPIs_TO_SEARCH are ignored.
        Raises (to the caller's handler) on malformed events or missing keys.
        """
        event = ast.literal_eval(event)     # str -> dict; literal_eval never executes code
        print("New received event: {:}".format(event))
        event_kpi_name = event['kpi_description']
        if event_kpi_name in KPIs_TO_SEARCH:
            PROM_METRICS[event_kpi_name].labels(
                kpi_id          = event['kpi_id'],
                kpi_sample_type = event['kpi_sample_type'],
                device_id       = event['device_id'],
                endpoint_id     = event['endpoint_id'],
                service_id      = event['service_id'],
                slice_id        = event['slice_id'],
                connection_id   = event['connection_id'],
                link_id         = event['link_id']
            ).set(float(event['kpi_value']))
            time.sleep(0.05)

    @staticmethod
    def create_prom_metrics_name():
        """
        Create one Gauge per KPI name in KPIs_TO_SEARCH and store it in PROM_METRICS.

        A 'Duplicated timeseries' registration is skipped (idempotent re-runs); any
        other ValueError is re-raised instead of being silently swallowed.
        """
        metric_tags = ['kpi_id','kpi_sample_type','device_id',
                       'endpoint_id','service_id','slice_id','connection_id','link_id']
        for metric_key in KPIs_TO_SEARCH:
            metric_name = metric_key
            metric_description = "description of " + str(metric_key)
            try:
                PROM_METRICS[metric_key] = Gauge (
                    metric_name, metric_description, metric_tags,
                    registry=KAFKA_REGISTERY )
            except ValueError as e:
                if 'Duplicated timeseries' in str(e):
                    print("Metric {:} is already registered. Skipping.".format(metric_name))
                else:
                    raise  # unexpected registration failure: do not hide it
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment