Newer
Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest

from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
    get_windows_size, get_batch_size, get_agg_df, get_duration
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.Streamer import DaskStreamer
from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
# Configure root logging once for the whole test module, then grab the
# module-level logger that the fixtures and tests below write to.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(funcName)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# ----
# Test fixtures and helper functions
# ----
@pytest.fixture(autouse=True)
def log_all_methods(request):
    '''
    Log a banner before and after each test function in this module.

    The autouse=True parameter makes pytest apply this fixture to every test
    in the module without the test having to request it.

    Args:
        request: pytest's built-in ``request`` fixture; only ``request.node.name``
            (the name of the running test) is read.
    '''
    logger.info(f" >>>>> Starting test: {request.node.name} ")
    # Hand control over to the test body; the code after the yield runs as
    # teardown, so the "Finished" banner is emitted only once the test is done.
    yield
    logger.info(f" <<<<< Finished test: {request.node.name} ")
@pytest.fixture
def mock_kafka_producer():
    """Provide a MagicMock producer with explicit ``produce`` and ``flush`` mocks."""
    producer_double = MagicMock()
    for method_name in ('produce', 'flush'):
        setattr(producer_double, method_name, MagicMock())
    return producer_double
@pytest.fixture
def mock_dask_cluster():
    """Provide a MagicMock cluster whose ``close`` attribute is its own mock."""
    return MagicMock(close=MagicMock())
@pytest.fixture
def mock_dask_client():
    """Provide a MagicMock client reporting status 'running' with a mocked ``submit``."""
    client_double = MagicMock()
    client_double.configure_mock(status='running', submit=MagicMock())
    return client_double
@pytest.fixture()
def mock_kafka_consumer():
    """Provide a MagicMock consumer with an explicit ``subscribe`` mock."""
    return MagicMock(subscribe=MagicMock())
@pytest.fixture()
def mock_streamer_start():
    """Provide a MagicMock streamer with an explicit ``start`` mock."""
    streamer_double = MagicMock()
    streamer_double.configure_mock(start=MagicMock())
    return streamer_double
###########################
# Functionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods
###########################
@pytest.fixture
def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start):
    """Yield an AnalyticsBackendService built while its helpers are patched.

    The four ``AnalyzerHelper.initialize_*`` factories and ``DaskStreamer.run``
    are replaced so they return the mock fixtures. The patches stay active for
    the duration of the test, and ``close()`` is called on the service as
    teardown.
    """
    producer_patch = patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value=mock_kafka_producer)
    cluster_patch = patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster', return_value=mock_dask_cluster)
    client_patch = patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value=mock_dask_client)
    consumer_patch = patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value=mock_kafka_consumer)
    run_patch = patch('analytics.backend.service.Streamer.DaskStreamer.run', return_value=mock_streamer_start)

    # Patches are entered in the same order they were declared above.
    with producer_patch, cluster_patch, client_patch, consumer_patch, run_patch:
        backend_service = AnalyticsBackendService()
        yield backend_service
        backend_service.close()
@pytest.fixture
def analyzer_data():
    """Build the analyzer description dict passed to ``StartStreamer`` in the tests."""
    payload = dict(algo_name='test_algorithm', oper_mode='test_mode')
    payload['input_kpis'] = get_input_kpi_list()
    payload['output_kpis'] = get_output_kpi_list()
    payload['thresholds'] = get_thresholds()
    payload['duration'] = get_duration()
    payload['batch_size_min'] = get_batch_size()
    payload['window_size'] = get_windows_size()
    payload['batch_duration_min'] = get_duration()
    return payload
def test_start_streamer(analytics_service, analyzer_data):
    """StartStreamer must return True and list the analyzer among the active streamers."""
    streamer_id = "test-analyzer-uuid"

    started = analytics_service.StartStreamer(streamer_id, analyzer_data)

    assert started is True
    assert streamer_id in analytics_service.active_streamers
