Newer
Older
Waleed Akbar
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Reads the raw telemetry stream from a Kafka topic and extracts KPI values.
import re
import logging
import threading
from confluent_kafka import KafkaError
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
# Module-level logger for this composer.
LOGGER = logging.getLogger(__name__)
# Address (host:port) of the Kafka broker to connect to.
KAFKA_SERVER_IP = '127.0.0.1:9092'
# ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP})
# Logical names -> actual Kafka topic names used by this service.
# NOTE(review): 'topic_labled' looks like a typo for 'topic_labeled' — confirm
# whether other services rely on the same spelling before renaming the topic.
KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response',
'raw' : 'topic_raw' , 'labeled' : 'topic_labled'}
# Producer configuration: only the bootstrap broker is set.
PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,}
# Consumer configuration: joins the 'kpi_composer' group; 'latest' means only
# messages produced after the consumer starts are read.
CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP,
'group.id' : 'kpi_composer',
'auto.offset.reset' : 'latest'}
# KPI name substrings searched for in each raw event by KpiValueComposer.
KPIs_TO_SEARCH = ["node_timex_status", "node_timex_sync_status", "node_udp_queues"]
class KpiValueComposer:
    """Consumes raw telemetry events from a Kafka topic and extracts the
    rows that mention the KPIs of interest (``KPIs_TO_SEARCH``)."""

    def __init__(self) -> None:
        pass

    @staticmethod
    def compose_kpi():
        """Entry point: start the background Kafka listener."""
        KpiValueComposer.run_kafka_listener()

    @staticmethod
    def run_kafka_listener():
        """Run :meth:`kafka_listener` in a background thread and return immediately."""
        threading.Thread(target=KpiValueComposer.kafka_listener).start()

    @staticmethod
    def kafka_listener():
        """Poll the 'raw' Kafka topic forever, decoding each event and handing
        it to :meth:`extract_kpi_values`.

        Never returns; malformed events and consumer errors are logged and the
        loop keeps running.
        """
        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
        kafka_consumer.subscribe([KAFKA_TOPICS['raw']])
        while True:
            receive_msg = kafka_consumer.poll(2.0)
            if receive_msg is None:
                continue  # nothing arrived within the poll timeout
            if receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue  # reached end of partition: keep polling
                # Report consumer-level errors through the module logger
                # (was print) so they reach the service's log stream.
                LOGGER.error("Consumer error: %s", receive_msg.error())
                continue
            try:
                new_event = receive_msg.value().decode('utf-8')
                # Lazy %-style args: the message is only formatted if INFO is enabled.
                LOGGER.info("New event on topic '%s' is %s", KAFKA_TOPICS['raw'], new_event)
                KpiValueComposer.extract_kpi_values(new_event)
            except Exception:
                # Log the full traceback but keep the listener alive.
                LOGGER.exception("Error to consume event from topic: %s", KAFKA_TOPICS['raw'])

    @staticmethod
    def extract_kpi_values(event, kpis_to_search=None):
        """Return the metric rows of a Prometheus-style text ``event`` that
        mention any of the KPI names in ``kpis_to_search``.

        Args:
            event: multi-line metrics text, one sample per line.
            kpis_to_search: iterable of KPI name substrings to match;
                defaults to the module-level ``KPIs_TO_SEARCH``.

        Returns:
            list of tuples, each tuple being the space-separated fields of a
            matching line. Comment lines ("# HELP", "# TYPE") are skipped.
        """
        if kpis_to_search is None:
            kpis_to_search = KPIs_TO_SEARCH
        pattern = re.compile("|".join(map(re.escape, kpis_to_search)))
        matching_rows = []
        for line in event.split('\n'):
            # startswith accepts a tuple: one call covers both comment prefixes.
            if pattern.search(line) and not line.startswith(("# HELP", "# TYPE")):
                matching_rows.append(tuple(line.split(" ")))
        print("Extracted Rows that match the KPIs {:}".format(matching_rows))
        return matching_rows