diff --git a/scripts/run_tests_locally-kpi-composer.sh b/scripts/run_tests_locally-kpi-composer.sh new file mode 100755 index 0000000000000000000000000000000000000000..c61b257888e5d624c89b49cc4653b618e5f2c0b7 --- /dev/null +++ b/scripts/run_tests_locally-kpi-composer.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src + +RCFILE=$PROJECTDIR/coverage/.coveragerc +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ + kpi_manager/tests/test_kpi_composer.py \ No newline at end of file diff --git a/scripts/run_tests_locally-kpi_manager.sh b/scripts/run_tests_locally-kpi-manager.sh similarity index 100% rename from scripts/run_tests_locally-kpi_manager.sh rename to scripts/run_tests_locally-kpi-manager.sh diff --git a/src/kpi_manager/service/KpiValueComposer.py b/src/kpi_manager/service/KpiValueComposer.py new file mode 100644 index 0000000000000000000000000000000000000000..2710aac81aa0d8084651221bd23b9320d1329a92 --- /dev/null +++ b/src/kpi_manager/service/KpiValueComposer.py @@ -0,0 +1,86 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# read Kafka stream from Kafka topic + +import re +import logging +import threading +from confluent_kafka import KafkaError +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer + +LOGGER = logging.getLogger(__name__) +KAFKA_SERVER_IP = '127.0.0.1:9092' +# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) +KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', + 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} +PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} +CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP, + 'group.id' : 'kpi_composer', + 'auto.offset.reset' : 'latest'} +KPIs_TO_SEARCH = ["node_timex_status", "node_timex_sync_status", "node_udp_queues"] + +class KpiValueComposer: + def __init__(self) -> None: + pass + + @staticmethod + def compose_kpi(): + KpiValueComposer.run_kafka_listener() + + @staticmethod + def run_kafka_listener(): + threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start() + + @staticmethod + def kafka_listener(): + """ + listener for events on Kafka topic. + """ + kafka_consumer = KafkaConsumer(CONSUMER_CONFIG) + kafka_consumer.subscribe([KAFKA_TOPICS['raw']]) + while True: + receive_msg = kafka_consumer.poll(2.0) + if receive_msg is None: + # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['raw']) # added for debugging purposes + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print("Consumer error: {}".format(receive_msg.error())) + continue + try: + new_event = receive_msg.value().decode('utf-8') + # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) + LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) + KpiValueComposer.extract_kpi_values(new_event) + except Exception as e: + print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}") + continue + + @staticmethod + def extract_kpi_values(event): + pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) + lines = event.split('\n') + matching_rows = [] + for line in lines: + if pattern.search(line) and not line.startswith("# HELP") and not line.startswith("# TYPE"): + matching_rows.append(tuple(line.split(" "))) + print("Extracted Rows that match the KPIs {:}".format(matching_rows)) + # LOGGER.info("Extracted Rows that match the KPIs {:}".format(matching_rows)) + return matching_rows + + diff --git a/src/kpi_manager/tests/test_kpi_composer.py b/src/kpi_manager/tests/test_kpi_composer.py new file mode 100644 index 0000000000000000000000000000000000000000..a4312ea53e0a5d7c76f2f87e90acb2e34029e5bd --- /dev/null +++ b/src/kpi_manager/tests/test_kpi_composer.py @@ -0,0 +1,23 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import logging +from kpi_manager.service.KpiValueComposer import KpiValueComposer + +LOGGER = logging.getLogger(__name__) + +def test_compose_kpi(): + LOGGER.info(' >>> test_compose_kpi START <<< ') + KpiValueComposer.compose_kpi() \ No newline at end of file diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 2ce8ebf70005f1f8644e9b4f30e121cc1d5f9a3e..4cfee8dba534fef4ae9b12599d332cc0a495a54c 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -1,4 +1,3 @@ - # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py index bc64c473e88428314d357070b99c7b2d9164930e..7c3b7497bf2b94ba350a0b193fbf76e49a67bcec 100644 --- a/src/telemetry/backend/tests/testTelemetryBackend.py +++ b/src/telemetry/backend/tests/testTelemetryBackend.py @@ -43,10 +43,9 @@ LOGGER = logging.getLogger(__name__) # LOGGER.debug(str(response)) # assert isinstance(response, bool) - -def test_fetch_node_exporter_metrics(): - LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ') - TelemetryBackendService.fetch_single_node_exporter_metric() +# def test_fetch_node_exporter_metrics(): +# LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ') +# TelemetryBackendService.fetch_single_node_exporter_metric() def test_stream_node_export_metrics_to_raw_topic(): LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ') diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 2fab04b31c0826bfa4f379b3de969850d2a9d3d9..d10e9dffd1dab1b43eba5382e755ab82d011a172 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -114,42 +114,42 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): return True def kafka_listener(self): - """ - listener for response on Kafka topic. - """ - # # print ("--- STARTED: kafka_listener ---") - # conusmer_configs = { - # 'bootstrap.servers' : KAFKA_SERVER_IP, - # 'group.id' : 'frontend', - # 'auto.offset.reset' : 'latest' - # } - # # topic_response = "topic_response" - - # consumerObj = KafkaConsumer(conusmer_configs) - self.kafka_consumer.subscribe([KAFKA_TOPICS['response']]) - # print (time.time()) - while True: - receive_msg = self.kafka_consumer.poll(2.0) - if receive_msg is None: - # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes - continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - print("Consumer error: {}".format(receive_msg.error())) - break - try: - collector_id = receive_msg.key().decode('utf-8') - if collector_id in ACTIVE_COLLECTORS: - (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) - self.process_response(collector_id, kpi_id, kpi_value) - else: - print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") - except Exception as e: - print(f"No message key found: {str(e)}") + """ + listener for response on Kafka topic. + """ + # # print ("--- STARTED: kafka_listener ---") + # conusmer_configs = { + # 'bootstrap.servers' : KAFKA_SERVER_IP, + # 'group.id' : 'frontend', + # 'auto.offset.reset' : 'latest' + # } + # # topic_response = "topic_response" + + # consumerObj = KafkaConsumer(conusmer_configs) + self.kafka_consumer.subscribe([KAFKA_TOPICS['response']]) + # print (time.time()) + while True: + receive_msg = self.kafka_consumer.poll(2.0) + if receive_msg is None: + # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue - # return None + else: + print("Consumer error: {}".format(receive_msg.error())) + break + try: + collector_id = receive_msg.key().decode('utf-8') + if collector_id in ACTIVE_COLLECTORS: + (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) + self.process_response(collector_id, kpi_id, kpi_value) + else: + print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + except Exception as e: + print(f"No message key found: {str(e)}") + continue + # return None def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): if kpi_id == "-1" and kpi_value == -1: