Commits (5)
@@ -19,25 +19,19 @@ class KafkaProducerService:
     Class to control Kafka producer functionality.
     """
     def __init__(self):
-        kafka_configs = self.generate_kafka_configs()
-        self.bootstrap_servers      = kafka_configs['bootstrap_servers']
-        self.node_exporter_endpoint = kafka_configs['node_exporter_endpoint']
-        self.kafka_topic            = kafka_configs['kafka_topic']
-        self.run_duration           = kafka_configs['run_duration']
-        self.fetch_interval         = kafka_configs['fetch_interval']
+        pass
 
-    def generate_kafka_configs(self):
+    def generate_kafka_configs(self):       # define the function to get every attribute
         """
         Method to generate Kafka configurations
         """
         create_kafka_configs = {
-            'bootstrap_servers'      : '127.0.0.1:9092',                      # Kafka broker address - Replace with your Kafka broker address
-            'node_exporter_endpoint' : 'http://10.152.183.231:9100/metrics',  # Node Exporter metrics endpoint - Replace with your Node Exporter endpoint
-            'kafka_topic'            : 'metric-data',                         # Kafka topic to produce to
-            'run_duration'           : 20,                                    # Total duration to execute the producer
-            'fetch_interval'         : 4                                      # Time between two fetch requests
+            'bootstrap_servers' : "test_server",       # Kafka broker address - Replace with your Kafka broker address
+            'exporter_endpoint' : "test_exporter",     # Node Exporter metrics endpoint - Replace with your Node Exporter endpoint
+            'kafka_topic'       : "test_kafka_topic",  # Kafka topic to produce to
+            'run_duration'      : 10,                  # Total duration to execute the producer
+            'fetch_interval'    : 2                    # Time between two fetch requests
         }
         return create_kafka_configs
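With this change, `KafkaProducerService.__init__` no longer stores the configuration on the instance; callers are expected to unpack the dict returned by `generate_kafka_configs()` themselves. A minimal wiring sketch under that assumption (the keyword names mirror the new `KafkaProducerServiceImpl` constructor below; this glue code is not part of the MR):

    # Hypothetical wiring of the config dict into the backend implementation.
    cfg  = KafkaProducerService().generate_kafka_configs()
    impl = KafkaProducerServiceImpl(
        bootstrap_servers = cfg['bootstrap_servers'],
        exporter_endpoint = cfg['exporter_endpoint'],
        kafka_topic       = cfg['kafka_topic'],
        run_duration      = cfg['run_duration'],
        fetch_interval    = cfg['fetch_interval'],
    )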
@@ -14,10 +14,10 @@
 # limitations under the License.
 
 import time
-import grpc
 import logging
 import requests
 import threading
+from typing import Tuple
 from common.proto.context_pb2 import Empty
 from confluent_kafka import Producer, KafkaException
 from confluent_kafka.admin import AdminClient, NewTopic
@@ -32,31 +32,34 @@ class KafkaProducerServiceImpl:
     Class to fetch metrics from Exporter and produce them to Kafka.
     """
-    def __init__(self, bootstrap_servers, node_exporter_endpoint, kafka_topic, run_duration, fetch_interval):
+    def __init__(self, bootstrap_servers=None, exporter_endpoint=None,
+                 kafka_topic=None, run_duration=None, fetch_interval=None):
         """
         Constructor to initialize Kafka producer parameters.
         Args:
             bootstrap_servers (str): Kafka broker address.
-            node_exporter_endpoint (str): Node Exporter metrics endpoint.
+            exporter_endpoint (str): Node Exporter metrics endpoint.
             kafka_topic (str): Kafka topic to produce metrics to.
             run_interval (int): Time interval in seconds to run the producer.
         """
         LOGGER.info('Init TelemetryBackendService')
         self.bootstrap_servers = bootstrap_servers
-        self.node_exporter_endpoint = node_exporter_endpoint
+        self.exporter_endpoint = exporter_endpoint
         self.kafka_topic = kafka_topic
         self.run_duration = run_duration
         self.fetch_interval = fetch_interval
 
     # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def export_collector_value(request: CollectorId) -> str: # type: ignore
-        response = str()
-        response = '-1'
+    def export_collector_value(request : Collector) -> Tuple[str, str]: # type: ignore
+        response = Tuple[str, str]
+        response = ('test collector Id', 'test collected value') # Metric to be fetched from endpoint based on Collector message
         return response
 
     # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def write_to_kafka(Collector, kpi_value) -> Empty: # type: ignore
+    def write_to_kafka(request: Tuple[str, str]) -> Empty: # type: ignore
+        # _collector_id, _collector_id_value = request
+        # write collector_id and collector_id value on the Kafka topic
         return Empty()
 
 # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
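`write_to_kafka` is still a stub whose comments only state the intent. For orientation, a minimal sketch of the actual produce call with the already-imported `confluent_kafka.Producer` (the broker address, topic name, and delivery callback are illustrative assumptions, not code from this MR):

    import logging
    from confluent_kafka import Producer

    LOGGER = logging.getLogger(__name__)

    def _delivery_report(err, msg):
        # Invoked once per message by poll()/flush(); err is None on success.
        if err is not None:
            LOGGER.error('Delivery failed: %s', err)

    producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})  # assumed broker address
    collector_id, collected_value = 'test collector Id', 'test collected value'
    producer.produce(
        'metric-data',                        # assumed topic name
        key      = collector_id.encode('utf-8'),
        value    = collected_value.encode('utf-8'),
        callback = _delivery_report,
    )
    producer.flush()                          # block until queued messages are delivered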
@@ -69,7 +72,7 @@ class KafkaProducerServiceImpl:
         """
         KPI = "node_network_receive_packets_total"
         try:
-            response = requests.get(self.node_exporter_endpoint)
+            response = requests.get(self.exporter_endpoint)
             if response.status_code == 200:
                 # print(f"Metrics fetched sucessfully...")
                 metrics = response.text
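The hunk ends right after `metrics = response.text`; one plausible continuation is to filter the Prometheus exposition text for the KPI named above. A self-contained sketch (the endpoint is taken from the original config; the parsing itself is an assumption, not the author's implementation):

    import requests

    KPI = "node_network_receive_packets_total"
    exporter_endpoint = "http://10.152.183.231:9100/metrics"   # Node Exporter endpoint from the old config

    response = requests.get(exporter_endpoint)
    if response.status_code == 200:
        for line in response.text.splitlines():
            # Sample lines look like: metric_name{label="x"} value
            # '# HELP' / '# TYPE' comment lines fail the prefix check and are skipped.
            if line.startswith(KPI):
                name_and_labels, value = line.rsplit(' ', 1)
                print(name_and_labels, '->', value)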
@@ -13,34 +13,20 @@
 # limitations under the License.
 
-def create_kafka_config():
-    """
-    No input parameter is requested
-    Returns the dict object with Kafka configs
-    """
-    _kafka_configs = dict()
-    _kafka_configs['bootstrap_servers'] = '127.0.0.1:9092'
-    _kafka_configs['exporter_endpoint'] = 'http://10.152.183.231:9100/metrics'
-    _kafka_configs['kafka_topic']       = 'metric-data'
-    _kafka_configs['run_duration']      = 20
-    _kafka_configs['fetch_interval']    = 4
-    return _kafka_configs
-
-def create_kafka_config_a(bootstrap_server, exporter_endpoint, kafka_topic, run_duration, fetch_interval):
+def create_kafka_config_a(bootstrap_server: str, exporter_endpoint: str, kafka_topic: str,
+                          run_duration: int, fetch_interval: int):
     """
     Provide ...
     Bootstrap_server IP address as String.
     Exporter endpoint with port <http://ip:port(metrics)> address as String.
     Kafka topic name as String.
-    Total duration of the test as Float.
-    Fetch_interval as Float.
+    Total duration of the test as Int.
+    Fetch_interval as Int.
     """
-    _kafka_configs = dict()
-    _kafka_configs['bootstrap_servers'] = bootstrap_server
-    _kafka_configs['exporter_endpoint'] = exporter_endpoint
-    _kafka_configs['kafka_topic']       = kafka_topic
-    _kafka_configs['run_duration']      = run_duration
-    _kafka_configs['fetch_interval']    = fetch_interval
-    return _kafka_configs
+    _bootstrap_servers = bootstrap_server
+    _exporter_endpoint = exporter_endpoint
+    _kafka_topic       = kafka_topic
+    _run_duration      = run_duration
+    _fetch_interval    = fetch_interval
+
+    return _bootstrap_servers, _exporter_endpoint, _kafka_topic, _run_duration, _fetch_interval
\ No newline at end of file
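The helper now returns a plain 5-tuple instead of a dict, so its result unpacks positionally into the new `KafkaProducerServiceImpl` constructor. A usage sketch with the placeholder values used elsewhere in this MR:

    args = create_kafka_config_a("test_server", "test_exporter", "test_kafka_topic", 10, 2)
    impl = KafkaProducerServiceImpl(*args)   # positional order matches __init__'s parameters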
@@ -15,8 +15,9 @@
 # import sys
 # print (sys.path)
 import logging
-from .messagesBackend import create_kafka_config, create_kafka_config_a
-from src.telemetry_frontend.tests.Messages import create_collector_request
+from typing import Tuple
+from common.proto.context_pb2 import Empty
+from src.telemetry_frontend.tests.Messages import create_collector_request, create_collector_id
 from src.telemetry_frontend.backend.service.KafkaProducerService import KafkaProducerService
 from src.telemetry_frontend.backend.service.KafkaProducerServiceImpl import KafkaProducerServiceImpl

@@ -28,17 +29,8 @@ LOGGER = logging.getLogger(__name__)
 ###########################
 def test_get_kafka_configs():
     LOGGER.warning('test_get_kafka_configs requesting')
-    response = KafkaProducerService.generate_kafka_configs(
-        create_kafka_config()
-    )
-    LOGGER.debug(str(response))
-    assert isinstance(response, dict)
-
-def test_get_kafka_configs_a():
-    LOGGER.warning('test_get_kafka_configs_a requesting')
-    response = KafkaProducerService.generate_kafka_configs(
-        create_kafka_config_a('ip:port', 'ip:port', 'test_topic', 10, 3)
-    )
+    KafkaProducerServiceObj = KafkaProducerService()
+    response = KafkaProducerServiceObj.generate_kafka_configs()
     LOGGER.debug(str(response))
     assert isinstance(response, dict)

@@ -48,4 +40,11 @@ def test_export_collector_value():
         create_collector_request('1')
     )
     LOGGER.debug(str(response))
-    assert isinstance(response, str)
\ No newline at end of file
+    assert isinstance(response, Tuple)
+
+def test_write_to_kafka():
+    LOGGER.warning('test_write_to_kafka requesting')
+    collector_value = KafkaProducerServiceImpl.export_collector_value(create_collector_request('1'))
+    response = KafkaProducerServiceImpl.write_to_kafka(collector_value) # type: ignore (don't know why warning here)
+    LOGGER.debug(str(response))
+    assert isinstance(response, Empty)
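One small caveat in the updated assertion: `isinstance(response, Tuple)` leans on `typing.Tuple` accepting runtime checks, which later Python versions deprecate in favor of the builtin. The safer spelling (a suggestion, not part of the MR):

    assert isinstance(response, tuple)   # builtin tuple is always valid for isinstance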
@@ -15,12 +15,12 @@
 from common.proto import telemetry_frontend_pb2
 from common.proto.kpi_sample_types_pb2 import KpiSampleType
 
-def create_collector_id(coll_id_str):
+def create_collector_id(coll_id_str : str):
     _collector_id = telemetry_frontend_pb2.CollectorId()
     _collector_id.collector_id.uuid = str(coll_id_str)
     return _collector_id
 
-def create_collector_request(coll_id_str):
+def create_collector_request(coll_id_str : str):
     _create_collector_request = telemetry_frontend_pb2.Collector()
     _create_collector_request.collector_id.collector_id.uuid = str(coll_id_str)
     _create_collector_request.kpi_id.kpi_id.uuid = 'KPIid' + str(coll_id_str)

@@ -29,15 +29,16 @@ def create_collector_request(coll_id_str):
     return _create_collector_request
 
 def create_collector_request_a():
     _create_collector_request_a = telemetry_frontend_pb2.Collector()
-    _create_collector_request_a.kpi_id.kpi_id.uuid = "-1"
+    _create_collector_request_a.collector_id.collector_id.uuid = "-1"
     return _create_collector_request_a
 
 def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s):
     _create_collector_request_b = telemetry_frontend_pb2.Collector()
+    _create_collector_request_b.collector_id.collector_id.uuid = '-1'
     _create_collector_request_b.kpi_id.kpi_id.uuid = str_kpi_id
     _create_collector_request_b.duration_s = coll_duration_s
     _create_collector_request_b.interval_s = coll_interval_s
     return _create_collector_request_b
 
 def create_collector_filter():
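For orientation, how the updated builder composes a request (field values follow the diff; the prints are illustrative):

    request = create_collector_request_b('KPIid1', 10, 2)
    print(request.collector_id.collector_id.uuid)   # '-1' (now set explicitly)
    print(request.kpi_id.kpi_id.uuid)               # 'KPIid1'
    print(request.duration_s, request.interval_s)   # 10 2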