Loading src/telemetry_frontend/backend/tests/test_kafka_backend.py 0 → 100644 +51 −0 Original line number Diff line number Diff line # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import sys # print (sys.path) import logging from .messagesBackend import create_kafka_config, create_kafka_config_a from src.telemetry_frontend.tests.Messages import create_collector_request from src.telemetry_frontend.backend.service.KafkaProducerService import KafkaProducerService from src.telemetry_frontend.backend.service.KafkaProducerServiceImpl import KafkaProducerServiceImpl LOGGER = logging.getLogger(__name__) ########################### # Tests Implementation of Telemetry Backend ########################### def test_get_kafka_configs(): LOGGER.warning('test_get_kafka_configs requesting') response = KafkaProducerService.generate_kafka_configs( create_kafka_config() ) LOGGER.debug(str(response)) assert isinstance(response, dict) def test_get_kafka_configs_a(): LOGGER.warning('test_get_kafka_configs_a requesting') response = KafkaProducerService.generate_kafka_configs( create_kafka_config_a('ip:port', 'ip:port', 'test_topic', 10, 3) ) LOGGER.debug(str(response)) assert isinstance(response, dict) def test_export_collector_value(): LOGGER.warning('test_export_collector_value requesting') response = KafkaProducerServiceImpl.export_collector_value( create_collector_request('1') ) LOGGER.debug(str(response)) assert isinstance(response, str) No newline at end of file Loading
src/telemetry_frontend/backend/tests/test_kafka_backend.py 0 → 100644 +51 −0 Original line number Diff line number Diff line # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import sys # print (sys.path) import logging from .messagesBackend import create_kafka_config, create_kafka_config_a from src.telemetry_frontend.tests.Messages import create_collector_request from src.telemetry_frontend.backend.service.KafkaProducerService import KafkaProducerService from src.telemetry_frontend.backend.service.KafkaProducerServiceImpl import KafkaProducerServiceImpl LOGGER = logging.getLogger(__name__) ########################### # Tests Implementation of Telemetry Backend ########################### def test_get_kafka_configs(): LOGGER.warning('test_get_kafka_configs requesting') response = KafkaProducerService.generate_kafka_configs( create_kafka_config() ) LOGGER.debug(str(response)) assert isinstance(response, dict) def test_get_kafka_configs_a(): LOGGER.warning('test_get_kafka_configs_a requesting') response = KafkaProducerService.generate_kafka_configs( create_kafka_config_a('ip:port', 'ip:port', 'test_topic', 10, 3) ) LOGGER.debug(str(response)) assert isinstance(response, dict) def test_export_collector_value(): LOGGER.warning('test_export_collector_value requesting') response = KafkaProducerServiceImpl.export_collector_value( create_collector_request('1') ) LOGGER.debug(str(response)) assert isinstance(response, str) No newline at end of file