def test_stop_streamer(analytics_service, analyzer_data):
    """StopStreamer must return True and drop the analyzer from the active streamers."""
    analyzer_uuid = "test-analyzer-uuid"

    # Start streamer for stopping it later
    analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
    assert analyzer_uuid in analytics_service.active_streamers

    # time.sleep is patched so any sleep on the stop path returns immediately.
    with patch('time.sleep', return_value=None):
        result = analytics_service.StopStreamer(analyzer_uuid)
    assert result is True

    # Verify that the streamer was stopped
    assert analyzer_uuid not in analytics_service.active_streamers
def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster):
    """close() must call the producer's flush and the cluster's close exactly once each."""
    analytics_service.close()
    for released in (mock_kafka_producer.flush, mock_dask_cluster.close):
        released.assert_called_once()
###########################
# Functionality pytest cases with specific fixtures for Streamer class sub-methods
###########################
@pytest.fixture
def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer):
    """Return a DaskStreamer constructed while the AnalyzerHelper factories are patched.

    The four ``AnalyzerHelper.initialize_*`` factories are patched to return the
    mock fixtures while the streamer is constructed. The patches are released
    when this fixture returns, i.e. before the test body runs.

    Returns:
        The DaskStreamer instance under test.
    """
    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster',   return_value = mock_dask_cluster   ), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client',    return_value = mock_dask_client    ), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer):
        # NOTE(review): keyword names are taken from this test module; confirm
        # they match the DaskStreamer constructor signature.
        return DaskStreamer(
            key               = "test_key",
            input_kpis        = get_input_kpi_list(),
            output_kpis       = get_output_kpi_list(),
            thresholds        = get_thresholds(),
            batch_size        = get_batch_size(),
            window_size       = get_windows_size(),
            # The mocks are *called* here, so the streamer receives each
            # mock's return_value rather than the fixture object itself.
            cluster_instance  = mock_dask_cluster(),
            producer_instance = mock_kafka_producer(),
        )
def test_dask_streamer_initialization(dask_streamer):
    """Test if the DaskStreamer initializes correctly."""
    # Scalar configuration values.
    assert dask_streamer.key == "test_key"
    expected_batch_size = get_batch_size()
    assert dask_streamer.batch_size == expected_batch_size
    assert dask_streamer.window_size is None

    # Collaborators must all have been set.
    for collaborator in ('consumer', 'producer', 'client'):
        assert getattr(dask_streamer, collaborator) is not None
def test_run_stops_on_no_consumer(dask_streamer):
    """Test if the run method exits when the consumer is not initialized."""
    dask_streamer.consumer = None

    # Patch sleep so the call cannot block the test.
    no_sleep = patch('time.sleep', return_value=None)
    with no_sleep:
        dask_streamer.run()

    assert not dask_streamer.running
def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client):
    """Test task handler selection with a valid handler."""
    client_factory = 'analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client'
    with patch(client_factory, return_value=mock_dask_client):
        dask_streamer.task_handler_selector()
        assert dask_streamer.client.status == 'running'
def test_task_handler_selector_invalid_handler(dask_streamer):
    """Test task handler selection with an invalid handler."""
    validity_check = 'analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler'
    # Force the validity check to reject whatever handler is selected.
    with patch(validity_check, return_value=False):
        dask_streamer.task_handler_selector()
        assert dask_streamer.batch == []
def test_produce_result(dask_streamer):
    """Test if produce_result sends records to Kafka."""
    records = [{"kpi_id": "kpi1", "value": 100}]
    # Serialise a fresh dict so the expectation is independent of `records`.
    expected_payload = json.dumps({"kpi_id": "kpi1", "value": 100})

    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as delivery_report_double, \
         patch.object(dask_streamer.producer, 'produce') as produce_double:
        dask_streamer.produce_result(records, "test_topic")
        produce_double.assert_called_once_with(
            "test_topic",
            key="kpi1",
            value=expected_payload,
            callback=delivery_report_double,
        )
