import copy, grpc, logging, operator, os, pytest from typing import Any, Dict, List, Tuple from google.protobuf.json_format import MessageToDict from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker from context.Config import ( GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD) from context.client.ContextClient import ContextClient from context.proto.context_pb2 import DeviceId from context.service.grpc_server.ContextService import ContextService from device.Config import ( GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD) from device.client.DeviceClient import DeviceClient from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology from device.service.DeviceService import DeviceService from device.service.MonitoringLoops import MonitoringLoops from device.service.driver_api._Driver import _Driver from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache from device.service.drivers import DRIVERS from monitoring.client.monitoring_client import MonitoringClient from .example_objects import ( CONTEXT, TOPOLOGY, config_rule, DEVICE_EMU, DEVICE_EMU_CONFIG_RULES, DEVICE_EMU_CONNECT_RULES, DEVICE_EMU_ID, DEVICE_EMU_UUID, DEVICE_INF, DEVICE_INF_CONFIG_RULES, DEVICE_INF_CONNECT_RULES, DEVICE_INF_ID, DEVICE_INF_UUID) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1' DEFAULT_REDIS_SERVICE_PORT = 6379 DEFAULT_REDIS_DATABASE_ID = 0 REDIS_CONFIG = { 'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST), 'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT), 'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ), } SCENARIOS = [ ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ), #('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG), ] @pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) def context_db_mb(request) -> Tuple[Database, MessageBroker]: name,db_backend,db_settings,mb_backend,mb_settings = request.param msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...' LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings))) _database = Database(get_database_backend(backend=db_backend, **db_settings)) _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) yield _database, _message_broker _message_broker.terminate() @pytest.fixture(scope='session') def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name _service = ContextService( context_db_mb[0], context_db_mb[1], port=CONTEXT_GRPC_SERVICE_PORT, max_workers=CONTEXT_GRPC_MAX_WORKERS, grace_period=CONTEXT_GRPC_GRACE_PERIOD) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name _client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT) yield _client _client.close() @pytest.fixture(scope='session') def device_service(context_client : ContextClient): # pylint: disable=redefined-outer-name _driver_factory = DriverFactory(DRIVERS) _driver_instance_cache = DriverInstanceCache(_driver_factory) _monitoring_loops = MonitoringLoops(None) # TODO: replace by monitoring client _monitoring_loops.start() _service = DeviceService( context_client, _driver_instance_cache, _monitoring_loops, port=DEVICE_GRPC_SERVICE_PORT, max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD) _service.start() yield _service _monitoring_loops.stop() _service.stop() @pytest.fixture(scope='session') def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name _client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT) yield _client _client.close() def grpc_message_to_json_string(message): return str(MessageToDict( message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) def test_prepare_environment( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name context_client.SetContext(Context(**CONTEXT)) context_client.SetTopology(Topology(**TOPOLOGY)) def test_device_add_emulated_error_cases( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name with pytest.raises(grpc.RpcError) as e: DEVICE_EMU_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_EMU) DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES) DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_RULES) device_client.AddDevice(Device(**DEVICE_EMU_WITH_EXTRA_RULES)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg_head = 'device.device_config.config_rules([' msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ 'with "_connect/" tag. Others should be configured after adding the device.' except_msg = str(e.value.details()) assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) def test_device_add_emulated_correct( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name DEVICE_EMU_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_EMU) DEVICE_EMU_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES) device_client.AddDevice(Device(**DEVICE_EMU_WITH_CONNECT_RULES)) driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now assert driver is not None def test_device_get_emulated( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_EMU_ID)) LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) def test_device_configure_emulated( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now assert driver is not None driver_config = driver.GetConfig() LOGGER.info('driver_config = {:s}'.format(str(driver_config))) assert len(driver_config) == 0 DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU) DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_RULES) device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES)) driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) LOGGER.info('driver_config = {:s}'.format(str(driver_config))) assert len(driver_config) == 3 assert driver_config[0] == ('/dev/rsrc1/value', 'value1') assert driver_config[1] == ('/dev/rsrc2/value', 'value2') assert driver_config[2] == ('/dev/rsrc3/value', 'value3') DEVICE_EMU_WITH_CONFIG_RULES_2 = copy.deepcopy(DEVICE_EMU) DEVICE_EMU_WITH_CONFIG_RULES_2['device_config']['config_rules'].extend([ config_rule(ConfigActionEnum.CONFIGACTION_DELETE, 'dev/rsrc1/value', ''), config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc10/value', 'value10'), config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc11/value', 'value11'), config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc12/value', 'value12'), ]) device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES_2)) driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) LOGGER.info('driver_config = {:s}'.format(str(driver_config))) assert len(driver_config) == 5 assert driver_config[0] == ('/dev/rsrc10/value', 'value10') assert driver_config[1] == ('/dev/rsrc11/value', 'value11') assert driver_config[2] == ('/dev/rsrc12/value', 'value12') assert driver_config[3] == ('/dev/rsrc2/value', 'value2') assert driver_config[4] == ('/dev/rsrc3/value', 'value3') device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( '\n'.join([ '{:s} {:s} = {:s}'.format( ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) for config_rule in device_data.device_config.config_rules ]))) def test_device_delete_emulated( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name device_client.DeleteDevice(DeviceId(**DEVICE_EMU_ID)) driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID, {}) assert driver is None def test_device_add_openconfig_error_cases( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name with pytest.raises(grpc.RpcError) as e: DEVICE_INF_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_INF) DEVICE_INF_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_INF_CONNECT_RULES) DEVICE_INF_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_INF_CONFIG_RULES) device_client.AddDevice(Device(**DEVICE_INF_WITH_EXTRA_RULES)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg_head = 'device.device_config.config_rules([' msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ 'with "_connect/" tag. Others should be configured after adding the device.' except_msg = str(e.value.details()) assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) def test_device_add_openconfig_correct( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name DEVICE_INF_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_INF) DEVICE_INF_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_INF_CONNECT_RULES) device_client.AddDevice(Device(**DEVICE_INF_WITH_CONNECT_RULES)) driver : _Driver = device_service.driver_instance_cache.get(DEVICE_INF_UUID) # we know the driver exists now assert driver is not None def test_device_get_openconfig( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_INF_ID)) LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) device_data = context_client.GetDevice(DeviceId(**DEVICE_INF_ID)) LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) def test_device_configure_openconfig( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name driver : _Driver = device_service.driver_instance_cache.get(DEVICE_INF_UUID) # we know the driver exists now assert driver is not None raise NotImplementedError() #driver_config = driver.GetConfig() #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) #assert len(driver_config) == 0 #DEVICE_INF_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_INF) #DEVICE_INF_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_INF_CONFIG_RULES) #device_client.ConfigureDevice(Device(**DEVICE_INF_WITH_CONFIG_RULES)) #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) #assert len(driver_config) == 5 #assert driver_config[0] == ('/dev/rsrc10/value', 'value10') #assert driver_config[1] == ('/dev/rsrc11/value', 'value11') #assert driver_config[2] == ('/dev/rsrc12/value', 'value12') #assert driver_config[3] == ('/dev/rsrc2/value', 'value2') #assert driver_config[4] == ('/dev/rsrc3/value', 'value3') #device_data = context_client.GetDevice(DeviceId(**DEVICE_INF_ID)) #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( # '\n'.join([ # '{:s} {:s} = {:s}'.format( # ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) # for config_rule in device_data.device_config.config_rules # ]))) def test_device_delete_openconfig( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name device_client.DeleteDevice(DeviceId(**DEVICE_INF_ID)) driver : _Driver = device_service.driver_instance_cache.get(DEVICE_INF_UUID, {}) assert driver is None