Skip to content
Snippets Groups Projects
KpiValueComposer.py 3.61 KiB
Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Read the raw telemetry stream from a Kafka topic and extract the rows of selected KPIs.

import re
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Address (host:port) used as the Kafka 'bootstrap.servers' value below.
KAFKA_SERVER_IP = '10.152.183.175:9092'
# KAFKA_SERVER_IP    = '127.0.0.1:9092'
# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})

# Logical topic role -> Kafka topic name.
# NOTE(review): 'topic_labled' looks like a misspelling of "labeled", but it is
# the topic name used at runtime -- confirm with producers/consumers before renaming.
KAFKA_TOPICS = {
    'request': 'topic_request',
    'response': 'topic_response',
    'raw': 'topic_raw',
    'labeled': 'topic_labled',
}

# Settings handed to the confluent_kafka Producer.
PRODUCER_CONFIG = {
    'bootstrap.servers': KAFKA_SERVER_IP,
}

# Settings handed to the confluent_kafka Consumer.
CONSUMER_CONFIG = {
    'bootstrap.servers': KAFKA_SERVER_IP,
    'group.id': 'kpi_composer',
    'auto.offset.reset': 'latest',
}

# Metric names whose rows are extracted from each raw event.
KPIs_TO_SEARCH = [
    "node_timex_status",
    "node_timex_sync_status",
    "node_udp_queues",
]

class KpiValueComposer:
    """Consume raw events from Kafka and extract the rows of selected KPIs.

    All methods are static; the class only serves as a namespace. The listener
    subscribes to ``KAFKA_TOPICS['raw']`` and hands every decoded message to
    :meth:`extract_kpi_values`.
    """

    def __init__(self) -> None:
        pass

    @staticmethod
    def compose_kpi():
        """Entry point: start the background Kafka listener."""
        KpiValueComposer.run_kafka_listener()

    @staticmethod
    def run_kafka_listener():
        """Start :meth:`kafka_listener` in a new thread and return immediately."""
        threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start()

    @staticmethod
    def kafka_listener():
        """
        Listener for events on the raw Kafka topic.

        Creates a consumer from ``CONSUMER_CONFIG``, subscribes to
        ``KAFKA_TOPICS['raw']`` and polls in an endless loop (2 s timeout per
        poll). Empty polls and messages carrying an error are skipped (errors
        other than end-of-partition are printed). Every other message is
        decoded as UTF-8 and passed to :meth:`extract_kpi_values`; any
        exception raised while handling a message is printed and the loop
        continues with the next message.

        The loop has no exit condition of its own; the consumer is closed if
        an exception escapes the loop.
        """
        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
        try:
            kafka_consumer.subscribe([KAFKA_TOPICS['raw']])
            while True:
                receive_msg = kafka_consumer.poll(2.0)
                if receive_msg is None:
                    # Nothing arrived within the poll timeout.
                    continue
                if receive_msg.error():
                    # End-of-partition is informational; anything else is reported.
                    if receive_msg.error().code() != KafkaError._PARTITION_EOF:
                        print("Consumer error: {}".format(receive_msg.error()))
                    continue
                try:
                    new_event = receive_msg.value().decode('utf-8')
                    KpiValueComposer.extract_kpi_values(new_event)
                except Exception as e:
                    # Best effort: one bad message must not stop the listener.
                    print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail:  {str(e)}")
                    continue
        finally:
            # Do not leak the consumer when the loop is left through an exception.
            kafka_consumer.close()

    @staticmethod
    def extract_kpi_values(event, kpis=None):
        """Return the rows of ``event`` that mention one of the wanted KPIs.

        ``event`` is split on ``'\\n'``. A line is kept when it contains any of
        the KPI names (literal substring match) and does not start with
        ``"# HELP"`` or ``"# TYPE"``. Each kept line is split on single spaces
        and stored as a tuple. The result is also printed.

        Args:
            event: Text of one raw event, one row per line.
            kpis: Optional iterable of KPI names to look for. Defaults to the
                module-level ``KPIs_TO_SEARCH``. An empty iterable selects
                nothing.

        Returns:
            A list of tuples of strings, one tuple per matching line, in the
            order the lines appear in ``event``.
        """
        kpi_names = list(KPIs_TO_SEARCH if kpis is None else kpis)
        matching_rows = []
        # An empty alternation would compile to "" and match every line, so
        # only search when there is at least one KPI name.
        if kpi_names:
            pattern = re.compile("|".join(map(re.escape, kpi_names)))
            matching_rows = [
                tuple(line.split(" "))
                for line in event.split('\n')
                if pattern.search(line) and not line.startswith(("# HELP", "# TYPE"))
            ]
        print("Extracted Rows that match the KPIs {:}".format(matching_rows))
        return matching_rows