Skip to content
Snippets Groups Projects

Resolve "Fix NBI tests by adding link_type field"

Merged Lluis Gifre Renom requested to merge feat/217-fix-nbi-tests-by-adding-link_type-field into develop
1 file
+ 59
4
Compare changes
  • Side-by-side
  • Inline
@@ -48,6 +48,8 @@ def validate_device_driver_enum(message):
'DEVICEDRIVER_GNMI_OPENCONFIG',
'DEVICEDRIVER_OPTICAL_TFS',
'DEVICEDRIVER_IETF_ACTN',
'DEVICEDRIVER_OC',
'DEVICEDRIVER_QKD',
]
def validate_device_operational_status_enum(message):
@@ -58,6 +60,20 @@ def validate_device_operational_status_enum(message):
'DEVICEOPERATIONALSTATUS_ENABLED'
]
def validate_isolation_level_enum(message):
assert isinstance(message, str)
assert message in [
'NO_ISOLATION',
'PHYSICAL_ISOLATION',
'LOGICAL_ISOLATION',
'PROCESS_ISOLATION',
'PHYSICAL_MEMORY_ISOLATION',
'PHYSICAL_NETWORK_ISOLATION',
'VIRTUAL_RESOURCE_ISOLATION',
'NETWORK_FUNCTIONS_ISOLATION',
'SERVICE_ISOLATION',
]
def validate_kpi_sample_types_enum(message):
assert isinstance(message, str)
assert message in [
@@ -70,6 +86,16 @@ def validate_kpi_sample_types_enum(message):
'KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS',
]
def validate_link_type_enum(message):
assert isinstance(message, str)
assert message in [
'LINKTYPE_UNKNOWN',
'LINKTYPE_COPPER',
'LINKTYPE_VIRTUAL_COPPER',
'LINKTYPE_OPTICAL',
'LINKTYPE_VIRTUAL_OPTICAL',
]
def validate_service_type_enum(message):
assert isinstance(message, str)
assert message in [
@@ -79,6 +105,8 @@ def validate_service_type_enum(message):
'SERVICETYPE_TAPI_CONNECTIVITY_SERVICE',
'SERVICETYPE_TE',
'SERVICETYPE_E2E',
'SERVICETYPE_OPTICAL_CONNECTIVITY',
'SERVICETYPE_QKD',
]
def validate_service_state_enum(message):
@@ -148,6 +176,22 @@ def validate_constraint_custom(message):
assert 'constraint_value' in message
assert isinstance(message['constraint_value'], str)
def validate_constraint_schedule(message):
assert isinstance(message, dict)
assert len(message.keys()) == 2
assert 'start_timestamp' in message
assert isinstance(message['start_timestamp'], (int, float))
assert 'duration_days' in message
assert isinstance(message['duration_days'], (int, float))
def validate_constraint_endpoint_priority(message):
assert isinstance(message, dict)
assert len(message.keys()) == 2
assert 'endpoint_id' in message
validate_endpoint_id(message['endpoint_id'])
assert 'priority' in message
assert isinstance(message['priority'], int)
def validate_constraint_sla_capacity(message):
assert isinstance(message, dict)
assert len(message.keys()) == 1
@@ -172,16 +216,25 @@ def validate_constraint_sla_availability(message):
assert isinstance(message['availability'], (int, float))
assert message['availability'] >= 0 and message['availability'] <= 100
def validate_constraint_sla_isolation(message):
assert isinstance(message, dict)
assert len(message.keys()) == 1
assert 'isolation_level' in message
assert isinstance(message['isolation_level'], list)
for isolation_level in message['isolation_level']:
validate_isolation_level_enum(isolation_level)
CONSTRAINT_TYPE_TO_VALIDATOR = {
'custom' : validate_constraint_custom,
#'schedule' : validate_constraint_schedule,
'schedule' : validate_constraint_schedule,
#'endpoint_location' : validate_constraint_endpoint_location,
#'endpoint_priority' : validate_constraint_endpoint_priority,
'endpoint_priority' : validate_constraint_endpoint_priority,
'sla_capacity' : validate_constraint_sla_capacity,
'sla_latency' : validate_constraint_sla_latency,
'sla_availability' : validate_constraint_sla_availability,
#'sla_isolation' : validate_constraint_sla_isolation,
'sla_isolation' : validate_constraint_sla_isolation,
#'exclusions' : validate_constraint_exclusions,
#'qos_profile' : validate_constraint_qos_profile,
}
def validate_constraint(message):
@@ -479,7 +532,7 @@ def validate_device(message):
def validate_link(message):
assert isinstance(message, dict)
assert len(message.keys()) == 4
assert len(message.keys()) == 5
assert 'link_id' in message
validate_link_id(message['link_id'])
assert 'name' in message
@@ -489,6 +542,8 @@ def validate_link(message):
for endpoint_id in message['link_endpoint_ids']: validate_endpoint_id(endpoint_id)
assert 'attributes' in message
validate_link_attributes(message['attributes'])
assert 'link_type' in message
validate_link_type_enum(message['link_type'])
def validate_connection(message):
assert isinstance(message, dict)
Loading