"""Unit tests for the Device service.

Session-scoped fixtures start a Context service and a Device service (each
with its client) on local, non-privileged ports; `test_device_add` then drives
the add / configure / delete life cycle of a single device through the gRPC
clients and checks the driver's resulting configuration.
"""

import copy, grpc, logging, operator, os, pytest
from typing import Any, Dict, List, Tuple
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from context.Config import (
    GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD)
from context.client.ContextClient import ContextClient
from context.proto.context_pb2 import DeviceId
from context.service.grpc_server.ContextService import ContextService
from device.Config import (
    GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD)
from device.client.DeviceClient import DeviceClient
from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology
from device.service.DeviceService import DeviceService
from device.service.driver_api._Driver import _Driver
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from .example_objects import CONTEXT, DEVICE1, DEVICE1_ID, DEVICE1_UUID, TOPOLOGY, config_rule

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# The configured ports are shifted by 10000 for the tests.
CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports
DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports

DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID = 0

# Redis settings; each value can be overridden through the environment variable of the same name.
# NOTE(review): values read from the environment are strings while the defaults are ints --
# confirm the Redis backends accept both before re-enabling the 'all_redis' scenario.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
}

# Each scenario is: (name, database backend, database settings, message broker backend, message broker settings)
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
    #('all_redis',    DatabaseBackendEnum.REDIS,    REDIS_CONFIG, MessageBrokerBackendEnum.REDIS,    REDIS_CONFIG),
]