def test_stop(dask_streamer):
    """stop() must close the consumer and the client exactly once each."""
    with patch.object(dask_streamer.consumer, 'close') as consumer_close_double, \
         patch.object(dask_streamer.client, 'close') as client_close_double, \
         patch('time.sleep', return_value=0):
        # Mock the conditions required for the close calls
        dask_streamer.client.status = 'running'

        dask_streamer.stop()

        for close_double in (consumer_close_double, client_close_double):
            close_double.assert_called_once()
def test_run_with_valid_consumer(dask_streamer):
    """Test the run method with a valid Kafka consumer."""

    def build_message(raw_payload):
        # A message mock whose value() yields the payload and error() yields None.
        message = MagicMock()
        message.value.return_value = raw_payload
        message.error.return_value = None  # No error
        return message

    with patch.object(dask_streamer.consumer, 'poll') as poll_double, \
         patch.object(dask_streamer, 'task_handler_selector') as selector_double:
        # Mock `poll` to hand out two valid messages, one per call.
        poll_double.side_effect = [
            build_message(b'{"kpi_id": "kpi1", "value": 100}'),
            build_message(b'{"kpi_id": "kpi2", "value": 200}'),
        ]

        # Limit the loop by breaking it after one full processing cycle.
        def stop_running_after_task_handler():
            logger.info("Stopping the streamer after processing the first batch.")
            dask_streamer.running = False

        selector_double.side_effect = stop_running_after_task_handler

        # Mock `sleep` to avoid delays while the loop runs.
        with patch('time.sleep', return_value=None):
            dask_streamer.running = True
            dask_streamer.batch_size = 2
            dask_streamer.run()

        assert len(dask_streamer.batch) == 0  # Batch should be cleared after processing
        selector_double.assert_called_once()  # Task handler should be called once
        poll_double.assert_any_call(timeout=1.0)  # Poll should have been called at least once
# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
def test_aggregation_handler():
    """aggregation_handler must return a list whose items are all dicts."""
    # Sample batch plus the KPI lists and thresholds, in call order.
    handler_args = (
        "test_batch",
        "test_key",
        get_batch(),
        get_input_kpi_list(),
        get_output_kpi_list(),
        get_thresholds(),
    )

    records = aggregation_handler(*handler_args)

    assert isinstance(records, list)
    for record in records:
        assert isinstance(record, dict)
# # Test threshold_handler
def test_threshold_handler():
    """threshold_handler must return a pandas DataFrame of shape (1, 7) for the sample data."""
    # Create a sample aggregated DataFrame
    agg_df = get_agg_df()
    # Load the shared thresholds used by the other tests.
    # NOTE(review): assumes get_thresholds() exposes a "task_parameter" list —
    # confirm against messages_analyzer.
    thresholds = get_thresholds()

    # Test threshold_handler
    result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])

    assert isinstance(result, pd.DataFrame)
    assert result.shape == (1, 7)
###########################
# integration test of Streamer with backend service (Shouldn't be run in the CI/CD pipeline)
###########################
# This is a local machine test to check the integration of the backend service with the Streamer
# @pytest.fixture(scope='session')
# def analyticBackend_service():
# logger.info('Initializing AnalyticsBackendService...')
# _service = AnalyticsBackendService()
# _service.start()
# logger.info('Yielding AnalyticsBackendService...')
# yield _service
# logger.info('Terminating AnalyticsBackendService...')
# _service.stop()
# logger.info('Terminated AnalyticsBackendService...')
# # --- "test_validate_kafka_topics" should be run before the functionality tests ---
# def test_validate_kafka_topics():
# logger.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
# def test_backend_integration_with_frontend(analyticBackend_service: AnalyticsBackendService):
# # backendServiceObject = AnalyticsBackendService()
# # backendServiceObject.install_servicers()
# logger.info(" waiting for 2 minutes for the backend service before termination ... ")
# logger.info(" Initiating stop collector ... ")
# status = analyticBackend_service.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
# analyticBackend_service.close()
# assert isinstance(status, bool)
# assert status == True
# logger.info(" Backend service terminated successfully ... ")