Skip to content
Snippets Groups Projects
test_backend.py 9.22 KiB
Newer Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import logging
import pandas as pd
from unittest.mock import MagicMock, patch
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.Streamer import DaskStreamer
from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
                                    get_windows_size, get_batch_size, get_agg_df
from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s -  %(levelname)s - %(message)s')
@pytest.fixture(autouse=True)
def log_all_methods(request):
    '''
    This fixture logs messages before and after each test function runs, indicating the start and end of the test.
    The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
    '''
    logger.info(f" >>> Starting test: {request.node.name} >>> ")
    yield
    logger.info(f" <<< Finished test: {request.node.name} <<< ")


# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
    logger.debug(" >>> test_validate_kafka_topics: START <<< ")
    response = KafkaTopic.create_all_topics()
    assert isinstance(response, bool)

###########################
# integration test of Streamer with backend service
###########################

def test_backend_integration_with_analyzer():
    backendServiceObject = AnalyticsBackendService()
    backendServiceObject.install_servicers()
    logger.info(" waiting for 2 minutes for the backend service before termination  ... ")
    time.sleep(120)
    backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
    logger.info(" Backend service terminated successfully ... ")


###########################
# funtionality pytest for analyzer sub methods
###########################

@pytest.fixture
def dask_streamer():
    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client')    as mock_dask_client, \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer') as mock_kafka_consumer, \
         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer') as mock_kafka_producer:
        
        mock_dask_client.return_value = (MagicMock(), MagicMock())
        mock_kafka_consumer.return_value = MagicMock()
        mock_kafka_producer.return_value = MagicMock()
        
        return DaskStreamer(
            key="test_key",
            input_kpis=get_input_kpi_list(),
            output_kpis=get_output_kpi_list(),
            thresholds=get_thresholds(),
            batch_size=get_batch_size(),
            window_size=get_windows_size(),
            n_workers=3,
            threads_per_worker=1
        )

def test_initialization(dask_streamer):
    """Test if the DaskStreamer initializes correctly."""
    assert dask_streamer.key == "test_key"
    assert dask_streamer.batch_size == get_batch_size()
    assert dask_streamer.window_size is None
    assert dask_streamer.n_workers == 3
    assert dask_streamer.consumer is not None
    assert dask_streamer.producer is not None
    assert dask_streamer.client is not None
    assert dask_streamer.cluster is not None


def test_run_stops_on_no_consumer(dask_streamer):
    """Test if the run method exits when the consumer is not initialized."""
    dask_streamer.consumer = None
    with patch('time.sleep', return_value=None):
        dask_streamer.run()
    assert not dask_streamer.running

def test_task_handler_selector_valid_handler(dask_streamer):
    """Test task handler selection with a valid handler."""
    with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=True), \
         patch.object(dask_streamer.client, 'submit', return_value=MagicMock()) as mock_submit, \
         patch.object(dask_streamer.client, 'status', 'running'):

        dask_streamer.task_handler_selector()
        mock_submit.assert_called_once()

def test_task_handler_selector_invalid_handler(dask_streamer):
    """Test task handler selection with an invalid handler."""
    with patch('analytics.backend.service.AnalyzerHandlers.AnalyzerHandlers.is_valid_handler', return_value=False):
        dask_streamer.task_handler_selector()
        assert dask_streamer.batch == []

def test_produce_result(dask_streamer):
    """Test if produce_result sends records to Kafka."""
    result = [{"kpi_id": "kpi1", "value": 100}]
    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \
         patch.object(dask_streamer.producer, 'produce') as mock_produce:
        dask_streamer.produce_result(result, "test_topic")
        mock_produce.assert_called_once_with(
            "test_topic",
            key="kpi1",
            value='{"kpi_id": "kpi1", "value": 100}',
            callback=mock_delivery_report
        )

def test_cleanup(dask_streamer):
    """Test the cleanup method."""
    with patch.object(dask_streamer.consumer, 'close')              as mock_consumer_close, \
         patch.object(dask_streamer.producer, 'flush')              as mock_producer_flush, \
         patch.object(dask_streamer.client,   'close')              as mock_client_close, \
         patch.object(dask_streamer.cluster,  'close', MagicMock()) as mock_cluster_close:
        
        # Mock the conditions required for the close calls
        dask_streamer.client.status = 'running'
        dask_streamer.cluster.close = MagicMock()
        
        dask_streamer.cleanup()

        mock_consumer_close.assert_called_once()
        mock_producer_flush.assert_called_once()
        mock_client_close.assert_called_once()
        dask_streamer.cluster.close.assert_called_once()

def test_run_with_valid_consumer(dask_streamer):
    """Test the run method with a valid Kafka consumer."""
    with patch.object(dask_streamer.consumer, 'poll') as mock_poll, \
         patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
        
        # Simulate valid messages without errors
        mock_message_1 = MagicMock()
        mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}'
        mock_message_1.error.return_value = None  # No error

        mock_message_2 = MagicMock()
        mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}'
        mock_message_2.error.return_value = None  # No error

        # Mock `poll` to return valid messages
        mock_poll.side_effect = [mock_message_1, mock_message_2]
        
        # Run the `run` method in a limited loop
        with patch('time.sleep', return_value=None):  # Mock `sleep` to avoid delays
            dask_streamer.running = True  # Ensure the streamer runs
            dask_streamer.batch_size = 2  # Set a small batch size for the test
            
            # Limit the loop by breaking it after one full processing cycle
            def stop_running_after_task_handler(*args, **kwargs):
                logger.info("Stopping the streamer after processing the first batch.")
                dask_streamer.running = False
            
            mock_task_handler_selector.side_effect = stop_running_after_task_handler
            
            # Execute the method
            dask_streamer.run()
        
        # Assertions
        assert len(dask_streamer.batch) == 0  # Batch should be cleared after processing
        mock_task_handler_selector.assert_called_once()  # Task handler should be called once
        mock_poll.assert_any_call(timeout=2.0)  # Poll should have been called


# add a test to check the working of aggregation_handler function and threshold_handler from AnalyzerHandlers.py
def test_aggregation_handler():

    # Create a sample batch
    batch = get_batch()
    input_kpi_list = get_input_kpi_list()
    output_kpi_list = get_output_kpi_list()
    thresholds = get_thresholds() 

    # Test aggregation_handler
    aggregated_df = aggregation_handler(
        "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
    )
    assert isinstance(aggregated_df, list)
    assert all(isinstance(item, dict) for item in aggregated_df)
# Test threshold_handler
def test_threshold_handler():
    # Create a sample aggregated DataFrame
    agg_df = get_agg_df()
    thresholds = get_thresholds()
    # Test threshold_handler
    result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
    assert isinstance(result, pd.DataFrame)
    assert result.shape == (1, 7)