Skip to content
Snippets Groups Projects
test_backend.py 8.62 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import pytest
    
    import logging
    
    import pandas as pd
    from unittest.mock import MagicMock, patch
    
    from common.tools.kafka.Variables import KafkaTopic
    
    from analytics.backend.service.Streamer import DaskStreamer
    from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
                                        get_windows_size, get_batch_size, get_agg_df
    from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
    
    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s -  %(levelname)s - %(message)s')
    
    # --- "test_validate_kafka_topics" should be run before the functionality tests ---
    def test_validate_kafka_topics():
        logger.debug(" >>> test_validate_kafka_topics: START <<< ")
        response = KafkaTopic.create_all_topics()
        assert isinstance(response, bool)
    
    
    ###########################
    # Tests Implementation of Telemetry Backend
    ###########################
    
    
    @pytest.fixture(autouse=True)
    def log_all_methods(request):
        '''
        This fixture logs messages before and after each test function runs, indicating the start and end of the test.
        The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
        '''
        logger.info(f" >>> Starting test: {request.node.name} >>> ")
        yield
        logger.info(f" <<< Finished test: {request.node.name} <<< ")
    
    @pytest.fixture
    def dask_streamer():
        with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client')    as mock_dask_client, \
             patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer') as mock_kafka_consumer, \
             patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer') as mock_kafka_producer:
            
            mock_dask_client.return_value = (MagicMock(), MagicMock())
            mock_kafka_consumer.return_value = MagicMock()
            mock_kafka_producer.return_value = MagicMock()
            
            return DaskStreamer(
                key="test_key",
                input_kpis=get_input_kpi_list(),
                output_kpis=get_output_kpi_list(),
                thresholds=get_thresholds(),
                batch_size=get_batch_size(),
                window_size=get_windows_size(),
                n_workers=3,
                threads_per_worker=1
            )
    
    def test_initialization(dask_streamer):
        """Test if the DaskStreamer initializes correctly."""
        assert dask_streamer.key == "test_key"
        assert dask_streamer.batch_size == get_batch_size()
        assert dask_streamer.window_size is None
        assert dask_streamer.n_workers == 3
        assert dask_streamer.consumer is not None
        assert dask_streamer.producer is not None
        assert dask_streamer.client is not None
        assert dask_streamer.cluster is not None
    
    
    def test_run_stops_on_no_consumer(dask_streamer):
        """Test if the run method exits when the consumer is not initialized."""
        dask_streamer.consumer = None
        with patch('time.sleep', return_value=None):
            dask_streamer.run()
        assert not dask_streamer.running
    
    def test_task_handler_selector_valid_handler(dask_streamer):
        """Test task handler selection with a valid handler."""
        with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=True), \
             patch.object(dask_streamer.client, 'submit', return_value=MagicMock()) as mock_submit, \
             patch.object(dask_streamer.client, 'status', 'running'):
    
            dask_streamer.task_handler_selector()
            mock_submit.assert_called_once()
    
    def test_task_handler_selector_invalid_handler(dask_streamer):
        """Test task handler selection with an invalid handler."""
        with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=False):
            dask_streamer.task_handler_selector()
            assert dask_streamer.batch == []
    
    def test_produce_result(dask_streamer):
        """Test if produce_result sends records to Kafka."""
        result = [{"kpi_id": "kpi1", "value": 100}]
        with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \
             patch.object(dask_streamer.producer, 'produce') as mock_produce:
            dask_streamer.produce_result(result, "test_topic")
            mock_produce.assert_called_once_with(
                "test_topic",
                key="kpi1",
                value='{"kpi_id": "kpi1", "value": 100}',
                callback=mock_delivery_report
            )
    
    def test_cleanup(dask_streamer):
        """Test the cleanup method."""
        with patch.object(dask_streamer.consumer, 'close')              as mock_consumer_close, \
             patch.object(dask_streamer.producer, 'flush')              as mock_producer_flush, \
             patch.object(dask_streamer.client,   'close')              as mock_client_close, \
             patch.object(dask_streamer.cluster,  'close', MagicMock()) as mock_cluster_close:
            
            # Mock the conditions required for the close calls
            dask_streamer.client.status = 'running'
            dask_streamer.cluster.close = MagicMock()
            
            dask_streamer.cleanup()
    
            mock_consumer_close.assert_called_once()
            mock_producer_flush.assert_called_once()
            mock_client_close.assert_called_once()
            dask_streamer.cluster.close.assert_called_once()
    
    def test_run_with_valid_consumer(dask_streamer):
        """Test the run method with a valid Kafka consumer."""
        with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \
             patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
            
            # Simulate valid messages without errors
            mock_message_1 = MagicMock()
            mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}'
            mock_message_1.error.return_value = None  # No error
    
            mock_message_2 = MagicMock()
            mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}'
            mock_message_2.error.return_value = None  # No error
    
            # Mock `poll` to return valid messages
            mock_poll.side_effect = [mock_message_1, mock_message_2]
            
            # Run the `run` method in a limited loop
            with patch('time.sleep', return_value=None):  # Mock `sleep` to avoid delays
                dask_streamer.running = True  # Ensure the streamer runs
                dask_streamer.batch_size = 2  # Set a small batch size for the test
                
                # Limit the loop by breaking it after one full processing cycle
                def stop_running_after_task_handler(*args, **kwargs):
                    logger.info("Stopping the streamer after processing the first batch.")
                    dask_streamer.running = False
                
                mock_task_handler_selector.side_effect = stop_running_after_task_handler
                
                # Execute the method
                dask_streamer.run()
            
            # Assertions
            assert len(dask_streamer.batch) == 0  # Batch should be cleared after processing
            mock_task_handler_selector.assert_called_once()  # Task handler should be called once
            mock_poll.assert_any_call(timeout=2.0)  # Poll should have been called
    
    
    # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
    def test_aggregation_handler():
    
        # Create a sample batch
        batch = get_batch()
        input_kpi_list = get_input_kpi_list()
        output_kpi_list = get_output_kpi_list()
        thresholds = get_thresholds() 
    
        # Test aggregation_handler
        aggregated_df = aggregation_handler(
            "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
        )
        assert isinstance(aggregated_df, list)
        assert all(isinstance(item, dict) for item in aggregated_df)
    
    # Test threshold_handler
    def test_threshold_handler():
        # Create a sample aggregated DataFrame
        agg_df = get_agg_df()
        thresholds = get_thresholds()
    
        # Test threshold_handler
        result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
        assert isinstance(result, pd.DataFrame)
        assert result.shape == (1, 7)