diff --git a/scripts/run_tests_locally-device-morpheus.sh b/scripts/run_tests_locally-device-morpheus.sh new file mode 100755 index 0000000000000000000000000000000000000000..1af88264eb26a511646be46c98b91f517cbac5e1 --- /dev/null +++ b/scripts/run_tests_locally-device-morpheus.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc + +# Run unitary tests and analyze coverage of code at same time +coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \ + device/tests/test_unitary_morpheus.py diff --git a/src/device/tests/test_unitary_morpheus.py b/src/device/tests/test_unitary_morpheus.py new file mode 100644 index 0000000000000000000000000000000000000000..cf754e091698bbe600ed34752e58ab4314d41a75 --- /dev/null +++ b/src/device/tests/test_unitary_morpheus.py @@ -0,0 +1,91 @@ +import os +os.environ['DEVICE_EMULATED_ONLY'] = 'YES' + +# pylint: disable=wrong-import-position +import json +import logging, pytest, time +from typing import Dict, List +from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver + +logging.basicConfig(level=logging.DEBUG) +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +##### DRIVER FIXTURE ################################################################################################### + +DRIVER_SETTING_ADDRESS = '127.0.0.1' +DRIVER_SETTING_PORT = 8090 + +@pytest.fixture(scope='session') +def driver() -> MorpheusApiDriver: + _driver = MorpheusApiDriver( + DRIVER_SETTING_ADDRESS, DRIVER_SETTING_PORT, + ) + _driver.Connect() + yield _driver + time.sleep(1) + _driver.Disconnect() + + +##### TEST METHODS ##################################################################################################### + +def print_data(label, data): + print(f"{label}: {json.dumps(data, indent=2)}") + +def test_initial_config_retrieval(driver: MorpheusApiDriver): + config = driver.GetInitialConfig() + + assert isinstance(config, list), "Expected a list for initial config" + assert len(config) > 0, "Initial config should not be empty" + + print_data("Initial Config", config) + +def test_retrieve_config(driver: MorpheusApiDriver): + config = driver.GetConfig(None) + + assert isinstance(config, list), "Expected a list for config" + assert len(config) > 0, "Config should not be empty" + + print_data("Config", config) + +def test_set_config(driver: MorpheusApiDriver): + results = driver.SetConfig([('traffic_type', 'UDP')]) + + assert len(results) == 1, "Expected only one result" + assert results[0] is True, "Expected a succesfull result" + +def test_retrieve_state(driver: MorpheusApiDriver): + state = driver.GetState() + + assert isinstance(state, list), "Expected a a list for initial config" + assert len(state) > 0, " State should not be empty" + + print_data("State", state) + +def test_pipeline(driver: MorpheusApiDriver): + result = driver.StartPipeline() + + assert result is True + + result = driver.StopPipeline() + + assert result is True + +def test_subscription_detection(driver: MorpheusApiDriver): + result = driver.SubscribeDetectionEvent() + + assert result is True + + result = driver.UnsubscribeDetectionEvent() + + assert result is True + +def test_subscription_error(driver: MorpheusApiDriver): + result = driver.SubscribePipelineError() + + assert result is True + + result = driver.UnsubscribePipelineError() + + assert result is True