@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Yield the (Database, MessageBroker) pair for the scenario being run.

    The fixture is parametrized with SCENARIOS; the message broker is
    terminated on teardown.
    """
    name,db_backend,db_settings,mb_backend,mb_settings = request.param
    msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'
    LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings)))
    _database = Database(get_database_backend(backend=db_backend, **db_settings))
    _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield _database, _message_broker
    _message_broker.terminate()


@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Start a ContextService on the test port, yield it, and stop it on teardown."""
    _service = ContextService(
        context_db_mb[0], context_db_mb[1], port=CONTEXT_GRPC_SERVICE_PORT, max_workers=CONTEXT_GRPC_MAX_WORKERS,
        grace_period=CONTEXT_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Yield a ContextClient connected to the local ContextService; closed on teardown.

    `context_service` is requested only so that the service is started first.
    """
    _client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT)
    yield _client
    _client.close()


@pytest.fixture(scope='session')
def device_service(context_client : ContextClient): # pylint: disable=redefined-outer-name
    """Start a DeviceService backed by a driver cache built from DRIVERS; stopped on teardown."""
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    _service = DeviceService(
        context_client, driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, max_workers=DEVICE_GRPC_MAX_WORKERS,
        grace_period=DEVICE_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    """Yield a DeviceClient connected to the local DeviceService; closed on teardown.

    `device_service` is requested only so that the service is started first.
    """
    _client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT)
    yield _client
    _client.close()


def grpc_message_to_json_string(message) -> str:
    """Return the string form of the dict produced by MessageToDict for `message`.

    Default-valued fields are included, proto field names are preserved and
    enums are rendered by name.
    """
    return str(MessageToDict(
        message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False))


def test_device_add(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name
    """Exercise the add / configure / delete life cycle of DEVICE1.

    Steps:
      1. AddDevice with config rules must be rejected with INVALID_ARGUMENT.
      2. AddDevice without config rules succeeds and a driver is cached.
      3. ConfigureDevice with DEVICE1's rules sets three resources.
      4. ConfigureDevice with one DELETE and three SET rules leaves five resources.
      5. DeleteDevice leaves no driver in the cache for the device.
    """
    context_client.SetContext(Context(**CONTEXT))
    context_client.SetTopology(Topology(**TOPOLOGY))

    # 1. Adding a device that already carries config rules is not allowed.
    with pytest.raises(grpc.RpcError) as e:
        device_client.AddDevice(Device(**DEVICE1))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'device.device_config.config_rules(['\
          'action: CONFIGACTION_SET\nresource_key: "dev/rsrc1/value"\nresource_value: "value1"\n, '\
          'action: CONFIGACTION_SET\nresource_key: "dev/rsrc2/value"\nresource_value: "value2"\n, '\
          'action: CONFIGACTION_SET\nresource_key: "dev/rsrc3/value"\nresource_value: "value3"\n]) is invalid; '\
          'RPC method AddDevice does not allow definition of Config Rules. Add the Device first, and then configure it.'
    assert e.value.details() == msg

    # 2. Add the same device with its config rules stripped (on a copy, so DEVICE1 stays intact).
    device1_without_rules = copy.deepcopy(DEVICE1)
    device1_without_rules['device_config']['config_rules'].clear()
    device_client.AddDevice(Device(**device1_without_rules))
    driver : _Driver = device_service.driver_instance_cache.get(DEVICE1_UUID) # we know the driver exists now
    assert driver is not None

    initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE1_ID))
    LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config)))

    device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID))
    LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))

    driver_config = driver.GetConfig()
    LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == 0

    # 3. Apply the three SET rules defined in DEVICE1.
    device_client.ConfigureDevice(Device(**DEVICE1))

    # Sort by resource key so the assertions do not depend on the order returned by the driver.
    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == 3
    assert driver_config[0] == ('/dev/rsrc1/value', 'value1')
    assert driver_config[1] == ('/dev/rsrc2/value', 'value2')
    assert driver_config[2] == ('/dev/rsrc3/value', 'value3')

    # 4. Delete one resource and set three new ones: 3 - 1 + 3 = 5 resources expected.
    device1_with_rules = copy.deepcopy(DEVICE1)
    config_rules : List[Dict[str, Any]] = device1_with_rules['device_config']['config_rules']
    config_rules.clear()
    config_rules.append(config_rule(ConfigActionEnum.CONFIGACTION_DELETE, 'dev/rsrc1/value', ''))
    config_rules.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc10/value', 'value10'))
    config_rules.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc11/value', 'value11'))
    config_rules.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc12/value', 'value12'))
    device_client.ConfigureDevice(Device(**device1_with_rules))

    # Keys are compared as strings, hence 'rsrc10' < 'rsrc11' < 'rsrc12' < 'rsrc2' < 'rsrc3'.
    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == 5
    assert driver_config[0] == ('/dev/rsrc10/value', 'value10')
    assert driver_config[1] == ('/dev/rsrc11/value', 'value11')
    assert driver_config[2] == ('/dev/rsrc12/value', 'value12')
    assert driver_config[3] == ('/dev/rsrc2/value', 'value2')
    assert driver_config[4] == ('/dev/rsrc3/value', 'value3')

    device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID))
    #LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))
    LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
        '\n'.join([
            '{:s} {:s} = {:s}'.format(
                ConfigActionEnum.Name(rule.action), rule.resource_key, rule.resource_value)
            for rule in device_data.device_config.config_rules
        ])))

    # 5. After deleting the device, the cache is expected to hold no driver for it.
    device_client.DeleteDevice(DeviceId(**DEVICE1_ID))
    driver = device_service.driver_instance_cache.get(DEVICE1_UUID, {})
    assert driver is None


# NOTE(review): the tests below are disabled. They use names that this module does not import
# (DEVICE, DEVICE_ID, OperationalStatus, validate_device_id, validate_empty) and would need to be
# adapted before being re-enabled.

#def test_add_device_wrong_attributes(device_client : DeviceClient):
#    # should fail with device uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_id']['device_id']['uuid'] = ''
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'device.device_id.device_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with device type is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_type'] = ''
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'device.device_type() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with wrong device operational status
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Method(AddDevice) does not accept OperationalStatus(KEEP_STATE). '\
#          'Permitted values for Method(AddDevice) are OperationalStatus([\'DISABLED\', \'ENABLED\']).'
#    assert e.value.details() == msg
#
#def test_add_device_wrong_endpoint(device_client : DeviceClient):
#    # should fail with unsupported endpoint context
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context'
#        request = Device(**copy_device)
#        device_client.AddDevice(request)
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Context(wrong-context) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Contexts({\'admin\'}). '\
#          'Optionally, leave field empty to use predefined Context(admin).'
#    assert e.value.details() == msg
#
#    # should fail with unsupported endpoint topology
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = 'wrong-topo'
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Context(admin)/Topology(wrong-topo) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Topologies({\'admin\'}). '\
#          'Optionally, leave field empty to use predefined Topology(admin).'
#    assert e.value.details() == msg
#
#    # should fail with wrong endpoint device
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = 'wrong-device'
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Context(admin)/Topology(admin)/Device(wrong-device) in Endpoint(#0) of '\
#          'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Devices({\'DEV1\'}). '\
#          'Optionally, leave field empty to use predefined Device(DEV1).'
#    assert e.value.details() == msg
#
#    # should fail with endpoint port uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_id']['port_id']['uuid'] = ''
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'endpoint_id[#0].port_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with endpoint port type is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][0]['port_type'] = ''
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'endpoint[#0].port_type() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#    # should fail with duplicate port in device
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['endpointList'][1]['port_id']['port_id']['uuid'] = 'EP2'
#        device_client.AddDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Duplicated Context(admin)/Topology(admin)/Device(DEV1)/Port(EP2) in Endpoint(#1) of '\
#          'Context(admin)/Topology(admin)/Device(DEV1).'
#    assert e.value.details() == msg
#
#def test_add_device(device_client : DeviceClient):
#    # should work
#    validate_device_id(MessageToDict(
#            device_client.AddDevice(Device(**DEVICE)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))
#
#def test_add_device_duplicate(device_client : DeviceClient):
#    # should fail with device already exists
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        device_client.AddDevice(Device(**DEVICE))
#    assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS
#    msg = 'Context(admin)/Topology(admin)/Device(DEV1) already exists in the database.'
#    assert e.value.details() == msg
#
#def test_delete_device_empty_uuid(device_client : DeviceClient):
#    # should fail with device uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device_id = copy.deepcopy(DEVICE_ID)
#        copy_device_id['device_id']['uuid'] = ''
#        device_client.DeleteDevice(DeviceId(**copy_device_id))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'device_id.device_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#def test_delete_device_not_found(device_client : DeviceClient):
#    # should fail with device not found
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device_id = copy.deepcopy(DEVICE_ID)
#        copy_device_id['device_id']['uuid'] = 'wrong-device-id'
#        device_client.DeleteDevice(DeviceId(**copy_device_id))
#    assert e.value.code() == grpc.StatusCode.NOT_FOUND
#    msg = 'Context(admin)/Topology(admin)/Device(wrong-device-id) does not exist in the database.'
#    assert e.value.details() == msg
#
#def test_delete_device(device_client : DeviceClient):
#    # should work
#    validate_empty(MessageToDict(
#            device_client.DeleteDevice(DeviceId(**DEVICE_ID)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))
#
#def test_configure_device_empty_device_uuid(device_client : DeviceClient):
#    # should fail with device uuid is empty
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_id']['device_id']['uuid'] = ''
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'device.device_id.device_id.uuid() is out of range: '\
#          'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
#    assert e.value.details() == msg
#
#def test_configure_device_not_found(device_client : DeviceClient):
#    # should fail with device not found
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_id']['device_id']['uuid'] = 'wrong-device-id'
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.NOT_FOUND
#    msg = 'Context(admin)/Topology(admin)/Device(wrong-device-id) does not exist in the database.'
#    assert e.value.details() == msg
#
#def test_add_device_default_endpoint_context_topology_device(device_client : DeviceClient):
#    # should work
#    copy_device = copy.deepcopy(DEVICE)
#    copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = ''
#    copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = ''
#    copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = ''
#    validate_device_id(MessageToDict(
#            device_client.AddDevice(Device(**copy_device)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))
#
#def test_configure_device_wrong_attributes(device_client : DeviceClient):
#    # should fail with device type is wrong
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_type'] = 'wrong-type'
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    msg = 'Device(DEV1) has Type(ROADM) in the database. Cannot be changed to Type(wrong-type).'
#    assert e.value.details() == msg
#
#    # should fail with endpoints cannot be modified
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
#    assert e.value.details() == 'Endpoints belonging to Device(DEV1) cannot be modified.'
#
#    # should fail with any change detected
#    with pytest.raises(grpc._channel._InactiveRpcError) as e:
#        copy_device = copy.deepcopy(DEVICE)
#        copy_device['device_config']['device_config'] = ''
#        copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value
#        copy_device['endpointList'].clear()
#        device_client.ConfigureDevice(Device(**copy_device))
#    assert e.value.code() == grpc.StatusCode.ABORTED
#    msg = 'Any change has been requested for Device(DEV1). '\
#          'Either specify a new configuration or a new device operational status.'
#    assert e.value.details() == msg
#
#def test_configure_device(device_client : DeviceClient):
#    # should work
#    copy_device = copy.deepcopy(DEVICE)
#    copy_device['device_config']['device_config'] = ''
#    copy_device['devOperationalStatus'] = OperationalStatus.DISABLED.value
#    copy_device['endpointList'].clear()
#    validate_device_id(MessageToDict(
#            device_client.ConfigureDevice(Device(**copy_device)),
#            including_default_value_fields=True, preserving_proto_field_name=True,
#            use_integers_for_enums=False))