Skip to content
Snippets Groups Projects
test_backend.py 12.5 KiB
Newer Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import logging
import pandas as pd

from unittest.mock      import MagicMock, patch
from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
                               get_windows_size, get_batch_size, get_agg_df, get_duration

from common.tools.kafka.Variables                      import KafkaTopic
from analytics.backend.service.Streamer                import DaskStreamer
from analytics.backend.service.AnalyzerHandlers        import aggregation_handler, threshold_handler
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s -  %(levelname)s - %(message)s')
# ----
# Test fixtures and helper functions
# ----

@pytest.fixture(autouse=True)
def log_all_methods(request):
    '''
    This fixture logs messages before and after each test function runs, indicating the start and end of the test.
    The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
    '''
    logger.info(f" >>>>> Starting test: {request.node.name} ")
    logger.info(f" <<<<< Finished test: {request.node.name} ")
@pytest.fixture
def mock_kafka_producer():
    mock_producer         = MagicMock()
    mock_producer.produce = MagicMock()
    mock_producer.flush   = MagicMock()
    return mock_producer
@pytest.fixture
def mock_dask_cluster():
    mock_cluster       = MagicMock()
    mock_cluster.close = MagicMock()
    return mock_cluster

@pytest.fixture
def mock_dask_client():
    mock_client        = MagicMock()
    mock_client.status = 'running'
    mock_client.submit = MagicMock()
    return mock_client

@pytest.fixture()
def mock_kafka_consumer():
    mock_consumer           = MagicMock()
    mock_consumer.subscribe = MagicMock()
    return mock_consumer

@pytest.fixture()
def mock_streamer_start():
    mock_streamer = MagicMock()
    mock_streamer.start = MagicMock()
    return mock_streamer
# funtionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods
@pytest.fixture
def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start):
    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster',   return_value = mock_dask_cluster  ), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client',    return_value = mock_dask_client   ), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer), \
         patch('analytics.backend.service.Streamer.DaskStreamer.run',                               return_value = mock_streamer_start):
         
        service = AnalyticsBackendService()
        yield service
        service.close()

@pytest.fixture
def analyzer_data():
    return {
        'algo_name'  : 'test_algorithm',
        'oper_mode'  : 'test_mode',
        'input_kpis' : get_input_kpi_list(),
        'output_kpis': get_output_kpi_list(),
        'thresholds' : get_thresholds(),
        'batch_size' : get_batch_size(),
        'window_size': get_windows_size(),
        'duration'   : get_duration(),
    }

def test_start_streamer(analytics_service, analyzer_data):
    analyzer_uuid = "test-analyzer-uuid"
    # Start streamer
    result = analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
    assert result is True
    assert analyzer_uuid in analytics_service.active_streamers

def test_stop_streamer(analytics_service, analyzer_data):
    analyzer_uuid = "test-analyzer-uuid"

    # Start streamer for stopping it later
    analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
    assert analyzer_uuid in analytics_service.active_streamers
    # Stop streamer
    result = analytics_service.StopStreamer(analyzer_uuid)
    assert result is True
    assert analyzer_uuid not in analytics_service.active_streamers

    # Verify that the streamer was stopped
    assert analyzer_uuid not in analytics_service.active_streamers

def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster):
    analytics_service.close()

    mock_kafka_producer.flush.assert_called_once()
    mock_dask_cluster.close.assert_called_once()
# funtionality pytest with specific fixtures for streamer class sub methods
@pytest.fixture
def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer):
    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster',   return_value = mock_dask_cluster  ), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client',    return_value = mock_dask_client   ), \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer):
        
        return DaskStreamer(
            key               = "test_key",
            input_kpis        = get_input_kpi_list(),
            output_kpis       = get_output_kpi_list(),
            thresholds        = get_thresholds(),
            batch_size        = get_batch_size(),
            window_size       = get_windows_size(),
            cluster_instance  = mock_dask_cluster(),
            producer_instance = mock_kafka_producer(),
def test_dask_streamer_initialization(dask_streamer):
    """Test if the DaskStreamer initializes correctly."""
    assert dask_streamer.key         == "test_key"
    assert dask_streamer.batch_size  == get_batch_size()
    assert dask_streamer.window_size is None
    assert dask_streamer.consumer    is not None
    assert dask_streamer.producer    is not None
    assert dask_streamer.client      is not None

def test_run_stops_on_no_consumer(dask_streamer):
    """Test if the run method exits when the consumer is not initialized."""
    dask_streamer.consumer = None
    with patch('time.sleep', return_value=None):
        dask_streamer.run()
    assert not dask_streamer.running

def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client):
    """Test task handler selection with a valid handler."""
    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client):

        dask_streamer.task_handler_selector()
        assert dask_streamer.client.status == 'running'

