Newer
Older
import copy, grpc, logging, pytest
from google.protobuf.json_format import MessageToDict
from common.database.Factory import get_database, DatabaseEngineEnum
from common.database.api.context.topology.device.OperationalStatus import OperationalStatus
from common.tests.Assertions import validate_device_id, validate_empty
from device.client.DeviceClient import DeviceClient
from device.proto.context_pb2 import Device, DeviceId
from device.service.DeviceService import DeviceService
from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD

Lluis Gifre Renom
committed
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
'device_config': {'device_config': '<config/>'},
'devOperationalStatus': OperationalStatus.ENABLED.value,
'endpointList' : [
{
'port_id': {
'topoId': {
'contextId': {'contextUuid': {'uuid': 'admin'}},
'topoId': {'uuid': 'admin'}
},
'dev_id': {'device_id': {'uuid': 'DEV1'}},
'port_id': {'uuid' : 'EP2'}
},
{
'port_id': {
'topoId': {
'contextId': {'contextUuid': {'uuid': 'admin'}},
'topoId': {'uuid': 'admin'}
},
'dev_id': {'device_id': {'uuid': 'DEV1'}},
'port_id': {'uuid' : 'EP3'}
'port_type': 'WDM'
},
{
'port_id': {
'topoId': {
'contextId': {'contextUuid': {'uuid': 'admin'}},
'topoId': {'uuid': 'admin'}
},
'dev_id': {'device_id': {'uuid': 'DEV1'}},
'port_id': {'uuid' : 'EP4'}

Lluis Gifre Renom
committed
@pytest.fixture(scope='session')
def device_database():
_database = get_database(engine=DatabaseEngineEnum.INMEMORY)
return _database
@pytest.fixture(scope='session')
def device_service(device_database):
_service = DeviceService(
device_database, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)

Lluis Gifre Renom
committed
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
_client = DeviceClient(address='127.0.0.1', port=GRPC_SERVICE_PORT)
def test_add_device_empty_device_uuid(device_client : DeviceClient):
# should fail with device uuid is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['device_id']['device_id']['uuid'] = ''
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
assert e.value.details() == 'device.device_id.device_id.uuid() string is empty.'
def test_add_device_empty_device_type(device_client : DeviceClient):
# should fail with device type is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['device_type'] = ''
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
assert e.value.details() == 'device.device_type() string is empty.'
def test_add_device_wrong_device_operational_status(device_client : DeviceClient):
# should fail with wrong device operational status
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = ' '.join([
'Device has to be created with either ENABLED/DISABLED Operational State.',
'Use KEEP_STATE only in configure Device methods.',
])
assert e.value.details() == msg
def test_add_device_endpoint_wrong_context(device_client : DeviceClient):
# should fail with unsupported context
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context'
request = Device(**copy_device)
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = ' '.join([
'Unsupported Context(wrong-context) in Endpoint(#0) of Device(DEV1).',
'Only default Context(admin) is currently supported.',
'Optionally, leave field empty to use default Context.',
def test_add_device_endpoint_wrong_topology(device_client : DeviceClient):
# should fail with unsupported topology
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = 'wrong-topo'
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = ' '.join([
'Unsupported Topology(wrong-topo) in Endpoint(#0) of Device(DEV1).',
'Only default Topology(admin) is currently supported.',
'Optionally, leave field empty to use default Topology.',
def test_add_device_endpoint_wrong_device(device_client : DeviceClient):
# should fail with wrong endpoint device
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = 'wrong-device'
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = ' '.join([
'Optionally, leave field empty to use parent Device.',
def test_add_device_empty_port_uuid(device_client : DeviceClient):
# should fail with port uuid is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['endpointList'][0]['port_id']['port_id']['uuid'] = ''
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
assert e.value.details() == 'endpoint[#0].port_id.port_id.uuid() string is empty.'
def test_add_device_empty_port_type(device_client : DeviceClient):
# should fail with port type is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['endpointList'][0]['port_type'] = ''
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
assert e.value.details() == 'endpoint[#0].port_type() string is empty.'
def test_add_device_duplicate_port(device_client : DeviceClient):
# should fail with uplicate port in device
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['endpointList'][1]['port_id']['port_id']['uuid'] = 'EP2'
assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS
assert e.value.details() == 'Duplicated Port(EP2) in Endpoint(#1) of Device(DEV1).'
# should work
validate_device_id(MessageToDict(
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_add_device_duplicate(device_client : DeviceClient):
# should fail with device already exists
with pytest.raises(grpc._channel._InactiveRpcError) as e:
assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS
assert e.value.details() == 'Device(DEV1) already exists in the database.'
def test_delete_device_empty_uuid(device_client : DeviceClient):
# should fail with device uuid is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device_id = copy.deepcopy(DEVICE_ID)
copy_device_id['device_id']['uuid'] = ''
device_client.DeleteDevice(DeviceId(**copy_device_id))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
assert e.value.details() == 'device_id.device_id.uuid() string is empty.'
def test_delete_device_not_found(device_client : DeviceClient):
# should fail with device not found
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device_id = copy.deepcopy(DEVICE_ID)
copy_device_id['device_id']['uuid'] = 'wrong-device-id'
device_client.DeleteDevice(DeviceId(**copy_device_id))
assert e.value.details() == 'Device(wrong-device-id) does not exist in the database.'
def test_delete_device(device_client : DeviceClient):
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_configure_device_empty_device_uuid(device_client : DeviceClient):
# should fail with device uuid is empty
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['device_id']['device_id']['uuid'] = ''
device_client.ConfigureDevice(Device(**copy_device))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
assert e.value.details() == 'device.device_id.device_id.uuid() string is empty.'
def test_configure_device_not_found(device_client : DeviceClient):
# should fail with device not found
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['device_id']['device_id']['uuid'] = 'wrong-device-id'
device_client.ConfigureDevice(Device(**copy_device))
assert e.value.details() == 'Device(wrong-device-id) does not exist in the database.'
def test_add_device_default_endpoint_context_topology_device(device_client : DeviceClient):
# should work
copy_device = copy.deepcopy(DEVICE)
copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = ''
copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = ''
copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = ''
validate_device_id(MessageToDict(
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))
def test_configure_device_wrong_device_type(device_client : DeviceClient):
# should fail with device type is wrong
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['device_type'] = 'wrong-type'
device_client.ConfigureDevice(Device(**copy_device))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
assert e.value.details() == 'Device(DEV1) has Type(ROADM) in the database. Cannot be changed to Type(wrong-type).'
def test_configure_device_with_endpoints(device_client : DeviceClient):
# should fail with endpoints cannot be modified
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
device_client.ConfigureDevice(Device(**copy_device))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
assert e.value.details() == 'Endpoints belonging to Device(DEV1) cannot be modified.'
def test_configure_device_no_change(device_client : DeviceClient):
# should fail with any change detected
with pytest.raises(grpc._channel._InactiveRpcError) as e:
copy_device = copy.deepcopy(DEVICE)
copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value
copy_device['endpointList'].clear()
device_client.ConfigureDevice(Device(**copy_device))
assert e.value.code() == grpc.StatusCode.ABORTED
msg = ' '.join([
'Any change has been requested for Device(DEV1).',
'Either specify a new configuration or a new device operational status.',

Lluis Gifre Renom
committed
def test_configure_device(device_client : DeviceClient):
# should work
copy_device = copy.deepcopy(DEVICE)
copy_device['device_config']['device_config'] = '<new_config/>'
copy_device['devOperationalStatus'] = OperationalStatus.DISABLED.value
copy_device['endpointList'].clear()
validate_device_id(MessageToDict(
device_client.ConfigureDevice(Device(**copy_device)),
including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=False))