Loading src/telemetry/backend/tests/testTelemetryBackend.py +10 −32 Original line number Diff line number Diff line Loading @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. # import sys # print (sys.path) import sys print (sys.path) sys.path.append('/home/tfs/tfs-ctrl') import logging from typing import Tuple from confluent_kafka import Producer as KafkaProducer from common.proto.context_pb2 import Empty from src.telemetry.frontend.tests.Messages import create_collector_request, create_collector_id from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService from src.telemetry.backend.service.TelemetryBackendServiceImpl import TelemetryBackendServiceImpl LOGGER = logging.getLogger(__name__) Loading @@ -28,32 +28,10 @@ LOGGER = logging.getLogger(__name__) ########################### # Tests Implementation of Telemetry Backend ########################### def test_get_kafka_configs(): LOGGER.warning('test_get_kafka_configs requesting') TelemetryBackendServiceObj = TelemetryBackendService() response = TelemetryBackendServiceObj.generate_kafka_configs() LOGGER.debug(str(response)) assert isinstance(response, dict) def test_export_collector_value(): LOGGER.warning('test_export_collector_value requesting') TelemetryBackendServiceObj = TelemetryBackendServiceImpl() response = TelemetryBackendServiceObj.export_collector_value(create_collector_request('1')) LOGGER.debug(str(response)) assert isinstance(response, Tuple) def test_write_to_kafka(): LOGGER.warning('test_write_to_kafka requesting') TelemetryBackendServiceObj = TelemetryBackendServiceImpl() _collector_value = TelemetryBackendServiceObj.export_collector_value(create_collector_request('1')) response = TelemetryBackendServiceObj.write_to_kafka(_collector_value) LOGGER.debug(str(response)) assert isinstance(response, KafkaProducer) def test_stop_producer(): LOGGER.warning('test_write_to_kafka requesting') _kafka_configs = {'bootstrap.servers': '127.0.0.1:9092'} TelemetryBackendServiceObj = TelemetryBackendServiceImpl() response = TelemetryBackendServiceObj.stop_producer(KafkaProducer(_kafka_configs)) def test_run_kafka_listener(): LOGGER.warning('test_receive_kafka_request requesting') TelemetryBackendServiceObj = TelemetryBackendService() response = TelemetryBackendServiceObj.run_kafka_listener() LOGGER.debug(str(response)) assert isinstance(response, Empty) No newline at end of file assert isinstance(response, bool) Loading
src/telemetry/backend/tests/testTelemetryBackend.py +10 −32 Original line number Diff line number Diff line Loading @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. # import sys # print (sys.path) import sys print (sys.path) sys.path.append('/home/tfs/tfs-ctrl') import logging from typing import Tuple from confluent_kafka import Producer as KafkaProducer from common.proto.context_pb2 import Empty from src.telemetry.frontend.tests.Messages import create_collector_request, create_collector_id from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService from src.telemetry.backend.service.TelemetryBackendServiceImpl import TelemetryBackendServiceImpl LOGGER = logging.getLogger(__name__) Loading @@ -28,32 +28,10 @@ LOGGER = logging.getLogger(__name__) ########################### # Tests Implementation of Telemetry Backend ########################### def test_get_kafka_configs(): LOGGER.warning('test_get_kafka_configs requesting') TelemetryBackendServiceObj = TelemetryBackendService() response = TelemetryBackendServiceObj.generate_kafka_configs() LOGGER.debug(str(response)) assert isinstance(response, dict) def test_export_collector_value(): LOGGER.warning('test_export_collector_value requesting') TelemetryBackendServiceObj = TelemetryBackendServiceImpl() response = TelemetryBackendServiceObj.export_collector_value(create_collector_request('1')) LOGGER.debug(str(response)) assert isinstance(response, Tuple) def test_write_to_kafka(): LOGGER.warning('test_write_to_kafka requesting') TelemetryBackendServiceObj = TelemetryBackendServiceImpl() _collector_value = TelemetryBackendServiceObj.export_collector_value(create_collector_request('1')) response = TelemetryBackendServiceObj.write_to_kafka(_collector_value) LOGGER.debug(str(response)) assert isinstance(response, KafkaProducer) def test_stop_producer(): LOGGER.warning('test_write_to_kafka requesting') _kafka_configs = {'bootstrap.servers': '127.0.0.1:9092'} TelemetryBackendServiceObj = TelemetryBackendServiceImpl() response = TelemetryBackendServiceObj.stop_producer(KafkaProducer(_kafka_configs)) def test_run_kafka_listener(): LOGGER.warning('test_receive_kafka_request requesting') TelemetryBackendServiceObj = TelemetryBackendService() response = TelemetryBackendServiceObj.run_kafka_listener() LOGGER.debug(str(response)) assert isinstance(response, Empty) No newline at end of file assert isinstance(response, bool)