def test_task_handler_selector_invalid_handler(dask_streamer):
    """Test task handler selection with an invalid handler."""
    with patch('analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler', return_value=False):
        dask_streamer.task_handler_selector()
        assert dask_streamer.batch == []

def test_produce_result(dask_streamer):
    """Test if produce_result sends records to Kafka."""
    result = [{"kpi_id": "kpi1", "value": 100}]
    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \
         patch.object(dask_streamer.producer, 'produce') as mock_produce:
        dask_streamer.produce_result(result, "test_topic")
        mock_produce.assert_called_once_with(
            "test_topic",
            key="kpi1",
            value=json.dumps({"kpi_id": "kpi1", "value": 100}),
            callback=mock_delivery_report
        )

def test_stop(dask_streamer):
    """Test the cleanup method."""
    with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \
         patch.object(dask_streamer.client,   'close') as mock_client_close, \
         patch('time.sleep', return_value=0):
        
        # Mock the conditions required for the close calls
        dask_streamer.client.status = 'running'
        

        mock_consumer_close.assert_called_once()
        mock_client_close.assert_called_once()

def test_run_with_valid_consumer(dask_streamer):
    """Test the run method with a valid Kafka consumer."""
    with patch.object(dask_streamer.consumer, 'poll')         as mock_poll, \
         patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
        
        # Simulate valid messages without errors
        mock_message_1 = MagicMock()
        mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}'
        mock_message_1.error.return_value = None  # No error

        mock_message_2 = MagicMock()
        mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}'
        mock_message_2.error.return_value = None  # No error

        # Mock `poll` to return valid messages
        mock_poll.side_effect = [mock_message_1, mock_message_2]
        
        # Run the `run` method in a limited loop
        with patch('time.sleep', return_value=None):  # Mock `sleep` to avoid delays
            dask_streamer.running = True            
            dask_streamer.batch_size = 2            
            
            # Limit the loop by breaking it after one full processing cycle
            def stop_running_after_task_handler():
                logger.info("Stopping the streamer after processing the first batch.")
                dask_streamer.running = False
            
            mock_task_handler_selector.side_effect = stop_running_after_task_handler
            dask_streamer.run()
        
        assert len(dask_streamer.batch) == 0  # Batch should be cleared after processing
        mock_task_handler_selector.assert_called_once()  # Task handler should be called once
        mock_poll.assert_any_call(timeout=2.0)  # Poll should have been called at least once
# # add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
def test_aggregation_handler():

    # Create a sample batch
    batch           = get_batch()
    input_kpi_list  = get_input_kpi_list()
    output_kpi_list = get_output_kpi_list()
    thresholds      = get_thresholds() 

    # Test aggregation_handler
    aggregated_df = aggregation_handler(
        "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
    )
    assert isinstance(aggregated_df, list)
    assert all(isinstance(item, dict) for item in aggregated_df)
# # Test threshold_handler
def test_threshold_handler():
    # Create a sample aggregated DataFrame
    agg_df     = get_agg_df()
    thresholds = get_thresholds()
    # Test threshold_handler
    result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
    assert isinstance(result, pd.DataFrame)
    assert result.shape == (1, 7)


###########################
# integration test of Streamer with backend service (Shouldn't be run in the CI/CD pipeline)
###########################
# This is a local machine test to check the integration of the backend service with the Streamer

# --- "test_validate_kafka_topics" should be run before the functionality tests ---
# def test_validate_kafka_topics():
#     logger.debug(" >>> test_validate_kafka_topics: START <<< ")
#     response = KafkaTopic.create_all_topics()
#     assert isinstance(response, bool)

# def test_backend_integration_with_analyzer():
#     backendServiceObject = AnalyticsBackendService()
#     backendServiceObject.install_servicers()
#     logger.info(" waiting for 2 minutes for the backend service before termination  ... ")
#     time.sleep(150)
#     logger.info(" Initiating stop collector ... ")
#     status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
#     backendServiceObject.close()
#     assert isinstance(status, bool)
#     assert status == True
#     logger.info(" Backend service terminated successfully ... ")