# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Read raw KPI samples from a Kafka topic, label them with their KPI descriptor and publish them to the labeled topic.
import re
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from kpi_management.service.database.Kpi_DB import Kpi_DB
from kpi_management.service.database.KpiModel import Kpi as KpiModel

LOGGER = logging.getLogger(__name__)

# Kafka bootstrap server as "host:port". Alternative endpoint (disabled):
# KAFKA_SERVER_IP = '10.152.183.175:30092'
KAFKA_SERVER_IP = '127.0.0.1:9092'
# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})

# Logical stream name -> Kafka topic name.
KAFKA_TOPICS = {
    'request' : 'topic_request',
    'response': 'topic_response',
    'raw'     : 'topic_raw',
    'labeled' : 'topic_labeled',
}

# Client configurations handed to the confluent_kafka Producer / Consumer.
PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP}
CONSUMER_CONFIG = {
    'bootstrap.servers': KAFKA_SERVER_IP,
    'group.id'         : 'kpi_composer',
    'auto.offset.reset': 'latest',
}

# Metric names that are extracted from the raw stream; all other lines are ignored.
KPIs_TO_SEARCH = [
    "node_network_receive_packets_total",
    "node_network_receive_bytes_total",
    "node_network_transmit_bytes_total",
    "process_open_fds",
]

# ORM model passed to the KPI DB helper when looking up a descriptor row.
DB_TABLE_NAME = KpiModel


class KpiValueComposer:
    """Label raw KPI samples read from Kafka and republish them.

    A background thread consumes text events from ``KAFKA_TOPICS['raw']``,
    extracts the metrics listed in ``KPIs_TO_SEARCH``, looks up the matching
    KPI descriptor in the KPI database and produces the merged record to
    ``KAFKA_TOPICS['labeled']``.
    """

    # Compiled once at class creation: matches any metric name in KPIs_TO_SEARCH.
    _KPI_PATTERN = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH)))
    # Metadata lines that mention a metric name but carry no sample value.
    _METADATA_PREFIXES = ("# HELP", "# TYPE")

    def __init__(self) -> None:
        pass

    @staticmethod
    def compose_kpi():
        """Start ``kafka_listener`` in a new thread and return immediately."""
        threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start()

    @staticmethod
    def kafka_listener():
        """Consume the raw topic forever and process every received event.

        Polls with a 2 second timeout. Empty polls, partition-EOF
        notifications and other consumer errors are skipped (the latter are
        printed). Any exception raised while decoding or processing a single
        event is printed and the loop continues with the next message.
        """
        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
        try:
            kafka_consumer.subscribe([KAFKA_TOPICS['raw']])
            while True:
                receive_msg = kafka_consumer.poll(2.0)
                if receive_msg is None:
                    continue
                error = receive_msg.error()
                if error:
                    # Reaching the end of a partition is not worth reporting.
                    if error.code() != KafkaError._PARTITION_EOF:
                        print("Consumer error: {}".format(error))
                    continue
                try:
                    new_event = receive_msg.value().decode('utf-8')
                    KpiValueComposer.process_event_and_label_kpi(new_event)
                except Exception as e:
                    print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}")
                    continue
        finally:
            # Release the consumer if the loop is ever left through an
            # unexpected exception.
            kafka_consumer.close()

    @staticmethod
    def process_event_and_label_kpi(event):
        """Extract the searched KPIs from one raw event and publish them labeled.

        Args:
            event (str): Newline-separated text; each sample line is expected
                to look like ``name value`` or ``name{labels} value``
                (presumably Prometheus text exposition output -- confirm
                against the producer of the raw topic).

        Lines that do not mention a searched KPI, ``# HELP`` / ``# TYPE``
        lines and samples of the ``lo`` device are ignored. A line that
        cannot be parsed or published is reported and skipped without
        affecting the remaining lines of the event.
        """
        producer = None  # created lazily, shared by all KPIs of this event
        try:
            for line in event.split('\n'):
                try:
                    if not KpiValueComposer._KPI_PATTERN.search(line):
                        continue
                    if line.startswith(KpiValueComposer._METADATA_PREFIXES):
                        continue
                    if 'device="lo"' in line:
                        continue
                    (kpi_name, kpi_value) = line.split(" ")
                    # Reset per line so metrics without labels neither fail
                    # nor inherit the labels of a previous line.
                    sub_names = ""
                    if kpi_name.endswith('}'):
                        kpi_name, _, sub_names = kpi_name[:-1].partition('{')
                    print("Received KPI from raw topic: {:}".format((kpi_name, sub_names, kpi_value)))
                    kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db(kpi_name)
                    if kpi_descriptor is None:
                        print ("No matching of KPI ({:}) found in db".format(kpi_name))
                        continue
                    kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value)
                    if producer is None:
                        producer = KafkaProducer(PRODUCER_CONFIG)
                    producer.produce(KAFKA_TOPICS['labeled'], key="labeled", value=str(kpi_to_produce),
                                     callback=KpiValueComposer.delivery_callback)
                except Exception as e:
                    print("Unable to extract kpi name and value from raw data: ERROR Info: {:}".format(e))
        finally:
            # One flush per event delivers everything queued above and
            # triggers the delivery callbacks before returning.
            if producer is not None:
                producer.flush()

    @staticmethod
    def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]): # = KPIs_TO_SEARCH[0] is added for testing
        """Return the KPI row whose ``kpi_description`` equals ``kpi_name``.

        Args:
            kpi_name (str): Metric name to look up.

        Returns:
            Whatever ``Kpi_DB.search_db_row_by_id`` returns for the match,
            or ``None`` when no row is found.
        """
        col_name = "kpi_description"
        # NOTE(review): the statement creating the DB handle is not visible in
        # the reviewed source; Kpi_DB() is assumed from the module import --
        # confirm the constructor takes no arguments.
        kpiDBobj = Kpi_DB()
        row = kpiDBobj.search_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name)
        if row is not None:
            LOGGER.info("Extracted Row: {:}".format(row))
        return row

    @staticmethod
    def merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value):
        """Build the labeled KPI record from a descriptor row and a sample value.

        Args:
            kpi_descriptor: Object exposing the descriptor attributes read below.
            kpi_value: Sample value, stored unchanged under ``'kpi_value'``.

        Returns:
            dict: Descriptor attributes plus ``'kpi_value'``.
        """
        # Creating a dictionary from the kpi_descriptor's attributes
        kpi_dict = {
            'kpi_id'         : kpi_descriptor.kpi_id,
            'kpi_description': kpi_descriptor.kpi_description,
            'kpi_sample_type': kpi_descriptor.kpi_sample_type,
            'device_id'      : kpi_descriptor.device_id,
            'endpoint_id'    : kpi_descriptor.endpoint_id,
            'service_id'     : kpi_descriptor.service_id,
            'slice_id'       : kpi_descriptor.slice_id,
            'connection_id'  : kpi_descriptor.connection_id,
            'link_id'        : kpi_descriptor.link_id,
            'kpi_value'      : kpi_value
        }
        return kpi_dict

    @staticmethod
    def delivery_callback( err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to topic {msg.topic()}')