# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# read Kafka stream from Kafka topic

import re
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer

LOGGER = logging.getLogger(__name__)
KAFKA_SERVER_IP = '127.0.0.1:9092'
# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
# NOTE(review): 'topic_labled' looks like a typo of 'topic_labeled'. It is left
# unchanged because the topic name must match whatever other components use.
KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response',
                'raw'     : 'topic_raw'    , 'labeled' : 'topic_labled'}
PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,}
CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP,
                   'group.id'          : 'kpi_composer',
                   'auto.offset.reset' : 'latest'}
# Substrings looked for in each line of a raw event; see extract_kpi_values().
KPIs_TO_SEARCH = ["node_timex_status", "node_timex_sync_status", "node_udp_queues"]


class KpiValueComposer:
    """Consume raw events from Kafka and extract the lines naming selected KPIs.

    All methods are static; the class only serves as a namespace.
    """

    def __init__(self) -> None:
        pass

    @staticmethod
    def compose_kpi():
        """Start the background Kafka listener (see run_kafka_listener)."""
        KpiValueComposer.run_kafka_listener()

    @staticmethod
    def run_kafka_listener():
        """Run kafka_listener() in a new thread and return immediately."""
        threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start()

    @staticmethod
    def kafka_listener():
        """Poll the 'raw' Kafka topic forever and process every event received.

        Each message payload is decoded as UTF-8 and handed to
        extract_kpi_values(). Consumer errors and per-message processing
        failures are logged and the loop keeps running. The consumer is
        closed if the loop is ever left through an exception.
        """
        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
        try:
            kafka_consumer.subscribe([KAFKA_TOPICS['raw']])
            while True:
                receive_msg = kafka_consumer.poll(2.0)
                if receive_msg is None:
                    # Nothing arrived within the 2 s poll timeout.
                    continue

                error = receive_msg.error()
                if error:
                    # Reaching the end of a partition is not a failure; any
                    # other consumer error is reported before polling again.
                    if error.code() != KafkaError._PARTITION_EOF:
                        LOGGER.error("Consumer error: %s", error)
                    continue

                try:
                    new_event = receive_msg.value().decode('utf-8')
                    LOGGER.info("New event on topic '%s' is %s",
                                KAFKA_TOPICS['raw'], new_event)
                    KpiValueComposer.extract_kpi_values(new_event)
                except Exception as e:
                    # Deliberately broad: one malformed message must not stop
                    # the listener thread. The traceback is kept in the log.
                    LOGGER.exception(
                        "Error to consume event from topic: %s. Error detail: %s",
                        KAFKA_TOPICS['raw'], e)
        finally:
            # Release the consumer if poll()/subscribe() raised.
            kafka_consumer.close()

    @staticmethod
    def extract_kpi_values(event):
        """Return the lines of ``event`` that mention one of KPIs_TO_SEARCH.

        ``event`` is split on newlines. A line is kept when it contains any
        KPIs_TO_SEARCH entry as a substring and does not start with
        "# HELP" or "# TYPE".

        Args:
            event: Text of one raw event (a ``str``).

        Returns:
            A list with one tuple per kept line, holding the line's fields
            as produced by splitting on a single space.
        """
        # Compiled per call so that changes to KPIs_TO_SEARCH are picked up.
        pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH)))
        matching_rows = [
            tuple(line.split(" "))
            for line in event.split('\n')
            if pattern.search(line) and not line.startswith(("# HELP", "# TYPE"))
        ]
        LOGGER.info("Extracted Rows that match the KPIs %s", matching_rows)
        return matching_rows