Skip to content
GitLab
Projects
Groups
Topics
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
TFS
controller
Compare revisions
97e5fafc9201f56737444996639e9f54f1483e27...a5f83a179afd7b4176f4d72befdd475af839332a
Commits (2)
files rename to "KafkaProducerService" and "KafkaProducerServiceImpl"
· 0e46347c
Waleed Akbar
authored
Apr 30, 2024
0e46347c
Kafka tests file added
· a5f83a17
Waleed Akbar
authored
Apr 30, 2024
a5f83a17
Hide whitespace changes
Inline
Side-by-side
src/telemetry_frontend/backend/KafkaProducer
Controller
.py
→
src/telemetry_frontend/backend/
service/
KafkaProducer
Service
.py
View file @
a5f83a17
...
...
@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from
NodeExporterProducer
import
KafkaNodeExporterProducer
from
KafkaProducerServiceImpl
import
KafkaProducerServiceImpl
class
KafkaProducer
Controller
:
class
KafkaProducer
Service
:
"""
Class to control Kafka producer functionality.
"""
...
...
@@ -35,16 +35,16 @@ class KafkaProducerController:
'node_exporter_endpoint'
:
'http://10.152.183.231:9100/metrics'
,
# Node Exporter metrics endpoint - Replace with your Node Exporter endpoint
'kafka_topic'
:
'metric-data'
,
# Kafka topic to produce to
'run_duration'
:
20
,
# Total duration to execute the producer
'fetch_interval'
:
3
# Time between two fetch requests
'fetch_interval'
:
4
# Time between two fetch requests
}
return
create_kafka_configuration
def
run_producer
(
self
):
"""
Method to create Kafka
NodeExporterProducer
object and start producer thread.
Method to create Kafka
ProducerServiceImpl
object and start producer thread.
"""
# Create NodeExporterProducer object and run start_producer_thread
producer
=
Kafka
NodeExporterProducer
(
self
.
bootstrap_servers
,
self
.
node_exporter_endpoint
,
producer
=
Kafka
ProducerServiceImpl
(
self
.
bootstrap_servers
,
self
.
node_exporter_endpoint
,
self
.
kafka_topic
,
self
.
run_duration
,
self
.
fetch_interval
)
# producer.start_producer_thread() # if threading is required
...
...
@@ -52,6 +52,6 @@ class KafkaProducerController:
if
__name__
==
"__main__"
:
# Create Kafka producer
controller
object and run producer
kafka_controller
=
KafkaProducer
Controller
()
# Create Kafka producer
service
object and run producer
kafka_controller
=
KafkaProducer
Service
()
kafka_controller
.
run_producer
()
src/telemetry_frontend/backend/
NodeExporterProducer
.py
→
src/telemetry_frontend/backend/
service/KafkaProducerServiceImpl
.py
View file @
a5f83a17
...
...
@@ -19,9 +19,9 @@ import requests
import
time
import
threading
class
Kafka
NodeExporterProducer
:
class
Kafka
ProducerServiceImpl
:
"""
Class to fetch metrics from
Node
Exporter and produce them to Kafka.
Class to fetch metrics from Exporter and produce them to Kafka.
"""
def
__init__
(
self
,
bootstrap_servers
,
node_exporter_endpoint
,
kafka_topic
,
run_duration
,
fetch_interval
):
...
...
@@ -39,17 +39,25 @@ class KafkaNodeExporterProducer:
self
.
run_duration
=
run_duration
self
.
fetch_interval
=
fetch_interval
def
fetch_metrics
(
self
):
def
fetch_
node_exporter_
metrics
(
self
):
"""
Method to fetch metrics from Node Exporter.
Returns:
str: Metrics fetched from Node Exporter.
"""
KPI
=
"node_network_receive_packets_total"
try
:
response
=
requests
.
get
(
self
.
node_exporter_endpoint
)
if
response
.
status_code
==
200
:
print
(
f
"Metrics fetched sucessfully..."
)
return
response
.
text
# print(f"Metrics fetched sucessfully...")
metrics
=
response
.
text
# Check if the desired metric is available in the response
if
KPI
in
metrics
:
KPI_VALUE
=
self
.
extract_metric_value
(
metrics
,
KPI
)
# Extract the metric value
if
KPI_VALUE
is
not
None
:
print
(
f
"KPI value:
{
KPI_VALUE
}
"
)
return
KPI_VALUE
else
:
print
(
f
"Failed to fetch metrics. Status code:
{
response
.
status_code
}
"
)
return
None
...
...
@@ -57,6 +65,25 @@ class KafkaNodeExporterProducer:
print
(
f
"Failed to fetch metrics:
{
str
(
e
)
}
"
)
return
None
def
extract_metric_value
(
self
,
metrics
,
metric_name
):
"""
Method to extract the value of a metric from the metrics string.
Args:
metrics (str): Metrics string fetched from Node Exporter.
metric_name (str): Name of the metric to extract.
Returns:
float: Value of the extracted metric, or None if not found.
"""
try
:
# Find the metric line containing the desired metric name
metric_line
=
next
(
line
for
line
in
metrics
.
split
(
'
\n
'
)
if
line
.
startswith
(
metric_name
))
# Split the line to extract the metric value
metric_value
=
float
(
metric_line
.
split
()[
1
])
return
metric_value
except
StopIteration
:
print
(
f
"Metric '
{
metric_name
}
' not found in the metrics."
)
return
None
def
delivery_callback
(
self
,
err
,
msg
):
"""
Callback function to handle message delivery status.
...
...
@@ -101,12 +128,12 @@ class KafkaNodeExporterProducer:
try
:
start_time
=
time
.
time
()
while
True
:
metrics
=
self
.
fetch_
metrics
()
metrics
=
self
.
fetch_
node_exporter_metrics
()
# select the function name based on the provided requirements
if
metrics
:
kafka_producer
.
produce
(
self
.
kafka_topic
,
metrics
.
encode
(
'utf-8'
),
callback
=
self
.
delivery_callback
)
kafka_producer
.
produce
(
self
.
kafka_topic
,
str
(
metrics
),
callback
=
self
.
delivery_callback
)
kafka_producer
.
flush
()
print
(
"Metrics produced to Kafka topic"
)
#
print("Metrics produced to Kafka topic")
# Check if the specified run duration has elapsed
if
time
.
time
()
-
start_time
>=
self
.
run_duration
:
...
...
src/telemetry_frontend/backend/tests/KafkaProducerTests.py
0 → 100644
View file @
a5f83a17
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
telemetry_frontend.backend.service.KafkaProducerService
import
KafkaProducerService
kafka_controller
=
KafkaProducerService
()
kafka_controller
.
run_producer
()
\ No newline at end of file