Commits (6)
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --verbose \
telemetry_frontend/backend/tests/test_kafka_backend.py
\ No newline at end of file
...
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
...@@ -12,8 +11,3 @@ ...@@ -12,8 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from telemetry_frontend.backend.service.KafkaProducerService import KafkaProducerService
kafka_controller = KafkaProducerService()
kafka_controller.run_producer()
\ No newline at end of file
...@@ -12,36 +12,38 @@ ...@@ -12,36 +12,38 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from KafkaProducerServiceImpl import KafkaProducerServiceImpl from .KafkaProducerServiceImpl import KafkaProducerServiceImpl
class KafkaProducerService: class KafkaProducerService:
""" """
Class to control Kafka producer functionality. Class to control Kafka producer functionality.
""" """
def __init__(self): def __init__(self):
kafka_configs = self.generate_kafka_configurations()
kafka_configs = self.generate_kafka_configs()
self.bootstrap_servers = kafka_configs['bootstrap_servers'] self.bootstrap_servers = kafka_configs['bootstrap_servers']
self.node_exporter_endpoint = kafka_configs['node_exporter_endpoint'] self.node_exporter_endpoint = kafka_configs['node_exporter_endpoint']
self.kafka_topic = kafka_configs['kafka_topic'] self.kafka_topic = kafka_configs['kafka_topic']
self.run_duration = kafka_configs['run_duration'] self.run_duration = kafka_configs['run_duration']
self.fetch_interval = kafka_configs['fetch_interval'] self.fetch_interval = kafka_configs['fetch_interval']
def generate_kafka_configurations(self): def generate_kafka_configs(self): # define the function to get every attribute
""" """
Method to generate Kafka configurations Method to generate Kafka configurations
""" """
create_kafka_configuration = { create_kafka_configs = {
'bootstrap_servers' : '127.0.0.1:9092', # Kafka broker address - Replace with your Kafka broker address 'bootstrap_servers' : '127.0.0.1:9092', # Kafka broker address - Replace with your Kafka broker address
'node_exporter_endpoint' : 'http://10.152.183.231:9100/metrics', # Node Exporter metrics endpoint - Replace with your Node Exporter endpoint 'node_exporter_endpoint' : 'http://10.152.183.231:9100/metrics', # Node Exporter metrics endpoint - Replace with your Node Exporter endpoint
'kafka_topic' : 'metric-data', # Kafka topic to produce to 'kafka_topic' : 'metric-data', # Kafka topic to produce to
'run_duration' : 20, # Total duration to execute the producer 'run_duration' : 20, # Total duration to execute the producer
'fetch_interval' : 4 # Time between two fetch requests 'fetch_interval' : 4 # Time between two fetch requests
} }
return create_kafka_configuration return create_kafka_configs
def run_producer(self): def run_producer(self):
""" """
Method to create KafkaProducerServiceImpl object and start producer thread. Method to create KafkaProducerServiceImpl object and start producer.
""" """
# Create NodeExporterProducer object and run start_producer_thread # Create NodeExporterProducer object and run start_producer_thread
producer = KafkaProducerServiceImpl(self.bootstrap_servers, self.node_exporter_endpoint, producer = KafkaProducerServiceImpl(self.bootstrap_servers, self.node_exporter_endpoint,
......
...@@ -13,11 +13,19 @@ ...@@ -13,11 +13,19 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from confluent_kafka import Producer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
import requests
import time import time
import grpc
import logging
import requests
import threading import threading
from common.proto.context_pb2 import Empty
from confluent_kafka import Producer, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')
class KafkaProducerServiceImpl: class KafkaProducerServiceImpl:
""" """
...@@ -33,11 +41,25 @@ class KafkaProducerServiceImpl: ...@@ -33,11 +41,25 @@ class KafkaProducerServiceImpl:
kafka_topic (str): Kafka topic to produce metrics to. kafka_topic (str): Kafka topic to produce metrics to.
run_interval (int): Time interval in seconds to run the producer. run_interval (int): Time interval in seconds to run the producer.
""" """
LOGGER.info('Init TelemetryBackendService')
self.bootstrap_servers = bootstrap_servers self.bootstrap_servers = bootstrap_servers
self.node_exporter_endpoint = node_exporter_endpoint self.node_exporter_endpoint = node_exporter_endpoint
self.kafka_topic = kafka_topic self.kafka_topic = kafka_topic
self.run_duration = run_duration self.run_duration = run_duration
self.fetch_interval = fetch_interval self.fetch_interval = fetch_interval
# @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def export_collector_value(request: CollectorId) -> str: # type: ignore
response = str()
response = '-1'
return response
# @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def write_to_kafka(Collector, kpi_value) -> Empty: # type: ignore
return Empty()
# ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
def fetch_node_exporter_metrics(self): def fetch_node_exporter_metrics(self):
""" """
...@@ -114,7 +136,7 @@ class KafkaProducerServiceImpl: ...@@ -114,7 +136,7 @@ class KafkaProducerServiceImpl:
def produce_metrics(self): def produce_metrics(self):
""" """
Method to continuously produce metrics to Kafka topic. Method to produce metrics to Kafka topic as per Kafka configs.
""" """
conf = { conf = {
'bootstrap.servers': self.bootstrap_servers, 'bootstrap.servers': self.bootstrap_servers,
...@@ -146,10 +168,12 @@ class KafkaProducerServiceImpl: ...@@ -146,10 +168,12 @@ class KafkaProducerServiceImpl:
finally: finally:
kafka_producer.flush() kafka_producer.flush()
# kafka_producer.close() # this command generates ERROR # kafka_producer.close() # this command generates ERROR
# ---
def start_producer_thread(self): def start_producer_thread(self):
""" """
Method to start the producer thread. Method to start the producer thread.
""" """
producer_thread = threading.Thread(target=self.produce_metrics) producer_thread = threading.Thread(target=self.produce_metrics)
producer_thread.start() producer_thread.start()
# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def create_kafka_config():
"""
No input parameter is requested
Returns the dict object with Kafka configs
"""
_kafka_configs = dict()
_kafka_configs['bootstrap_servers'] = '127.0.0.1:9092'
_kafka_configs['exporter_endpoint'] = 'http://10.152.183.231:9100/metrics'
_kafka_configs['kafka_topic'] = 'metric-data'
_kafka_configs['run_duration'] = 20
_kafka_configs['fetch_interval'] = 4
return _kafka_configs
def create_kafka_config_a(bootstrap_server, exporter_endpoint, kafka_topic, run_duration, fetch_interval):
"""
Provide ...
Bootstrap_server IP address as String.
Exporter endpoint with port <http://ip:port(metrics)> address as String.
Kafka topic name as String.
Total duration of the test as Float.
Fetch_interval as Float.
"""
_kafka_configs = dict()
_kafka_configs['bootstrap_servers'] = bootstrap_server
_kafka_configs['exporter_endpoint'] = exporter_endpoint
_kafka_configs['kafka_topic'] = kafka_topic
_kafka_configs['run_duration'] = run_duration
_kafka_configs['fetch_interval'] = fetch_interval
return _kafka_configs
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# import sys
# print (sys.path)
import logging
from .messagesBackend import create_kafka_config, create_kafka_config_a
from src.telemetry_frontend.tests.Messages import create_collector_request
from src.telemetry_frontend.backend.service.KafkaProducerService import KafkaProducerService
from src.telemetry_frontend.backend.service.KafkaProducerServiceImpl import KafkaProducerServiceImpl
LOGGER = logging.getLogger(__name__)
###########################
# Tests Implementation of Telemetry Backend
###########################
def test_get_kafka_configs():
LOGGER.warning('test_get_kafka_configs requesting')
response = KafkaProducerService.generate_kafka_configs(
create_kafka_config()
)
LOGGER.debug(str(response))
assert isinstance(response, dict)
def test_get_kafka_configs_a():
LOGGER.warning('test_get_kafka_configs_a requesting')
response = KafkaProducerService.generate_kafka_configs(
create_kafka_config_a('ip:port', 'ip:port', 'test_topic', 10, 3)
)
LOGGER.debug(str(response))
assert isinstance(response, dict)
def test_export_collector_value():
LOGGER.warning('test_export_collector_value requesting')
response = KafkaProducerServiceImpl.export_collector_value(
create_collector_request('1')
)
LOGGER.debug(str(response))
assert isinstance(response, str)
\ No newline at end of file