Commit 25eb5f8b authored by Waleed Akbar's avatar Waleed Akbar
Browse files

KpiWriter is sucessfully able to read from Kafka "topic_labled".

parent 98039a9c
Loading
Loading
Loading
Loading
+1 −1
Original line number Original line Diff line number Diff line
@@ -25,4 +25,4 @@ cd $PROJECTDIR/src


RCFILE=$PROJECTDIR/coverage/.coveragerc
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
    kpi_manager/tests/test_unitary.py
    kpi_manager/tests/test_kpi_manager.py
 No newline at end of file
 No newline at end of file
+23 −0
Original line number Original line Diff line number Diff line
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


PROJECTDIR=`pwd`

cd $PROJECTDIR/src

RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
    kpi_manager/tests/test_kpi_writer.py
 No newline at end of file
+87 −0
Original line number Original line Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# read Kafka stream from Kafka topic

import threading
from confluent_kafka import KafkaError
from prometheus_client import start_http_server, Gauge
from confluent_kafka import Consumer as KafkaConsumer

KAFKA_SERVER_IP = '127.0.0.1:9092'
KAFKA_TOPICS    = {'request' : 'topic_request', 'response': 'topic_response',
                   'raw'     : 'topic_raw'    , 'labeled' : 'topic_labeled'}
CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP,
                   'group.id'          : 'kpi_writer',
                   'auto.offset.reset' : 'latest'}
KPIs_TO_SEARCH  = ["node_network_receive_packets_total",
                   "node_network_receive_bytes_total",
                   "node_network_transmit_bytes_total",
                   "process_open_fds"]
PROM_METRICS    = {}


class KpiWriter:
    def __init__(self) -> None:
        pass

    @staticmethod
    def kpi_writer():
        threading.Thread(target=KpiWriter.kafka_listener, args=()).start() 

    @staticmethod
    def kafka_listener():
        """
        listener for events on Kafka topic.
        """
        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
        kafka_consumer.subscribe([KAFKA_TOPICS['labeled']])
        while True:
            receive_msg = kafka_consumer.poll(2.0)
            if receive_msg is None:
                # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['raw'])     # added for debugging purposes
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    continue
            try:
                new_event = receive_msg.value().decode('utf-8')
                # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event))
                # LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event))
                KpiWriter.write_metric_to_promtheus(new_event)
            except Exception as e:
                print(f"Error to consume event from topic: {KAFKA_TOPICS['labeled']}. Error detail:  {str(e)}")
                continue

    @staticmethod
    # send metric to Prometheus
    def write_metric_to_promtheus(event):
        print("New recevied event:  {:}".format(event))

    # # create Prometheus metrics
    # for metric_key in KPIs_TO_SEARCH:
    #     metric_name        = metric_key
    #     metric_description = "description of " + str(metric_key)
    #     metric_tags        = "tags of " + str(metric_key)
    #     PROM_METRICS[metric_key] = Gauge( metric_name, metric_description,metric_tags )

    # NN_REC_PKTS_TOTAL   = PROM_METRICS["node_network_receive_packets_total"]
    # NN_REC_BYTS_TOTAL   = PROM_METRICS["node_network_receive_bytes_total"]
    # NN_TRSMT_BYTS_TOTAL = PROM_METRICS["node_network_transmit_bytes_total"]
    # PROC_OPEN_FDs       = PROM_METRICS["process_open_fds"]

+2 −2
Original line number Original line Diff line number Diff line
@@ -31,7 +31,7 @@ class Kpi_DB:
            return False
            return False
        self.db_name = DB_NAME
        self.db_name = DB_NAME
        # self.drop_database(self.db_engine)          # added to test
        # self.drop_database(self.db_engine)          # added to test
        # self.create_database(self.db_engine)         # to add database
        self.create_database(self.db_engine)         # to add database
        self.Session = sessionmaker(bind=self.db_engine)
        self.Session = sessionmaker(bind=self.db_engine)


    @staticmethod
    @staticmethod
@@ -50,7 +50,7 @@ class Kpi_DB:
            Kpi.metadata.create_all(self.db_engine)     # type: ignore
            Kpi.metadata.create_all(self.db_engine)     # type: ignore
            LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name))
            LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name))
        except Exception as e:
        except Exception as e:
            LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))
            LOGGER.info("Tables cannot be created in the kpi database. {:s}".format(str(e)))


    def verify_tables(self):
    def verify_tables(self):
        try:
        try:
Loading