Skip to content
test_unitary_morpheus.py 2.53 KiB
Newer Older
import os
os.environ['DEVICE_EMULATED_ONLY'] = 'YES'

# pylint: disable=wrong-import-position
import json
import logging, pytest, time
from typing import Dict, List
from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


##### DRIVER FIXTURE ###################################################################################################

DRIVER_SETTING_ADDRESS  = '127.0.0.1'
DRIVER_SETTING_PORT     = 8090

@pytest.fixture(scope='session')
def driver() -> MorpheusApiDriver:
    _driver = MorpheusApiDriver(
        DRIVER_SETTING_ADDRESS, DRIVER_SETTING_PORT,
    )
    _driver.Connect()
    yield _driver
    time.sleep(1)
    _driver.Disconnect()


##### TEST METHODS #####################################################################################################

def print_data(label, data):
    print(f"{label}: {json.dumps(data, indent=2)}")

def test_initial_config_retrieval(driver: MorpheusApiDriver):
    config = driver.GetInitialConfig()

    assert isinstance(config, list), "Expected a list for initial config"
    assert len(config) > 0, "Initial config should not be empty"

    print_data("Initial Config", config)

def test_retrieve_config(driver: MorpheusApiDriver):
    config = driver.GetConfig(None)

    assert isinstance(config, list), "Expected a list for config"
    assert len(config) > 0, "Config should not be empty"

    print_data("Config", config)

def test_set_config(driver: MorpheusApiDriver):
    results = driver.SetConfig([('traffic_type', 'UDP')])

    assert len(results) == 1, "Expected only one result"
    assert results[0] is True, "Expected a succesfull result"

def test_retrieve_state(driver: MorpheusApiDriver):
    state = driver.GetState()

    assert isinstance(state, list), "Expected a a list for initial config"
    assert len(state) > 0, " State should not be empty"

    print_data("State", state)

def test_pipeline(driver: MorpheusApiDriver):
    result = driver.StartPipeline()

    assert result is True

    result = driver.StopPipeline()

    assert result is True

def test_subscription_detection(driver: MorpheusApiDriver):
    result = driver.SubscribeDetectionEvent()

    assert result is True

    result = driver.UnsubscribeDetectionEvent()

    assert result is True

def test_subscription_error(driver: MorpheusApiDriver):
    result = driver.SubscribePipelineError()

    assert result is True

    result = driver.UnsubscribePipelineError()

    assert result is True