"""Integration-style tests exercising the public methods of MorpheusApiDriver."""

import os

# Must be assigned before the driver import further down (hence the pylint
# suppression). NOTE(review): presumably the device package reads this flag
# at import time -- confirm.
os.environ['DEVICE_EMULATED_ONLY'] = 'YES'

# pylint: disable=wrong-import-position
import json
import logging
import time
from typing import Dict, Iterator, List

import pytest

from device.service.drivers.morpheus.MorpheusApiDriver import MorpheusApiDriver

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


##### DRIVER FIXTURE ###################################################################################################

DRIVER_SETTING_ADDRESS = '127.0.0.1'
DRIVER_SETTING_PORT = 8090


@pytest.fixture(scope='session')
def driver() -> Iterator[MorpheusApiDriver]:
    """Yield a connected MorpheusApiDriver shared by every test in the session.

    The driver is built from DRIVER_SETTING_ADDRESS / DRIVER_SETTING_PORT and
    connected once. At session teardown the fixture waits one second and then
    disconnects the driver.
    """
    _driver = MorpheusApiDriver(
        DRIVER_SETTING_ADDRESS,
        DRIVER_SETTING_PORT,
    )
    _driver.Connect()
    yield _driver
    # Teardown: short pause before disconnecting.
    # NOTE(review): presumably lets in-flight driver activity settle -- confirm.
    time.sleep(1)
    _driver.Disconnect()


##### TEST METHODS #####################################################################################################

def print_data(label: str, data: object) -> None:
    """Print *data* as indented JSON, prefixed with *label*."""
    print(f"{label}: {json.dumps(data, indent=2)}")


def test_initial_config_retrieval(driver: MorpheusApiDriver):
    """GetInitialConfig() must return a non-empty list."""
    config = driver.GetInitialConfig()

    assert isinstance(config, list), "Expected a list for initial config"
    assert len(config) > 0, "Initial config should not be empty"

    print_data("Initial Config", config)


def test_retrieve_config(driver: MorpheusApiDriver):
    """GetConfig(None) must return a non-empty list."""
    config = driver.GetConfig(None)

    assert isinstance(config, list), "Expected a list for config"
    assert len(config) > 0, "Config should not be empty"

    print_data("Config", config)


def test_set_config(driver: MorpheusApiDriver):
    """SetConfig() with a single entry must return exactly one True result."""
    results = driver.SetConfig([('traffic_type', 'UDP')])

    assert len(results) == 1, "Expected only one result"
    assert results[0] is True, "Expected a succesfull result"


def test_retrieve_state(driver: MorpheusApiDriver):
    """GetState() must return a non-empty list."""
    state = driver.GetState()

    # The original message referred to "initial config" (copy-paste slip);
    # this assertion is about the state payload.
    assert isinstance(state, list), "Expected a list for state"
    assert len(state) > 0, " State should not be empty"

    print_data("State", state)


def test_pipeline(driver: MorpheusApiDriver):
    """StartPipeline() and then StopPipeline() must each return True."""
    result = driver.StartPipeline()
    assert result is True

    result = driver.StopPipeline()
    assert result is True


def test_subscription_detection(driver: MorpheusApiDriver):
    """Subscribing to and unsubscribing from detection events must each return True."""
    result = driver.SubscribeDetectionEvent()
    assert result is True

    result = driver.UnsubscribeDetectionEvent()
    assert result is True


def test_subscription_error(driver: MorpheusApiDriver):
    """Subscribing to and unsubscribing from pipeline errors must each return True."""
    result = driver.SubscribePipelineError()
    assert result is True

    result = driver.UnsubscribePipelineError()
    assert result is True