diff --git a/scripts/run_tests_locally-kpi-manager.sh b/scripts/run_tests_locally-kpi-manager.sh index e56716dea8f5baf69fc82443559f4e01a6cb54c9..be69980e05f2b4f922a970df77f7d15b4a178fcc 100755 --- a/scripts/run_tests_locally-kpi-manager.sh +++ b/scripts/run_tests_locally-kpi-manager.sh @@ -25,4 +25,4 @@ cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ - kpi_manager/tests/test_unitary.py \ No newline at end of file + kpi_manager/tests/test_kpi_manager.py \ No newline at end of file diff --git a/scripts/run_tests_locally-kpi-writer.sh b/scripts/run_tests_locally-kpi-writer.sh new file mode 100755 index 0000000000000000000000000000000000000000..2bc2e51309108f8bf3d277dfe92402b07d3ff6a3 --- /dev/null +++ b/scripts/run_tests_locally-kpi-writer.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src + +RCFILE=$PROJECTDIR/coverage/.coveragerc +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ + kpi_manager/tests/test_kpi_writer.py \ No newline at end of file diff --git a/src/kpi_manager/service/KpiWriter.py b/src/kpi_manager/service/KpiWriter.py new file mode 100644 index 0000000000000000000000000000000000000000..3c8382c12c0c2d5112a979a9a8cbf1012658d9c1 --- /dev/null +++ b/src/kpi_manager/service/KpiWriter.py @@ -0,0 +1,87 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# read Kafka stream from Kafka topic + +import threading +from confluent_kafka import KafkaError +from prometheus_client import start_http_server, Gauge +from confluent_kafka import Consumer as KafkaConsumer + +KAFKA_SERVER_IP = '127.0.0.1:9092' +KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', + 'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'} +CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP, + 'group.id' : 'kpi_writer', + 'auto.offset.reset' : 'latest'} +KPIs_TO_SEARCH = ["node_network_receive_packets_total", + "node_network_receive_bytes_total", + "node_network_transmit_bytes_total", + "process_open_fds"] +PROM_METRICS = {} + + +class KpiWriter: + def __init__(self) -> None: + pass + + @staticmethod + def kpi_writer(): + threading.Thread(target=KpiWriter.kafka_listener, args=()).start() + + @staticmethod + def kafka_listener(): + """ + listener for events on Kafka topic. + """ + kafka_consumer = KafkaConsumer(CONSUMER_CONFIG) + kafka_consumer.subscribe([KAFKA_TOPICS['labeled']]) + while True: + receive_msg = kafka_consumer.poll(2.0) + if receive_msg is None: + # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['raw']) # added for debugging purposes + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print("Consumer error: {}".format(receive_msg.error())) + continue + try: + new_event = receive_msg.value().decode('utf-8') + # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) + # LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) + KpiWriter.write_metric_to_promtheus(new_event) + except Exception as e: + print(f"Error to consume event from topic: {KAFKA_TOPICS['labeled']}. Error detail: {str(e)}") + continue + + @staticmethod + # send metric to Prometheus + def write_metric_to_promtheus(event): + print("New recevied event: {:}".format(event)) + + # # create Prometheus metrics + # for metric_key in KPIs_TO_SEARCH: + # metric_name = metric_key + # metric_description = "description of " + str(metric_key) + # metric_tags = "tags of " + str(metric_key) + # PROM_METRICS[metric_key] = Gauge( metric_name, metric_description,metric_tags ) + + # NN_REC_PKTS_TOTAL = PROM_METRICS["node_network_receive_packets_total"] + # NN_REC_BYTS_TOTAL = PROM_METRICS["node_network_receive_bytes_total"] + # NN_TRSMT_BYTS_TOTAL = PROM_METRICS["node_network_transmit_bytes_total"] + # PROC_OPEN_FDs = PROM_METRICS["process_open_fds"] + + diff --git a/src/kpi_manager/service/database/Kpi_DB.py b/src/kpi_manager/service/database/Kpi_DB.py index 68ac156c7f3770312ebbb00cc6219e4ed9c73ca3..45c9ff7edbd14dd67935c9640bdafed3e0866502 100644 --- a/src/kpi_manager/service/database/Kpi_DB.py +++ b/src/kpi_manager/service/database/Kpi_DB.py @@ -31,7 +31,7 @@ class Kpi_DB: return False self.db_name = DB_NAME # self.drop_database(self.db_engine) # added to test - # self.create_database(self.db_engine) # to add database + self.create_database(self.db_engine) # to add database self.Session = sessionmaker(bind=self.db_engine) @staticmethod @@ -50,7 +50,7 @@ class Kpi_DB: Kpi.metadata.create_all(self.db_engine) # type: ignore LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name)) except Exception as e: - LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) + LOGGER.info("Tables cannot be created in the kpi database. {:s}".format(str(e))) def verify_tables(self): try: diff --git a/src/kpi_manager/tests/test_unitary.py b/src/kpi_manager/tests/test_kpi_manager.py similarity index 100% rename from src/kpi_manager/tests/test_unitary.py rename to src/kpi_manager/tests/test_kpi_manager.py diff --git a/src/kpi_manager/tests/test_kpi_writer.py b/src/kpi_manager/tests/test_kpi_writer.py new file mode 100644 index 0000000000000000000000000000000000000000..d2261b6addebea49770b45dfb92af4742614fb93 --- /dev/null +++ b/src/kpi_manager/tests/test_kpi_writer.py @@ -0,0 +1,24 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import logging +from kpi_manager.service.KpiWriter import KpiWriter + +LOGGER = logging.getLogger(__name__) + +def test_kpi_writer(): + LOGGER.info(' >>> test_kpi_writer START <<< ') + KpiWriter.kpi_writer() +