Loading src/common/type_checkers/Assertions.py +59 −4 Original line number Diff line number Diff line Loading @@ -48,6 +48,8 @@ def validate_device_driver_enum(message): 'DEVICEDRIVER_GNMI_OPENCONFIG', 'DEVICEDRIVER_OPTICAL_TFS', 'DEVICEDRIVER_IETF_ACTN', 'DEVICEDRIVER_OC', 'DEVICEDRIVER_QKD', ] def validate_device_operational_status_enum(message): Loading @@ -58,6 +60,20 @@ def validate_device_operational_status_enum(message): 'DEVICEOPERATIONALSTATUS_ENABLED' ] def validate_isolation_level_enum(message): assert isinstance(message, str) assert message in [ 'NO_ISOLATION', 'PHYSICAL_ISOLATION', 'LOGICAL_ISOLATION', 'PROCESS_ISOLATION', 'PHYSICAL_MEMORY_ISOLATION', 'PHYSICAL_NETWORK_ISOLATION', 'VIRTUAL_RESOURCE_ISOLATION', 'NETWORK_FUNCTIONS_ISOLATION', 'SERVICE_ISOLATION', ] def validate_kpi_sample_types_enum(message): assert isinstance(message, str) assert message in [ Loading @@ -70,6 +86,16 @@ def validate_kpi_sample_types_enum(message): 'KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS', ] def validate_link_type_enum(message): assert isinstance(message, str) assert message in [ 'LINKTYPE_UNKNOWN', 'LINKTYPE_COPPER', 'LINKTYPE_VIRTUAL_COPPER', 'LINKTYPE_OPTICAL', 'LINKTYPE_VIRTUAL_OPTICAL', ] def validate_service_type_enum(message): assert isinstance(message, str) assert message in [ Loading @@ -79,6 +105,8 @@ def validate_service_type_enum(message): 'SERVICETYPE_TAPI_CONNECTIVITY_SERVICE', 'SERVICETYPE_TE', 'SERVICETYPE_E2E', 'SERVICETYPE_OPTICAL_CONNECTIVITY', 'SERVICETYPE_QKD', ] def validate_service_state_enum(message): Loading Loading @@ -148,6 +176,22 @@ def validate_constraint_custom(message): assert 'constraint_value' in message assert isinstance(message['constraint_value'], str) def validate_constraint_schedule(message): assert isinstance(message, dict) assert len(message.keys()) == 2 assert 'start_timestamp' in message assert isinstance(message['start_timestamp'], (int, float)) assert 'duration_days' in message assert isinstance(message['duration_days'], (int, float)) def validate_constraint_endpoint_priority(message): assert isinstance(message, dict) assert len(message.keys()) == 2 assert 'endpoint_id' in message validate_endpoint_id(message['endpoint_id']) assert 'priority' in message assert isinstance(message['priority'], int) def validate_constraint_sla_capacity(message): assert isinstance(message, dict) assert len(message.keys()) == 1 Loading @@ -172,16 +216,25 @@ def validate_constraint_sla_availability(message): assert isinstance(message['availability'], (int, float)) assert message['availability'] >= 0 and message['availability'] <= 100 def validate_constraint_sla_isolation(message): assert isinstance(message, dict) assert len(message.keys()) == 1 assert 'isolation_level' in message assert isinstance(message['isolation_level'], list) for isolation_level in message['isolation_level']: validate_isolation_level_enum(isolation_level) CONSTRAINT_TYPE_TO_VALIDATOR = { 'custom' : validate_constraint_custom, #'schedule' : validate_constraint_schedule, 'schedule' : validate_constraint_schedule, #'endpoint_location' : validate_constraint_endpoint_location, #'endpoint_priority' : validate_constraint_endpoint_priority, 'endpoint_priority' : validate_constraint_endpoint_priority, 'sla_capacity' : validate_constraint_sla_capacity, 'sla_latency' : validate_constraint_sla_latency, 'sla_availability' : validate_constraint_sla_availability, #'sla_isolation' : validate_constraint_sla_isolation, 'sla_isolation' : validate_constraint_sla_isolation, #'exclusions' : validate_constraint_exclusions, #'qos_profile' : validate_constraint_qos_profile, } def validate_constraint(message): Loading Loading @@ -479,7 +532,7 @@ def validate_device(message): def validate_link(message): assert isinstance(message, dict) assert len(message.keys()) == 4 assert len(message.keys()) == 5 assert 'link_id' in message validate_link_id(message['link_id']) assert 'name' in message Loading @@ -489,6 +542,8 @@ def validate_link(message): for endpoint_id in message['link_endpoint_ids']: validate_endpoint_id(endpoint_id) assert 'attributes' in message validate_link_attributes(message['attributes']) assert 'link_type' in message validate_link_type_enum(message['link_type']) def validate_connection(message): assert isinstance(message, dict) Loading Loading
src/common/type_checkers/Assertions.py +59 −4 Original line number Diff line number Diff line Loading @@ -48,6 +48,8 @@ def validate_device_driver_enum(message): 'DEVICEDRIVER_GNMI_OPENCONFIG', 'DEVICEDRIVER_OPTICAL_TFS', 'DEVICEDRIVER_IETF_ACTN', 'DEVICEDRIVER_OC', 'DEVICEDRIVER_QKD', ] def validate_device_operational_status_enum(message): Loading @@ -58,6 +60,20 @@ def validate_device_operational_status_enum(message): 'DEVICEOPERATIONALSTATUS_ENABLED' ] def validate_isolation_level_enum(message): assert isinstance(message, str) assert message in [ 'NO_ISOLATION', 'PHYSICAL_ISOLATION', 'LOGICAL_ISOLATION', 'PROCESS_ISOLATION', 'PHYSICAL_MEMORY_ISOLATION', 'PHYSICAL_NETWORK_ISOLATION', 'VIRTUAL_RESOURCE_ISOLATION', 'NETWORK_FUNCTIONS_ISOLATION', 'SERVICE_ISOLATION', ] def validate_kpi_sample_types_enum(message): assert isinstance(message, str) assert message in [ Loading @@ -70,6 +86,16 @@ def validate_kpi_sample_types_enum(message): 'KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS', ] def validate_link_type_enum(message): assert isinstance(message, str) assert message in [ 'LINKTYPE_UNKNOWN', 'LINKTYPE_COPPER', 'LINKTYPE_VIRTUAL_COPPER', 'LINKTYPE_OPTICAL', 'LINKTYPE_VIRTUAL_OPTICAL', ] def validate_service_type_enum(message): assert isinstance(message, str) assert message in [ Loading @@ -79,6 +105,8 @@ def validate_service_type_enum(message): 'SERVICETYPE_TAPI_CONNECTIVITY_SERVICE', 'SERVICETYPE_TE', 'SERVICETYPE_E2E', 'SERVICETYPE_OPTICAL_CONNECTIVITY', 'SERVICETYPE_QKD', ] def validate_service_state_enum(message): Loading Loading @@ -148,6 +176,22 @@ def validate_constraint_custom(message): assert 'constraint_value' in message assert isinstance(message['constraint_value'], str) def validate_constraint_schedule(message): assert isinstance(message, dict) assert len(message.keys()) == 2 assert 'start_timestamp' in message assert isinstance(message['start_timestamp'], (int, float)) assert 'duration_days' in message assert isinstance(message['duration_days'], (int, float)) def validate_constraint_endpoint_priority(message): assert isinstance(message, dict) assert len(message.keys()) == 2 assert 'endpoint_id' in message validate_endpoint_id(message['endpoint_id']) assert 'priority' in message assert isinstance(message['priority'], int) def validate_constraint_sla_capacity(message): assert isinstance(message, dict) assert len(message.keys()) == 1 Loading @@ -172,16 +216,25 @@ def validate_constraint_sla_availability(message): assert isinstance(message['availability'], (int, float)) assert message['availability'] >= 0 and message['availability'] <= 100 def validate_constraint_sla_isolation(message): assert isinstance(message, dict) assert len(message.keys()) == 1 assert 'isolation_level' in message assert isinstance(message['isolation_level'], list) for isolation_level in message['isolation_level']: validate_isolation_level_enum(isolation_level) CONSTRAINT_TYPE_TO_VALIDATOR = { 'custom' : validate_constraint_custom, #'schedule' : validate_constraint_schedule, 'schedule' : validate_constraint_schedule, #'endpoint_location' : validate_constraint_endpoint_location, #'endpoint_priority' : validate_constraint_endpoint_priority, 'endpoint_priority' : validate_constraint_endpoint_priority, 'sla_capacity' : validate_constraint_sla_capacity, 'sla_latency' : validate_constraint_sla_latency, 'sla_availability' : validate_constraint_sla_availability, #'sla_isolation' : validate_constraint_sla_isolation, 'sla_isolation' : validate_constraint_sla_isolation, #'exclusions' : validate_constraint_exclusions, #'qos_profile' : validate_constraint_qos_profile, } def validate_constraint(message): Loading Loading @@ -479,7 +532,7 @@ def validate_device(message): def validate_link(message): assert isinstance(message, dict) assert len(message.keys()) == 4 assert len(message.keys()) == 5 assert 'link_id' in message validate_link_id(message['link_id']) assert 'name' in message Loading @@ -489,6 +542,8 @@ def validate_link(message): for endpoint_id in message['link_endpoint_ids']: validate_endpoint_id(endpoint_id) assert 'attributes' in message validate_link_attributes(message['attributes']) assert 'link_type' in message validate_link_type_enum(message['link_type']) def validate_connection(message): assert isinstance(message, dict) Loading