From a4a63b5d81994262b29bba7e6c9e20ae0094656d Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Mon, 3 Jun 2024 12:10:32 +0000 Subject: [PATCH] KpiComposer: Read Kafka stream and extract the metric value as per configured KPIs. --- scripts/run_tests_locally-kpi-composer.sh | 23 +++++ ...er.sh => run_tests_locally-kpi-manager.sh} | 0 src/kpi_manager/service/KpiValueComposer.py | 86 +++++++++++++++++++ src/kpi_manager/tests/test_kpi_composer.py | 23 +++++ .../service/TelemetryBackendService.py | 1 - .../backend/tests/testTelemetryBackend.py | 7 +- .../TelemetryFrontendServiceServicerImpl.py | 70 +++++++-------- 7 files changed, 170 insertions(+), 40 deletions(-) create mode 100755 scripts/run_tests_locally-kpi-composer.sh rename scripts/{run_tests_locally-kpi_manager.sh => run_tests_locally-kpi-manager.sh} (100%) create mode 100644 src/kpi_manager/service/KpiValueComposer.py create mode 100644 src/kpi_manager/tests/test_kpi_composer.py diff --git a/scripts/run_tests_locally-kpi-composer.sh b/scripts/run_tests_locally-kpi-composer.sh new file mode 100755 index 000000000..c61b25788 --- /dev/null +++ b/scripts/run_tests_locally-kpi-composer.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src + +RCFILE=$PROJECTDIR/coverage/.coveragerc +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ + kpi_manager/tests/test_kpi_composer.py \ No newline at end of file diff --git a/scripts/run_tests_locally-kpi_manager.sh b/scripts/run_tests_locally-kpi-manager.sh similarity index 100% rename from scripts/run_tests_locally-kpi_manager.sh rename to scripts/run_tests_locally-kpi-manager.sh diff --git a/src/kpi_manager/service/KpiValueComposer.py b/src/kpi_manager/service/KpiValueComposer.py new file mode 100644 index 000000000..2710aac81 --- /dev/null +++ b/src/kpi_manager/service/KpiValueComposer.py @@ -0,0 +1,86 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# read Kafka stream from Kafka topic + +import re +import logging +import threading +from confluent_kafka import KafkaError +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer + +LOGGER = logging.getLogger(__name__) +KAFKA_SERVER_IP = '127.0.0.1:9092' +# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) +KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', + 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} +PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} +CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP, + 'group.id' : 'kpi_composer', + 'auto.offset.reset' : 'latest'} +KPIs_TO_SEARCH = ["node_timex_status", "node_timex_sync_status", "node_udp_queues"] + +class KpiValueComposer: + def __init__(self) -> None: + pass + + @staticmethod + def compose_kpi(): + KpiValueComposer.run_kafka_listener() + + @staticmethod + def run_kafka_listener(): + threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() + + @staticmethod + def kafka_listener(): + """ + listener for events on Kafka topic. + """ + kafka_consumer = KafkaConsumer(CONSUMER_CONFIG) + kafka_consumer.subscribe([KAFKA_TOPICS['raw']]) + while True: + receive_msg = kafka_consumer.poll(2.0) + if receive_msg is None: + # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['raw']) # added for debugging purposes + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print("Consumer error: {}".format(receive_msg.error())) + continue + try: + new_event = receive_msg.value().decode('utf-8') + # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) + LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) + KpiValueComposer.extract_kpi_values(new_event) + except Exception as e: + print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. 
Error detail: {str(e)}") + continue + + @staticmethod + def extract_kpi_values(event): + pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) + lines = event.split('\n') + matching_rows = [] + for line in lines: + if pattern.search(line) and not line.startswith("# HELP") and not line.startswith("# TYPE"): + matching_rows.append(tuple(line.split(" "))) + print("Extracted Rows that match the KPIs {:}".format(matching_rows)) + # LOGGER.info("Extracted Rows that match the KPIs {:}".format(matching_rows)) + return matching_rows + + diff --git a/src/kpi_manager/tests/test_kpi_composer.py b/src/kpi_manager/tests/test_kpi_composer.py new file mode 100644 index 000000000..a4312ea53 --- /dev/null +++ b/src/kpi_manager/tests/test_kpi_composer.py @@ -0,0 +1,23 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import threading +import logging +from kpi_manager.service.KpiValueComposer import KpiValueComposer + +LOGGER = logging.getLogger(__name__) + +def test_compose_kpi(): + LOGGER.info(' >>> test_compose_kpi START <<< ') + KpiValueComposer.compose_kpi() \ No newline at end of file diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 2ce8ebf70..4cfee8dba 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -1,4 +1,3 @@ - # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py index bc64c473e..7c3b7497b 100644 --- a/src/telemetry/backend/tests/testTelemetryBackend.py +++ b/src/telemetry/backend/tests/testTelemetryBackend.py @@ -43,10 +43,9 @@ LOGGER = logging.getLogger(__name__) # LOGGER.debug(str(response)) # assert isinstance(response, bool) - -def test_fetch_node_exporter_metrics(): - LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ') - TelemetryBackendService.fetch_single_node_exporter_metric() +# def test_fetch_node_exporter_metrics(): +# LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ') +# TelemetryBackendService.fetch_single_node_exporter_metric() def test_stream_node_export_metrics_to_raw_topic(): LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ') diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 2fab04b31..d10e9dffd 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -114,42 +114,42 @@ class 
TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): return True def kafka_listener(self): - """ - listener for response on Kafka topic. - """ - # # print ("--- STARTED: kafka_listener ---") - # conusmer_configs = { - # 'bootstrap.servers' : KAFKA_SERVER_IP, - # 'group.id' : 'frontend', - # 'auto.offset.reset' : 'latest' - # } - # # topic_response = "topic_response" - - # consumerObj = KafkaConsumer(conusmer_configs) - self.kafka_consumer.subscribe([KAFKA_TOPICS['response']]) - # print (time.time()) - while True: - receive_msg = self.kafka_consumer.poll(2.0) - if receive_msg is None: - # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes - continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - print("Consumer error: {}".format(receive_msg.error())) - break - try: - collector_id = receive_msg.key().decode('utf-8') - if collector_id in ACTIVE_COLLECTORS: - (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) - self.process_response(collector_id, kpi_id, kpi_value) - else: - print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") - except Exception as e: - print(f"No message key found: {str(e)}") + """ + listener for response on Kafka topic. 
+ """ + # # print ("--- STARTED: kafka_listener ---") + # conusmer_configs = { + # 'bootstrap.servers' : KAFKA_SERVER_IP, + # 'group.id' : 'frontend', + # 'auto.offset.reset' : 'latest' + # } + # # topic_response = "topic_response" + + # consumerObj = KafkaConsumer(conusmer_configs) + self.kafka_consumer.subscribe([KAFKA_TOPICS['response']]) + # print (time.time()) + while True: + receive_msg = self.kafka_consumer.poll(2.0) + if receive_msg is None: + # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue - # return None + else: + print("Consumer error: {}".format(receive_msg.error())) + break + try: + collector_id = receive_msg.key().decode('utf-8') + if collector_id in ACTIVE_COLLECTORS: + (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) + self.process_response(collector_id, kpi_id, kpi_value) + else: + print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + except Exception as e: + print(f"No message key found: {str(e)}") + continue + # return None def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): if kpi_id == "-1" and kpi_value == -1: -- GitLab