Skip to content
Snippets Groups Projects

Resolve: "(CTTC) CAMARA Demo Integration tests"

Merged Lluis Gifre Renom requested to merge camara-demo-integration into develop
3 files
+ 368
294
Compare changes
  • Side-by-side
  • Inline
Files
3
  • 8aeb35fa
    Merge branch... · 8aeb35fa
    Shayan Hajipour authored
    Merge branch 'feat/215-cttc-implement-l3-vpn-sbi-driver-compatible-with-ietf-l3-service-model' into camara-demo-integration
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,9 +30,117 @@ class LANPrefixesDict(TypedDict):
SITE_NETWORK_ACCESS_TYPE = "ietf-l3vpn-svc:multipoint"
def create_site_dict(
site_id: str,
site_location: str,
device_uuid: str,
endpoint_uuid: str,
service_uuid: str,
role: str,
management_type: str,
ce_address: str,
pe_address: str,
ce_pe_network_prefix: int,
mtu: int,
input_bw: int,
output_bw: int,
qos_profile_id: str,
qos_profile_direction: str,
qos_profile_latency: int,
qos_profile_bw_guarantee: int,
lan_prefixes: List[LANPrefixesDict],
) -> Dict:
"""
Helper function that creates a dictionary representing a single 'site'
entry (including management, locations, devices, routing-protocols, and
site-network-accesses).
"""
site_lan_prefixes = [
{
"lan": lp["lan"],
"lan-tag": lp["lan_tag"],
"next-hop": ce_address,
}
for lp in lan_prefixes
]
return {
"site-id": site_id,
"management": {"type": management_type},
"locations": {"location": [{"location-id": site_location}]},
"devices": {
"device": [
{
"device-id": device_uuid,
"location": site_location,
}
]
},
"routing-protocols": {
"routing-protocol": [
{
"type": "ietf-l3vpn-svc:static",
"static": {
"cascaded-lan-prefixes": {
"ipv4-lan-prefixes": site_lan_prefixes
}
},
}
]
},
"site-network-accesses": {
"site-network-access": [
{
"site-network-access-id": endpoint_uuid,
"site-network-access-type": SITE_NETWORK_ACCESS_TYPE,
"device-reference": device_uuid,
"vpn-attachment": {
"vpn-id": service_uuid,
"site-role": role,
},
"ip-connection": {
"ipv4": {
"address-allocation-type": "ietf-l3vpn-svc:static-address",
"addresses": {
"provider-address": pe_address,
"customer-address": ce_address,
"prefix-length": ce_pe_network_prefix,
},
}
},
"service": {
"svc-mtu": mtu,
"svc-input-bandwidth": input_bw,
"svc-output-bandwidth": output_bw,
"qos": {
"qos-profile": {
"classes": {
"class": [
{
"class-id": qos_profile_id,
"direction": qos_profile_direction,
"latency": {
"latency-boundary": qos_profile_latency
},
"bandwidth": {
"guaranteed-bw-percent": qos_profile_bw_guarantee
},
}
]
}
}
},
},
}
]
},
}
def setup_config_rules(
service_uuid: str, json_settings: Dict, operation_type: str
) -> List[Dict]:
# --- Extract common or required fields for the source site ---
src_device_uuid: str = json_settings["src_device_name"]
src_endpoint_uuid: str = json_settings["src_endpoint_name"]
src_site_location: str = json_settings["src_site_location"]
@@ -45,6 +153,7 @@ def setup_config_rules(
)
if src_management_type != "ietf-l3vpn-svc:provider-managed":
raise Exception("management type %s not supported", src_management_type)
src_role: str = "ietf-l3vpn-svc:hub-role"
src_ce_address: str = json_settings["src_ce_address"]
src_pe_address: str = json_settings["src_pe_address"]
@@ -58,6 +167,8 @@ def setup_config_rules(
src_qos_profile_bw_guarantee: int = json_settings.get(
"src_qos_profile_bw_guarantee", 100
)
# --- Extract common or required fields for the destination site ---
dst_device_uuid = json_settings["dst_device_name"]
dst_endpoint_uuid = json_settings["dst_endpoint_name"]
dst_site_location: str = json_settings["dst_site_location"]
@@ -70,6 +181,7 @@ def setup_config_rules(
)
if dst_management_type != "ietf-l3vpn-svc:provider-managed":
raise Exception("management type %s not supported", dst_management_type)
dst_role: str = "ietf-l3vpn-svc:spoke-role"
dst_ce_address: str = json_settings["dst_ce_address"]
dst_pe_address: str = json_settings["dst_pe_address"]
@@ -84,156 +196,54 @@ def setup_config_rules(
"dst_qos_profile_bw_guarantee", 100
)
# Create source site information
src_management = {"type": src_management_type}
src_locations = {"location": [{"location-id": src_site_location}]}
src_devices = {
"device": [{"device-id": src_device_uuid, "location": src_site_location}]
}
src_site_lan_prefixes = [
{"lan": lp["lan"], "lan-tag": lp["lan_tag"], "next-hop": src_ce_address}
for lp in src_ipv4_lan_prefixes
]
src_site_routing_protocols = {
"routing-protocol": [
{
"type": "ietf-l3vpn-svc:static",
"static": {
"cascaded-lan-prefixes": {
"ipv4-lan-prefixes": src_site_lan_prefixes
}
},
}
]
}
src_site_network_accesses = {
"site-network-access": [
{
"site-network-access-id": src_endpoint_uuid,
"site-network-access-type": SITE_NETWORK_ACCESS_TYPE,
"device-reference": src_device_uuid,
"vpn-attachment": {"vpn-id": service_uuid, "site-role": src_role},
"ip-connection": {
"ipv4": {
"address-allocation-type": "ietf-l3vpn-svc:static-address",
"addresses": {
"provider-address": src_pe_address,
"customer-address": src_ce_address,
"prefix-length": src_ce_pe_network_prefix,
},
}
},
"service": {
"svc-mtu": src_mtu,
"svc-input-bandwidth": src_input_bw,
"svc-output-bandwidth": src_output_bw,
"qos": {
"qos-profile": {
"classes": {
"class": [
{
"class-id": src_qos_profile_id,
"direction": src_qos_profile_direction,
"latency": {
"latency-boundary": src_qos_profile_latency
},
"bandwidth": {
"guaranteed-bw-percent": src_qos_profile_bw_guarantee
},
}
]
}
}
},
},
}
]
}
# --- Build site dictionaries using the helper function ---
src_site_dict = create_site_dict(
site_id=src_site_id,
site_location=src_site_location,
device_uuid=src_device_uuid,
endpoint_uuid=src_endpoint_uuid,
service_uuid=service_uuid,
role=src_role,
management_type=src_management_type,
ce_address=src_ce_address,
pe_address=src_pe_address,
ce_pe_network_prefix=src_ce_pe_network_prefix,
mtu=src_mtu,
input_bw=src_input_bw,
output_bw=src_output_bw,
qos_profile_id=src_qos_profile_id,
qos_profile_direction=src_qos_profile_direction,
qos_profile_latency=src_qos_profile_latency,
qos_profile_bw_guarantee=src_qos_profile_bw_guarantee,
lan_prefixes=src_ipv4_lan_prefixes,
)
# Create destination site information
dst_management = {"type": src_management_type}
dst_locations = {"location": [{"location-id": dst_site_location}]}
dst_devices = {
"device": [{"device-id": dst_device_uuid, "location": dst_site_location}]
}
dst_site_lan_prefixes = [
{"lan": lp["lan"], "lan-tag": lp["lan_tag"], "next-hop": dst_ce_address}
for lp in dst_ipv4_lan_prefixes
]
dst_site_routing_protocols = {
"routing-protocol": [
{
"type": "ietf-l3vpn-svc:static",
"static": {
"cascaded-lan-prefixes": {
"ipv4-lan-prefixes": dst_site_lan_prefixes
}
},
}
]
}
dst_site_network_accesses = {
"site-network-access": [
{
"site-network-access-id": dst_endpoint_uuid,
"site-network-access-type": SITE_NETWORK_ACCESS_TYPE,
"device-reference": dst_device_uuid,
"vpn-attachment": {"vpn-id": service_uuid, "site-role": dst_role},
"ip-connection": {
"ipv4": {
"address-allocation-type": "ietf-l3vpn-svc:static-address",
"addresses": {
"provider-address": dst_pe_address,
"customer-address": dst_ce_address,
"prefix-length": dst_ce_pe_network_prefix,
},
}
},
"service": {
"svc-mtu": dst_mtu,
"svc-input-bandwidth": dst_input_bw,
"svc-output-bandwidth": dst_output_bw,
"qos": {
"qos-profile": {
"classes": {
"class": [
{
"class-id": dst_qos_profile_id,
"direction": dst_qos_profile_direction,
"latency": {
"latency-boundary": dst_qos_profile_latency
},
"bandwidth": {
"guaranteed-bw-percent": dst_qos_profile_bw_guarantee
},
}
]
}
}
},
},
}
]
}
dst_site_dict = create_site_dict(
site_id=dst_site_id,
site_location=dst_site_location,
device_uuid=dst_device_uuid,
endpoint_uuid=dst_endpoint_uuid,
service_uuid=service_uuid,
role=dst_role,
management_type=dst_management_type,
ce_address=dst_ce_address,
pe_address=dst_pe_address,
ce_pe_network_prefix=dst_ce_pe_network_prefix,
mtu=dst_mtu,
input_bw=dst_input_bw,
output_bw=dst_output_bw,
qos_profile_id=dst_qos_profile_id,
qos_profile_direction=dst_qos_profile_direction,
qos_profile_latency=dst_qos_profile_latency,
qos_profile_bw_guarantee=dst_qos_profile_bw_guarantee,
lan_prefixes=dst_ipv4_lan_prefixes,
)
# --- Combine both sites into one structure ---
sites = {
"site": [
{
"site-id": src_site_id,
"management": src_management,
"locations": src_locations,
"devices": src_devices,
"routing-protocols": src_site_routing_protocols,
"site-network-accesses": src_site_network_accesses,
},
{
"site-id": dst_site_id,
"management": dst_management,
"locations": dst_locations,
"devices": dst_devices,
"routing-protocols": dst_site_routing_protocols,
"site-network-accesses": dst_site_network_accesses,
},
src_site_dict,
dst_site_dict,
]
}
@@ -243,6 +253,7 @@ def setup_config_rules(
"sites": sites,
}
}
json_config_rules = [
json_config_rule_set(
"/service[{:s}]/IETFL3VPN".format(service_uuid),
@@ -253,6 +264,7 @@ def setup_config_rules(
{"type": operation_type},
),
]
return json_config_rules
Loading