Skip to content
Snippets Groups Projects
KpiValueComposer.py 6.08 KiB
Newer Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# read Kafka stream from Kafka topic

import re
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from kpi_management.service.database.Kpi_DB import Kpi_DB
from kpi_management.service.database.KpiModel import Kpi as KpiModel
Waleed Akbar's avatar
Waleed Akbar committed
# KAFKA_SERVER_IP    = '10.152.183.175:30092'
KAFKA_SERVER_IP    = '127.0.0.1:9092'
# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS       = {'request' : 'topic_request', 'response': 'topic_response',
                      'raw'     : 'topic_raw'    , 'labeled' : 'topic_labeled'}
PRODUCER_CONFIG    = {'bootstrap.servers': KAFKA_SERVER_IP,}
CONSUMER_CONFIG    = {'bootstrap.servers' : KAFKA_SERVER_IP,
                      'group.id'          : 'kpi_composer',
                      'auto.offset.reset' : 'latest'}
Waleed Akbar's avatar
Waleed Akbar committed
KPIs_TO_SEARCH     = ["node_network_receive_packets_total",
                       "node_network_receive_bytes_total",
                       "node_network_transmit_bytes_total",
                       "process_open_fds"]
DB_TABLE_NAME      = KpiModel

class KpiValueComposer:
    def __init__(self) -> None:
        pass
    
    @staticmethod
    def compose_kpi():
        threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start()
    
    @staticmethod
    def kafka_listener():
        """
        listener for events on Kafka topic.
        """
        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
        kafka_consumer.subscribe([KAFKA_TOPICS['raw']])
        while True:
            receive_msg = kafka_consumer.poll(2.0)
            if receive_msg is None:
                # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['raw'])     # added for debugging purposes
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print("Consumer error: {}".format(receive_msg.error()))
                    continue
            try:
                new_event = receive_msg.value().decode('utf-8')
                KpiValueComposer.process_event_and_label_kpi(new_event)
            except Exception as e:
                print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail:  {str(e)}")
                continue

    @staticmethod
    def process_event_and_label_kpi(event):
        pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH)))
        lines = event.split('\n')
Waleed Akbar's avatar
Waleed Akbar committed
        # matching_rows = []
        sub_names = kpi_value = ""
Waleed Akbar's avatar
Waleed Akbar committed
            try:
                if pattern.search(line) and not line.startswith("# HELP") and not line.startswith("# TYPE") and not 'device="lo"' in line:
Waleed Akbar's avatar
Waleed Akbar committed
                    (kpi_name, kpi_value) = line.split(" ")
                    if kpi_name.endswith('}'):
                        (kpi_name, sub_names) = kpi_name.replace('}','').split('{')
                    print("Received KPI from raw topic:  {:}".format((kpi_name, sub_names, kpi_value)))
                    kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name)
Waleed Akbar's avatar
Waleed Akbar committed
                    if kpi_descriptor is not None:
                        kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value)
                        producerObj = KafkaProducer(PRODUCER_CONFIG)
                        producerObj.produce(KAFKA_TOPICS['labeled'], key="labeled", value= str(kpi_to_produce), callback=KpiValueComposer.delivery_callback)
                        producerObj.flush()
                    else:
                        print ("No matching of KPI ({:}) found in db".format(kpi_name))
Waleed Akbar's avatar
Waleed Akbar committed
            except Exception as e:
                print("Unable to extract kpi name and value from raw data: ERROR Info: {:}".format(e))
    def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]): #  = KPIs_TO_SEARCH[0] is added for testing
        kpiDBobj = Kpi_DB()
Waleed Akbar's avatar
Waleed Akbar committed
        row = kpiDBobj.search_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name)
        if row is not None:
            LOGGER.info("Extracted Row: {:}".format(row))
            return row
        else:
            return None

    @staticmethod
    def merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value):
        # Creating a dictionary from the kpi_descriptor's attributes
        kpi_dict = {
            'kpi_id'         : kpi_descriptor.kpi_id,
            'kpi_description': kpi_descriptor.kpi_description,
            'kpi_sample_type': kpi_descriptor.kpi_sample_type,
            'device_id'      : kpi_descriptor.device_id,
            'endpoint_id'    : kpi_descriptor.endpoint_id,
            'service_id'     : kpi_descriptor.service_id,
            'slice_id'       : kpi_descriptor.slice_id,
            'connection_id'  : kpi_descriptor.connection_id,
            'link_id'        : kpi_descriptor.link_id,
            'kpi_value'      : kpi_value
        }
        return kpi_dict

    @staticmethod
    def delivery_callback( err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')