Newer
Older

Waleed Akbar
committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# read Kafka stream from Kafka topic
import re
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
LOGGER = logging.getLogger(__name__)
KAFKA_SERVER_IP = '10.152.183.175:9092'
# KAFKA_SERVER_IP = '127.0.0.1:9092'

Waleed Akbar
committed
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response',
'raw' : 'topic_raw' , 'labeled' : 'topic_labled'}
PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,}
CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP,
'group.id' : 'kpi_composer',
'auto.offset.reset' : 'latest'}
KPIs_TO_SEARCH = ["node_timex_status", "node_timex_sync_status", "node_udp_queues"]
class KpiValueComposer:
def __init__(self) -> None:
pass
@staticmethod
def compose_kpi():
KpiValueComposer.run_kafka_listener()
@staticmethod
def run_kafka_listener():
threading.Thread(target=KpiValueComposer.kafka_listener, args=()).start()
@staticmethod
def kafka_listener():
"""
listener for events on Kafka topic.
"""
kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
kafka_consumer.subscribe([KAFKA_TOPICS['raw']])
while True:
receive_msg = kafka_consumer.poll(2.0)
if receive_msg is None:
# print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['raw']) # added for debugging purposes
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
continue
try:
new_event = receive_msg.value().decode('utf-8')
# print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event))
# LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event))

Waleed Akbar
committed
KpiValueComposer.extract_kpi_values(new_event)
except Exception as e:
print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}")
continue
@staticmethod
def extract_kpi_values(event):
pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH)))
lines = event.split('\n')
matching_rows = []
for line in lines:
if pattern.search(line) and not line.startswith("# HELP") and not line.startswith("# TYPE"):
matching_rows.append(tuple(line.split(" ")))
print("Extracted Rows that match the KPIs {:}".format(matching_rows))
# LOGGER.info("Extracted Rows that match the KPIs {:}".format(matching_rows))
return matching_rows