# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# read Kafka stream from Kafka topic

import re
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from kpi_management.service.database.Kpi_DB import Kpi_DB
from kpi_management.service.database.KpiModel import Kpi as KpiModel

LOGGER = logging.getLogger(__name__)

# KAFKA_SERVER_IP = '10.152.183.175:30092'
KAFKA_SERVER_IP = '127.0.0.1:9092'
# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})

# Logical topic role -> Kafka topic name.
KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response',
                'raw'     : 'topic_raw'    , 'labeled' : 'topic_labeled'}
PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,}
CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP,
                   'group.id'          : 'kpi_composer',
                   'auto.offset.reset' : 'latest'}

# Metric names that are extracted from the raw events; every other line is ignored.
KPIs_TO_SEARCH = ["node_network_receive_packets_total",
                  "node_network_receive_bytes_total",
                  "node_network_transmit_bytes_total",
                  "process_open_fds"]
DB_TABLE_NAME = KpiModel


class KpiValueComposer:
    """
    Reads raw metric events from the 'raw' Kafka topic, looks up the KPI
    descriptor stored for each metric of interest and publishes the merged
    descriptor + value on the 'labeled' Kafka topic.

    All public methods are static; the class only serves as a namespace.
    """

    def __init__(self) -> None:
        pass

    @staticmethod
    def compose_kpi():
        """
        Start `kafka_listener` in a background (non-daemon) thread and return
        immediately.
        """
        threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start()

    @staticmethod
    def kafka_listener():
        """
        listener for events on Kafka topic.

        Polls the 'raw' topic forever and hands every decoded message to
        `process_event_and_label_kpi`. Errors on a single message are logged
        and the loop keeps going; the consumer is closed if the loop is ever
        left through an exception.
        """
        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
        try:
            kafka_consumer.subscribe([KAFKA_TOPICS['raw']])
            while True:
                receive_msg = kafka_consumer.poll(2.0)
                if receive_msg is None:
                    # Nothing arrived within the poll timeout.
                    continue
                error = receive_msg.error()
                if error:
                    # Reaching the end of a partition is not a failure; stay quiet.
                    if error.code() != KafkaError._PARTITION_EOF:
                        LOGGER.error("Consumer error: %s", error)
                    continue
                try:
                    new_event = receive_msg.value().decode('utf-8')
                    KpiValueComposer.process_event_and_label_kpi(new_event)
                except Exception as e:  # best effort: one bad message must not stop the listener
                    LOGGER.exception(
                        "Error to consume event from topic: %s. Error detail: %s",
                        KAFKA_TOPICS['raw'], e)
        finally:
            # Leave the consumer group cleanly instead of waiting for a session timeout.
            kafka_consumer.close()

    @staticmethod
    def _parse_sample_line(line):
        """
        Split one metric sample line into (kpi_name, sub_names, kpi_value).

        Accepted shapes (presumably Prometheus text exposition format -- the
        '# HELP' / '# TYPE' prefixes filtered by the caller suggest so):
            name value [timestamp]
            name{labels} value [timestamp]

        `sub_names` is the text between the braces ('' when there are none).
        Label values may contain spaces; a trailing timestamp is ignored.

        Raises:
            ValueError: if the metric name or the value is missing, or the
                        label set is not terminated by '}'.
        """
        head, brace, remainder = line.partition('{')
        if brace:
            # rpartition: a '}' inside a quoted label value must not end the label set.
            sub_names, closing, tail = remainder.rpartition('}')
            if not closing:
                raise ValueError("unterminated label set in {!r}".format(line))
            kpi_name = head.strip()
            fields = tail.split()
        else:
            sub_names = ""
            kpi_name, *fields = line.split() or [""]
        if not kpi_name or not fields:
            raise ValueError("missing metric name or value in {!r}".format(line))
        return kpi_name, sub_names, fields[0]

    @staticmethod
    def process_event_and_label_kpi(event):
        """
        Extract the KPIs listed in KPIs_TO_SEARCH from a raw event and publish
        each one, merged with its descriptor, on the 'labeled' topic.

        Args:
            event (str): newline-separated metric lines.

        Comment lines ('# HELP', '# TYPE') and samples of the loopback device
        (device="lo") are skipped. A failure on one line is logged and does
        not prevent the remaining lines from being processed.
        """
        pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH)))
        producer = None  # created lazily: most events may contain nothing to publish
        try:
            for line in event.split('\n'):
                if not pattern.search(line):
                    continue
                if line.startswith(("# HELP", "# TYPE")) or 'device="lo"' in line:
                    continue

                try:
                    kpi_name, sub_names, kpi_value = KpiValueComposer._parse_sample_line(line)
                except ValueError as e:
                    LOGGER.warning(
                        "Unable to extract kpi name and value from raw data: ERROR Info: %s", e)
                    continue
                LOGGER.info("Received KPI from raw topic: %s", (kpi_name, sub_names, kpi_value))

                try:
                    kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name)
                    if kpi_descriptor is None:
                        LOGGER.info("No matching of KPI (%s) found in db", kpi_name)
                        continue
                    kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(
                        kpi_descriptor, kpi_value)
                    if producer is None:
                        producer = KafkaProducer(PRODUCER_CONFIG)
                    # NOTE: the payload is the Python repr of the dict (not JSON);
                    # kept as-is because consumers of the topic may rely on it.
                    producer.produce(
                        KAFKA_TOPICS['labeled'],
                        key="labeled",
                        value=str(kpi_to_produce),
                        callback=KpiValueComposer.delivery_callback)
                    producer.poll(0)  # serve delivery callbacks without blocking
                except Exception as e:  # best effort: keep processing the remaining lines
                    LOGGER.exception("Unable to label and publish KPI (%s): %s", kpi_name, e)
        finally:
            if producer is not None:
                # One flush per event instead of one producer + flush per sample.
                producer.flush()

    @staticmethod
    def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]):
        """
        Look up the KPI row whose 'kpi_description' column equals `kpi_name`.

        Args:
            kpi_name (str): metric name to search for. The default
                            (KPIs_TO_SEARCH[0]) exists for testing only.

        Returns:
            Whatever `Kpi_DB.search_db_row_by_id` returns -- presumably a
            KpiModel row, or None when nothing matches (verify against Kpi_DB).
        """
        col_name = "kpi_description"
        kpiDBobj = Kpi_DB()
        row = kpiDBobj.search_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name)
        if row is not None:
            LOGGER.info("Extracted Row: %s", row)
        return row

    @staticmethod
    def merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value):
        """
        Build the labeled-KPI dictionary from a descriptor and a sample value.

        Args:
            kpi_descriptor: object exposing the kpi_id, kpi_description,
                            kpi_sample_type, device_id, endpoint_id,
                            service_id, slice_id, connection_id and link_id
                            attributes.
            kpi_value:      the sample value, stored unchanged.

        Returns:
            dict: the descriptor attributes plus 'kpi_value'.
        """
        # Creating a dictionary from the kpi_descriptor's attributes
        kpi_dict = {
            'kpi_id'         : kpi_descriptor.kpi_id,
            'kpi_description': kpi_descriptor.kpi_description,
            'kpi_sample_type': kpi_descriptor.kpi_sample_type,
            'device_id'      : kpi_descriptor.device_id,
            'endpoint_id'    : kpi_descriptor.endpoint_id,
            'service_id'     : kpi_descriptor.service_id,
            'slice_id'       : kpi_descriptor.slice_id,
            'connection_id'  : kpi_descriptor.connection_id,
            'link_id'        : kpi_descriptor.link_id,
            'kpi_value'      : kpi_value
        }
        return kpi_dict

    @staticmethod
    def delivery_callback(err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            LOGGER.error("Message delivery failed: %s", err)
        else:
            LOGGER.info("Message delivered to topic %s", msg.topic())