diff --git a/src/kpi_value_writer/tests/test_kpi_writer.py b/scripts/run_tests_locally-kpi-prom-writer.sh old mode 100644 new mode 100755 similarity index 73% rename from src/kpi_value_writer/tests/test_kpi_writer.py rename to scripts/run_tests_locally-kpi-prom-writer.sh index d2261b6addebea49770b45dfb92af4742614fb93..63989a13bd9b5200909710f62b8fdc845e779317 --- a/src/kpi_value_writer/tests/test_kpi_writer.py +++ b/scripts/run_tests_locally-kpi-prom-writer.sh @@ -1,3 +1,4 @@ +#!/bin/bash # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,13 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading -import logging -from kpi_manager.service.KpiWriter import KpiWriter -LOGGER = logging.getLogger(__name__) +PROJECTDIR=`pwd` -def test_kpi_writer(): - LOGGER.info(' >>> test_kpi_writer START <<< ') - KpiWriter.kpi_writer() +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc +python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ + kpi_value_writer/tests/test_metric_writer_to_prom.py \ No newline at end of file diff --git a/src/kpi_manager/requirements.in b/src/kpi_manager/requirements.in index d96e4b1b8a028fee11bfb435d71b64a71747f483..b66e07d206d6adc612e3ac00e277528840d860fa 100644 --- a/src/kpi_manager/requirements.in +++ b/src/kpi_manager/requirements.in @@ -1,26 +1,24 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - +aniso8601==9.0.1 anytree==2.8.0 APScheduler==3.10.1 attrs==23.2.0 +bcrypt==4.1.3 certifi==2024.2.2 +cffi==1.16.0 charset-normalizer==2.0.12 +click==8.1.7 colorama==0.4.6 confluent-kafka==2.3.0 coverage==6.3 +cryptography==36.0.2 +deepdiff==6.7.1 +deepmerge==1.1.1 +enum34==1.1.10 +Flask==2.1.3 +Flask-HTTPAuth==4.5.0 +Flask-RESTful==0.3.9 future-fstrings==1.2.0 +googleapis-common-protos==1.63.2 greenlet==3.0.3 grpcio==1.47.5 grpcio-health-checking==1.47.5 @@ -32,10 +30,22 @@ hyperframe==6.0.1 idna==3.7 influx-line-protocol==0.1.4 iniconfig==2.0.0 +ipaddress==1.0.23 +itsdangerous==2.2.0 +Jinja2==3.0.3 kafka-python==2.0.2 +lxml==5.2.2 +macaddress==2.0.2 +MarkupSafe==2.1.5 multidict==6.0.5 +ncclient==0.6.15 networkx==3.3 +numpy==2.0.0 +ordered-set==4.1.0 +p4runtime==1.3.0 packaging==24.0 +pandas==1.5.3 +paramiko==2.9.2 pluggy==1.5.0 prettytable==3.5.0 prometheus-client==0.13.0 @@ -43,6 +53,10 @@ protobuf==3.20.3 psycopg2-binary==2.9.3 py==1.11.0 py-cpuinfo==9.0.0 +pyang==2.6.0 +pyangbind @ git+https://github.com/robshakir/pyangbind.git@daf530f882c14bdb1bae4dc94fb4b4ad04d1295c +pycparser==2.22 +PyNaCl==1.5.0 pytest==6.2.5 pytest-benchmark==3.4.1 pytest-depends==1.0.1 @@ -50,14 +64,20 @@ python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2024.1 questdb==1.0.1 +regex==2024.5.15 requests==2.27.1 +requests-mock==1.9.3 six==1.16.0 SQLAlchemy==1.4.52 sqlalchemy-cockroachdb==1.4.4 SQLAlchemy-Utils==0.38.3 +tabulate==0.9.0 toml==0.10.2 typing_extensions==4.12.0 tzlocal==5.2 urllib3==1.26.18 wcwidth==0.2.13 +websockets==10.4 +Werkzeug==2.3.7 xmltodict==0.12.0 +yattag==1.15.2 diff --git a/src/kpi_value_writer/service/KpiWriter.py b/src/kpi_value_writer/service/KpiWriterOld.py similarity index 100% rename from src/kpi_value_writer/service/KpiWriter.py rename to src/kpi_value_writer/service/KpiWriterOld.py diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py new file mode 100644 index 0000000000000000000000000000000000000000..b2bfc07a4a1885ede270098562a4961b73c99d1f --- /dev/null +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -0,0 +1,96 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# read Kafka stream from Kafka topic + +import ast +import time +import threading +import logging +from prometheus_client import start_http_server, Gauge, CollectorRegistry +from common.proto.kpi_sample_types_pb2 import KpiSampleType + +from common.proto.kpi_value_api_pb2 import KpiValue +from common.proto.kpi_manager_pb2 import KpiDescriptor + +LOGGER = logging.getLogger(__name__) +PROM_METRICS = {} +PROM_REGISTERY = CollectorRegistry() + +class MetricWriterToPrometheus: + ''' + This class exposes the *cooked KPI* on the endpoint to be scraped by the Prometheus server. + cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message) + ''' + def __init__(self): + # prometheus server address and configs + self.start_prometheus_client() + pass + + def start_prometheus_client(self): + start_http_server(10808, registry=PROM_REGISTERY) + LOGGER.debug("Prometheus client is started on port 10808") + + def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value): + # Creating a dictionary from the kpi_descriptor's attributes + cooked_kpi = { + 'kpi_id' : kpi_descriptor.kpi_id.kpi_id.uuid, + 'kpi_description': kpi_descriptor.kpi_description, + 'kpi_sample_type': KpiSampleType.Name(kpi_descriptor.kpi_sample_type), + 'device_id' : kpi_descriptor.device_id.device_uuid.uuid, + 'endpoint_id' : kpi_descriptor.endpoint_id.endpoint_uuid.uuid, + 'service_id' : kpi_descriptor.service_id.service_uuid.uuid, + 'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid, + 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, + 'link_id' : kpi_descriptor.link_id.link_uuid.uuid, + 'time_stamp' : kpi_value.timestamp.timestamp, + 'kpi_value' : kpi_value.kpi_value_type.floatVal + } + # LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) + return cooked_kpi + + def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): + # merge both gRPC messages into single varible. + cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) + tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} # extracted values will be used as metric tag + metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] + metric_name = cooked_kpi['kpi_sample_type'] + try: + if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists + PROM_METRICS[metric_name] = Gauge ( + metric_name, + cooked_kpi['kpi_description'], + metric_tags, + registry=PROM_REGISTERY + ) + LOGGER.debug("Metric is created with labels: {:}".format(metric_tags)) + PROM_METRICS[metric_name].labels( + kpi_id = cooked_kpi['kpi_id'], + device_id = cooked_kpi['device_id'], + endpoint_id = cooked_kpi['endpoint_id'], + service_id = cooked_kpi['service_id'], + slice_id = cooked_kpi['slice_id'], + connection_id = cooked_kpi['connection_id'], + link_id = cooked_kpi['link_id'], + time_stamp = cooked_kpi['time_stamp'], + ).set(float(cooked_kpi['kpi_value'])) + LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name])) + + except ValueError as e: + if 'Duplicated timeseries' in str(e): + LOGGER.debug("Metric {:} is already registered. Skipping.".format(metric_name)) + print("Metric {:} is already registered. Skipping.".format(metric_name)) + else: + LOGGER.error("Error while pushing metric: {}".format(e)) + raise \ No newline at end of file diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index a87a0b6ea7813dc03b2f070e369293de863d3696..e528f1dbb9c527f3394876f36074ec80fdb22c1c 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -19,12 +19,16 @@ from common.tools.kafka.Variables import KafkaTopic LOGGER = logging.getLogger(__name__) + +# -------- Initial Test ---------------- # def test_validate_kafka_topics(): # LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") # response = KafkaTopic.create_all_topics() # assert isinstance(response, bool) def test_KafkaConsumer(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + LOGGER.debug(" --->>> test_validate_kafka_topics: START <<<--- ") KpiValueWriter.RunKafkaConsumer() - \ No newline at end of file + +def test_metric_composer_and_writer(): + LOGGER.debug(" --->>> test_metric_composer_and_writer: START <<<--- ") diff --git a/src/kpi_value_writer/tests/test_messages.py b/src/kpi_value_writer/tests/test_messages.py index 7e59499e9332c06316b535311e4eb20e80ac7bbe..d9f4cf80a2d7043da0d49debdf01be6deba78ee2 100755 --- a/src/kpi_value_writer/tests/test_messages.py +++ b/src/kpi_value_writer/tests/test_messages.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import uuid +import uuid, time +import random from common.proto import kpi_manager_pb2 +from common.proto.kpi_value_api_pb2 import KpiValue from common.proto.kpi_sample_types_pb2 import KpiSampleType -from common.proto.context_pb2 import DeviceId, LinkId, ServiceId, SliceId,\ - ConnectionId, EndPointId -# ---------------------- 3rd iteration Test Messages --------------------------------- + def create_kpi_descriptor_request(description: str = "Test Description"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) @@ -32,128 +32,9 @@ def create_kpi_descriptor_request(description: str = "Test Description"): _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' # pylint: disable=maybe-no-member return _create_kpi_request -# ---------------------- 2nd iteration Test Messages --------------------------------- -# def create_kpi_id_request(): -# _kpi_id = kpi_manager_pb2.KpiId() -# _kpi_id.kpi_id.uuid = "34f73604-eca6-424f-9995-18b519ad0978" -# return _kpi_id - -# def create_kpi_descriptor_request_a(descriptor_name: str = "Test_name"): -# _create_kpi_request = kpi_manager_pb2.KpiDescriptor() -# _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) -# _create_kpi_request.kpi_description = descriptor_name -# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED -# _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member -# _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member -# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' # pylint: disable=maybe-no-member -# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member -# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' # pylint: disable=maybe-no-member -# _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' # pylint: disable=maybe-no-member -# return _create_kpi_request - -# def create_kpi_descriptor_request(): -# _create_kpi_request = kpi_manager_pb2.KpiDescriptor() -# _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) -# _create_kpi_request.kpi_description = 'KPI Description Test' -# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED -# _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member -# _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member -# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member -# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member -# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member -# _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' # pylint: disable=maybe-no-member -# return _create_kpi_request - -# def create_kpi_filter_request_a(): -# _create_kpi_filter_request = kpi_manager_pb2.KpiDescriptorFilter() -# _create_kpi_filter_request.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) - -# device_id_obj = DeviceId() -# endpoint_id_obj = EndPointId() -# service_id_obj = ServiceId() -# slice_id_obj = SliceId() -# connection_id_obj = ConnectionId() -# link_id_obj = LinkId() - -# device_id_obj.device_uuid.uuid = "DEV1" -# endpoint_id_obj.endpoint_uuid.uuid = "END1" -# service_id_obj.service_uuid.uuid = "SERV1" -# slice_id_obj.slice_uuid.uuid = "SLC1" -# connection_id_obj.connection_uuid.uuid = "CON1" -# link_id_obj.link_uuid.uuid = "LNK1" - -# _create_kpi_filter_request.device_id.append(device_id_obj) -# _create_kpi_filter_request.endpoint_id.append(endpoint_id_obj) -# _create_kpi_filter_request.service_id.append(service_id_obj) -# _create_kpi_filter_request.slice_id.append(slice_id_obj) -# _create_kpi_filter_request.connection_id.append(connection_id_obj) -# _create_kpi_filter_request.link_id.append(link_id_obj) - -# return _create_kpi_filter_request - -# -------------------- Initial Test messages ------------------------------------- - -# def create_kpi_request(kpi_id_str): -# _create_kpi_request = kpi_manager_pb2.KpiDescriptor() -# _create_kpi_request.kpi_description = 'KPI Description Test' -# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED -# _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) -# _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) -# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) -# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) -# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) -# return _create_kpi_request - -# def create_kpi_request_b(): -# _create_kpi_request = kpi_manager_pb2.KpiDescriptor() -# _create_kpi_request = str(uuid.uuid4()) -# _create_kpi_request.kpi_description = 'KPI Description Test' -# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED -# _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member -# _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member -# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member -# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member -# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member -# return _create_kpi_request - -# def create_kpi_request_c(): -# _create_kpi_request = kpi_manager_pb2.KpiDescriptor() -# _create_kpi_request.kpi_description = 'KPI Description Test' -# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED -# _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member -# _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member -# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member -# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member -# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member -# return _create_kpi_request - -# def create_kpi_request_d(): -# _create_kpi_request = kpi_manager_pb2.KpiDescriptor() -# _create_kpi_request.kpi_description = 'KPI Description Test' -# _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED -# _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member -# _create_kpi_request.service_id.service_uuid.uuid = 'SERV4' # pylint: disable=maybe-no-member -# _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4' # pylint: disable=maybe-no-member -# _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4' # pylint: disable=maybe-no-member -# _create_kpi_request.connection_id.connection_uuid.uuid = 'CON4' # pylint: disable=maybe-no-member -# return _create_kpi_request - -# def kpi_descriptor_list(): -# _kpi_descriptor_list = kpi_manager_pb2.KpiDescriptorList() -# return _kpi_descriptor_list - -# def create_kpi_filter_request(): -# _create_kpi_filter_request = kpi_manager_pb2.KpiDescriptorFilter() -# _create_kpi_filter_request.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) -# new_device_id = _create_kpi_filter_request.device_id.add() -# new_device_id.device_uuid.uuid = 'DEV1' -# new_service_id = _create_kpi_filter_request.service_id.add() -# new_service_id.service_uuid.uuid = 'SERV1' -# new_slice_id = _create_kpi_filter_request.slice_id.add() -# new_slice_id.slice_uuid.uuid = 'SLC1' -# new_endpoint_id = _create_kpi_filter_request.endpoint_id.add() -# new_endpoint_id.endpoint_uuid.uuid = 'END1' -# new_connection_id = _create_kpi_filter_request.connection_id.add() -# new_connection_id.connection_uuid.uuid = 'CON1' - -# return _create_kpi_filter_request \ No newline at end of file +def create_kpi_value_request(): + _create_kpi_value_request = KpiValue() + _create_kpi_value_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_kpi_value_request.timestamp.timestamp = time.time() + _create_kpi_value_request.kpi_value_type.floatVal = random.randint(10, 10000) + return _create_kpi_value_request \ No newline at end of file diff --git a/src/kpi_value_writer/tests/test_metric_writer_to_prom.py b/src/kpi_value_writer/tests/test_metric_writer_to_prom.py new file mode 100644 index 0000000000000000000000000000000000000000..cee2877fffb0abf69fe720ca778022da0e6c8794 --- /dev/null +++ b/src/kpi_value_writer/tests/test_metric_writer_to_prom.py @@ -0,0 +1,29 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import logging +from kpi_value_writer.service.MetricWriterToPrometheus import MetricWriterToPrometheus +from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request, create_kpi_value_request + +LOGGER = logging.getLogger(__name__) + +def test_metric_writer_to_prometheus(): + LOGGER.info(' >>> test_metric_writer_to_prometheus START <<< ') + metric_writer_obj = MetricWriterToPrometheus() + metric_writer_obj.create_and_expose_cooked_kpi( + create_kpi_descriptor_request(), + create_kpi_value_request() + ) +