Loading src/tests/spectrum_negotiation/README.md +64 −0 Original line number Diff line number Diff line Loading @@ -39,3 +39,67 @@ The script validates: Expected rejection status for overlapping or occupied-slot requests is HTTP `409 Conflict`, mapped from gRPC `ALREADY_EXISTS`. ## Live Candidate Validation Run against a local TFS deployment: ```bash python3 src/tests/spectrum_negotiation/live_candidate_validation.py ``` Run against a specific testbed node: ```bash python3 src/tests/spectrum_negotiation/live_candidate_validation.py \ --base-url http://172.16.0.101/tfs-api ``` Run against a specific optical pair: ```bash python3 src/tests/spectrum_negotiation/live_candidate_validation.py \ --base-url http://172.16.0.101/tfs-api \ --context admin \ --topology admin \ --src-device DC1-TP1 \ --dst-device DOMAIN-B \ --channel-width-ghz 50 ``` The script validates: - TFS-API context and topology discovery. - Optical link inventory discovery. - Descriptor device-name to TFS UUID resolution through topology details. - Candidate computation through the NBI facade `/tfs-api/context/<context_uuid>/topology/<topology_uuid>/optical_connectivity_candidates`. - Candidate feasibility for a directed optical endpoint pair. - Reservation of the returned candidate slot range on all selected optical links. - Rejection of the same range when requested as a preferred range. - Re-computation that selects a second non-overlapping candidate after the first range is reserved. - Reservation of the second candidate range on all selected optical links. - Rejection of both reserved ranges when each is requested as a preferred range. - Re-computation that selects a third candidate avoiding both active reservations. - Release of all temporary reservations. - Final cleanup verification that no active `codex-live-candidate-test` reservation remains. This script is intended as a future CI seed once the CI deployment includes the optical controller component and a loaded optical topology. An optional pytest wrapper is available and is skipped unless `TFS_API_BASE_URL` is defined: ```bash TFS_API_BASE_URL=http://172.16.0.101/tfs-api \ TFS_CONTEXT_UUID=admin \ TFS_TOPOLOGY_UUID=admin \ TFS_OPTICAL_SRC_DEVICE=DC1-TP1 \ TFS_OPTICAL_DST_DEVICE=DOMAIN-B \ TFS_OPTICAL_CHANNEL_WIDTH_GHZ=50 \ pytest -q src/tests/spectrum_negotiation/test_live_candidate_validation.py ``` src/tests/spectrum_negotiation/live_candidate_validation.py 0 → 100644 +392 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import json import time import urllib.error import urllib.parse import urllib.request from collections import deque DEFAULT_BASE_URL = 'http://127.0.0.1/tfs-api' OWNER_ID = 'codex-live-candidate-test' def http_request(base_url, method, path, payload=None, timeout=30): data = None headers = {} if payload is not None: data = json.dumps(payload).encode('utf-8') headers['Content-Type'] = 'application/json' request = urllib.request.Request(base_url + path, data=data, headers=headers, method=method) try: with urllib.request.urlopen(request, timeout=timeout) as response: body = response.read().decode('utf-8') return response.status, json.loads(body) if body else None except urllib.error.HTTPError as exc: body = exc.read().decode('utf-8') try: payload = json.loads(body) if body else None except json.JSONDecodeError: payload = body return exc.code, payload def first_context_and_topology(base_url, context_uuid=None, topology_uuid=None): if context_uuid is not None and topology_uuid is not None: return context_uuid, topology_uuid status, contexts = http_request(base_url, 'GET', '/contexts') assert status == 200, (status, contexts) assert contexts.get('contexts'), contexts for context in contexts['contexts']: candidate_context_uuid = context['context_id']['context_uuid']['uuid'] if context_uuid is not None and candidate_context_uuid != context_uuid: continue for topology_id in context.get('topology_ids', []): candidate_topology_uuid = topology_id['topology_uuid']['uuid'] if topology_uuid is not None and candidate_topology_uuid != topology_uuid: continue return candidate_context_uuid, candidate_topology_uuid raise AssertionError('Requested context/topology not found') def optical_link_uuid(link): return link['link_id']['link_uuid']['uuid'] def endpoint_device_uuid(endpoint_id): return endpoint_id['device_id']['device_uuid']['uuid'] def endpoint_uuid(endpoint_id): return endpoint_id['endpoint_uuid']['uuid'] def list_optical_links(base_url): status, optical_links = http_request(base_url, 'GET', '/optical_links') assert status == 200, (status, optical_links) links = optical_links.get('optical_links', []) assert links, 'No optical links exposed by TFS-API' return links def topology_details(base_url, context_uuid, topology_uuid): path = '/context/{:s}/topology_details/{:s}'.format( urllib.parse.quote(context_uuid), urllib.parse.quote(topology_uuid) ) status, details = http_request(base_url, 'GET', path) assert status == 200, (status, details) return details def device_uuid(device): return device['device_id']['device_uuid']['uuid'] def device_name(device): return device.get('name') or device_uuid(device) def endpoint_name(endpoint): return endpoint.get('name') or endpoint['endpoint_id']['endpoint_uuid']['uuid'] def resolve_device_uuid(devices, device_name_or_uuid): for device in devices: if device_uuid(device) == device_name_or_uuid or device_name(device) == device_name_or_uuid: return device_uuid(device) raise AssertionError('Optical device not found: {:s}'.format(device_name_or_uuid)) def resolve_endpoint_uuid(devices, resolved_device_uuid, preferred_endpoint_name): for device in devices: if device_uuid(device) != resolved_device_uuid: continue endpoints = device.get('device_endpoints', []) for endpoint in endpoints: endpoint_uuid_value = endpoint['endpoint_id']['endpoint_uuid']['uuid'] if endpoint_uuid_value == preferred_endpoint_name or endpoint_name(endpoint) == preferred_endpoint_name: return endpoint_uuid_value if endpoints: return endpoints[0]['endpoint_id']['endpoint_uuid']['uuid'] return preferred_endpoint_name def endpoint_id(device_uuid, endpoint_uuid): return { 'device_id': {'device_uuid': {'uuid': device_uuid}}, 'endpoint_uuid': {'uuid': endpoint_uuid}, } def build_graph(optical_links): adjacency = {} devices = set() for link in optical_links: endpoints = link.get('link_endpoint_ids', []) if len(endpoints) < 2: continue src_device = endpoint_device_uuid(endpoints[0]) dst_device = endpoint_device_uuid(endpoints[-1]) devices.add(src_device) devices.add(dst_device) adjacency.setdefault(src_device, []).append(dst_device) return devices, adjacency def path_exists(adjacency, src_device_uuid, dst_device_uuid): queue = deque([src_device_uuid]) seen = {src_device_uuid} while queue: device_uuid = queue.popleft() if device_uuid == dst_device_uuid: return True for next_device_uuid in adjacency.get(device_uuid, []): if next_device_uuid in seen: continue seen.add(next_device_uuid) queue.append(next_device_uuid) return False def choose_endpoint_pair(optical_links, topology_devices, src_device=None, dst_device=None): named_devices = {device_name(device): device_uuid(device) for device in topology_devices} graph_devices, adjacency = build_graph(optical_links) if src_device is not None and dst_device is not None: src_device_uuid = resolve_device_uuid(topology_devices, src_device) dst_device_uuid = resolve_device_uuid(topology_devices, dst_device) assert src_device_uuid in graph_devices, 'Source optical device not found in optical graph: {:s}'.format( src_device ) assert dst_device_uuid in graph_devices, 'Destination optical device not found in optical graph: {:s}'.format( dst_device ) assert path_exists(adjacency, src_device_uuid, dst_device_uuid), ( 'No optical path found between {:s} and {:s}'.format(src_device, dst_device) ) src_endpoint_uuid = resolve_endpoint_uuid(topology_devices, src_device_uuid, 'CHANNEL') dst_endpoint_uuid = resolve_endpoint_uuid(topology_devices, dst_device_uuid, 'CHANNEL') return endpoint_id(src_device_uuid, src_endpoint_uuid), endpoint_id(dst_device_uuid, dst_endpoint_uuid) preferred_pairs = [ ('DC1-TP1', 'DOMAIN-B', 'CHANNEL', 'port-in'), ('DC1-TP1', 'DC2-TP1', 'CHANNEL', 'CHANNEL'), ('DC1-TP1', 'DC3-TP1', 'CHANNEL', 'CHANNEL'), ('DC1-TP1', 'DC4-TP1', 'CHANNEL', 'CHANNEL'), ] for src_device, dst_device, src_endpoint, dst_endpoint in preferred_pairs: if src_device not in named_devices or dst_device not in named_devices: continue src_device_uuid = named_devices[src_device] dst_device_uuid = named_devices[dst_device] if src_device_uuid in graph_devices and dst_device_uuid in graph_devices and path_exists( adjacency, src_device_uuid, dst_device_uuid ): src_endpoint_uuid = resolve_endpoint_uuid(topology_devices, src_device_uuid, src_endpoint) dst_endpoint_uuid = resolve_endpoint_uuid(topology_devices, dst_device_uuid, dst_endpoint) return endpoint_id(src_device_uuid, src_endpoint_uuid), endpoint_id(dst_device_uuid, dst_endpoint_uuid) for src_device in sorted(graph_devices): for dst_device in sorted(graph_devices): if src_device != dst_device and path_exists(adjacency, src_device, dst_device): return endpoint_id(src_device, 'CHANNEL'), endpoint_id(dst_device, 'CHANNEL') raise AssertionError('No optical endpoint pair with a directed path was found') def candidate_request(src_endpoint_id, dst_endpoint_id, channel_width_ghz, preferred_n_start=None, preferred_n_end=None): request = { 'src_endpoint_id': src_endpoint_id, 'dst_endpoint_id': dst_endpoint_id, 'channel_width_ghz': channel_width_ghz, 'preferred_band': 'c_slots', 'max_candidates': 3, } if preferred_n_start is not None: request['preferred_n_start'] = preferred_n_start if preferred_n_end is not None: request['preferred_n_end'] = preferred_n_end return request def reservation_payload(context_uuid, topology_uuid, reservation_uuid, optical_link_ids, band, n_start, n_end): return { 'reservation_id': { 'context_id': {'context_uuid': {'uuid': context_uuid}}, 'reservation_uuid': {'uuid': reservation_uuid}, }, 'topology_id': { 'context_id': {'context_uuid': {'uuid': context_uuid}}, 'topology_uuid': {'uuid': topology_uuid}, }, 'optical_link_ids': [{'link_uuid': {'uuid': link_uuid}} for link_uuid in optical_link_ids], 'band': band, 'n_start': n_start, 'n_end': n_end, 'required_slots': n_end - n_start + 1, 'owner_id': OWNER_ID, 'correlation_id': reservation_uuid, } def release_reservation(base_url, context_uuid, reservation_uuid): release_path = '/context/{:s}/optical_spectrum_reservation/{:s}/release'.format( urllib.parse.quote(context_uuid), urllib.parse.quote(reservation_uuid) ) status, reply = http_request(base_url, 'POST', release_path) assert status == 200, (status, reply) def active_test_reservations(reply): reservations = reply.get('reservations', []) return [ reservation for reservation in reservations if reservation.get('owner_id') == OWNER_ID and reservation.get('status') not in ('OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED', 'RELEASED') ] def selected_candidate(reply): assert reply.get('candidates'), reply candidate = reply['candidates'][0] assert candidate.get('validation_status') == 'VALID', candidate assert candidate.get('optical_link_ids'), candidate assert candidate['n_end'] >= candidate['n_start'], candidate return candidate def ranges_overlap(left, right): if left['band'] != right['band']: return False return not (left['n_end'] < right['n_start'] or left['n_start'] > right['n_end']) def assert_non_overlapping(candidate, blocked_candidates): for blocked_candidate in blocked_candidates: assert not ranges_overlap(candidate, blocked_candidate), (candidate, blocked_candidate) def assert_preferred_range_rejected(base_url, candidate_path, src_endpoint, dst_endpoint, channel_width_ghz, candidate): blocked_request = candidate_request( src_endpoint, dst_endpoint, channel_width_ghz, preferred_n_start=candidate['n_start'], preferred_n_end=candidate['n_end'] ) status, blocked_reply = http_request(base_url, 'POST', candidate_path, blocked_request) assert status == 409, (status, blocked_reply) assert blocked_reply.get('rejected_reasons'), blocked_reply def create_candidate_reservation(base_url, context_uuid, topology_uuid, list_path, reservation_uuid, candidate): reservation = reservation_payload( context_uuid, topology_uuid, reservation_uuid, candidate['optical_link_ids'], candidate['band'], candidate['n_start'], candidate['n_end'] ) status, create_reply = http_request(base_url, 'POST', list_path, reservation) assert status == 200, (status, create_reply) def main(): parser = argparse.ArgumentParser( description='Validate live optical path spectrum candidate behavior through the TFS-API facade.' ) parser.add_argument('--base-url', default=DEFAULT_BASE_URL, help='TFS-API base URL, default: %(default)s') parser.add_argument('--context', help='Context UUID/name. Auto-discovered by default.') parser.add_argument('--topology', help='Topology UUID/name. Auto-discovered by default.') parser.add_argument('--src-device', help='Source optical device UUID. Auto-selected by default.') parser.add_argument('--dst-device', help='Destination optical device UUID. Auto-selected by default.') parser.add_argument('--channel-width-ghz', type=float, default=50.0, help='Requested optical width.') args = parser.parse_args() context_uuid, topology_uuid = first_context_and_topology(args.base_url, args.context, args.topology) optical_links = list_optical_links(args.base_url) details = topology_details(args.base_url, context_uuid, topology_uuid) src_endpoint, dst_endpoint = choose_endpoint_pair( optical_links, details.get('devices', []), args.src_device, args.dst_device ) candidate_path = '/context/{:s}/topology/{:s}/optical_connectivity_candidates'.format( urllib.parse.quote(context_uuid), urllib.parse.quote(topology_uuid) ) request_payload = candidate_request(src_endpoint, dst_endpoint, args.channel_width_ghz) status, initial_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) assert status == 200, (status, initial_reply) first_candidate = selected_candidate(initial_reply) reservation_uuid = 'live-candidate-{:d}'.format(int(time.time())) second_reservation_uuid = reservation_uuid + '-second' list_path = '/context/{:s}/optical_spectrum_reservations'.format(urllib.parse.quote(context_uuid)) created_reservations = [] second_candidate = None third_candidate = None try: create_candidate_reservation( args.base_url, context_uuid, topology_uuid, list_path, reservation_uuid, first_candidate ) created_reservations.append(reservation_uuid) assert_preferred_range_rejected( args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, first_candidate ) status, next_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) assert status == 200, (status, next_reply) second_candidate = selected_candidate(next_reply) assert_non_overlapping(second_candidate, [first_candidate]) create_candidate_reservation( args.base_url, context_uuid, topology_uuid, list_path, second_reservation_uuid, second_candidate ) created_reservations.append(second_reservation_uuid) assert_preferred_range_rejected( args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, first_candidate ) assert_preferred_range_rejected( args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, second_candidate ) status, third_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) assert status == 200, (status, third_reply) third_candidate = selected_candidate(third_reply) assert_non_overlapping(third_candidate, [first_candidate, second_candidate]) finally: for created_reservation_uuid in reversed(created_reservations): release_reservation(args.base_url, context_uuid, created_reservation_uuid) status, after = http_request(args.base_url, 'GET', list_path) assert status == 200, (status, after) assert not active_test_reservations(after), after print('LIVE_CANDIDATE_VALIDATION_OK') print('context_uuid={:s}'.format(context_uuid)) print('topology_uuid={:s}'.format(topology_uuid)) print('src={:s}/{:s}'.format(endpoint_device_uuid(src_endpoint), endpoint_uuid(src_endpoint))) print('dst={:s}/{:s}'.format(endpoint_device_uuid(dst_endpoint), endpoint_uuid(dst_endpoint))) print('selected_band={:s}'.format(first_candidate['band'])) print('selected_slots={:d}-{:d}'.format(first_candidate['n_start'], first_candidate['n_end'])) if second_candidate is not None: print('second_slots={:d}-{:d}'.format(second_candidate['n_start'], second_candidate['n_end'])) if third_candidate is not None: print('third_slots={:d}-{:d}'.format(third_candidate['n_start'], third_candidate['n_end'])) print('selected_links={:s}'.format(','.join(first_candidate['optical_link_ids']))) if __name__ == '__main__': main() src/tests/spectrum_negotiation/test_live_candidate_validation.py 0 → 100644 +42 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import subprocess import sys from pathlib import Path import pytest def test_live_candidate_validation(): base_url = os.environ.get('TFS_API_BASE_URL') if base_url is None: pytest.skip('Set TFS_API_BASE_URL to enable live optical candidate validation') script_path = Path(__file__).with_name('live_candidate_validation.py') command = [sys.executable, str(script_path), '--base-url', base_url] for env_name, option_name in ( ('TFS_CONTEXT_UUID', '--context'), ('TFS_TOPOLOGY_UUID', '--topology'), ('TFS_OPTICAL_SRC_DEVICE', '--src-device'), ('TFS_OPTICAL_DST_DEVICE', '--dst-device'), ('TFS_OPTICAL_CHANNEL_WIDTH_GHZ', '--channel-width-ghz'), ): value = os.environ.get(env_name) if value is not None: command.extend([option_name, value]) subprocess.run(command, check=True) Loading
src/tests/spectrum_negotiation/README.md +64 −0 Original line number Diff line number Diff line Loading @@ -39,3 +39,67 @@ The script validates: Expected rejection status for overlapping or occupied-slot requests is HTTP `409 Conflict`, mapped from gRPC `ALREADY_EXISTS`. ## Live Candidate Validation Run against a local TFS deployment: ```bash python3 src/tests/spectrum_negotiation/live_candidate_validation.py ``` Run against a specific testbed node: ```bash python3 src/tests/spectrum_negotiation/live_candidate_validation.py \ --base-url http://172.16.0.101/tfs-api ``` Run against a specific optical pair: ```bash python3 src/tests/spectrum_negotiation/live_candidate_validation.py \ --base-url http://172.16.0.101/tfs-api \ --context admin \ --topology admin \ --src-device DC1-TP1 \ --dst-device DOMAIN-B \ --channel-width-ghz 50 ``` The script validates: - TFS-API context and topology discovery. - Optical link inventory discovery. - Descriptor device-name to TFS UUID resolution through topology details. - Candidate computation through the NBI facade `/tfs-api/context/<context_uuid>/topology/<topology_uuid>/optical_connectivity_candidates`. - Candidate feasibility for a directed optical endpoint pair. - Reservation of the returned candidate slot range on all selected optical links. - Rejection of the same range when requested as a preferred range. - Re-computation that selects a second non-overlapping candidate after the first range is reserved. - Reservation of the second candidate range on all selected optical links. - Rejection of both reserved ranges when each is requested as a preferred range. - Re-computation that selects a third candidate avoiding both active reservations. - Release of all temporary reservations. - Final cleanup verification that no active `codex-live-candidate-test` reservation remains. This script is intended as a future CI seed once the CI deployment includes the optical controller component and a loaded optical topology. An optional pytest wrapper is available and is skipped unless `TFS_API_BASE_URL` is defined: ```bash TFS_API_BASE_URL=http://172.16.0.101/tfs-api \ TFS_CONTEXT_UUID=admin \ TFS_TOPOLOGY_UUID=admin \ TFS_OPTICAL_SRC_DEVICE=DC1-TP1 \ TFS_OPTICAL_DST_DEVICE=DOMAIN-B \ TFS_OPTICAL_CHANNEL_WIDTH_GHZ=50 \ pytest -q src/tests/spectrum_negotiation/test_live_candidate_validation.py ```
src/tests/spectrum_negotiation/live_candidate_validation.py 0 → 100644 +392 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import json import time import urllib.error import urllib.parse import urllib.request from collections import deque DEFAULT_BASE_URL = 'http://127.0.0.1/tfs-api' OWNER_ID = 'codex-live-candidate-test' def http_request(base_url, method, path, payload=None, timeout=30): data = None headers = {} if payload is not None: data = json.dumps(payload).encode('utf-8') headers['Content-Type'] = 'application/json' request = urllib.request.Request(base_url + path, data=data, headers=headers, method=method) try: with urllib.request.urlopen(request, timeout=timeout) as response: body = response.read().decode('utf-8') return response.status, json.loads(body) if body else None except urllib.error.HTTPError as exc: body = exc.read().decode('utf-8') try: payload = json.loads(body) if body else None except json.JSONDecodeError: payload = body return exc.code, payload def first_context_and_topology(base_url, context_uuid=None, topology_uuid=None): if context_uuid is not None and topology_uuid is not None: return context_uuid, topology_uuid status, contexts = http_request(base_url, 'GET', '/contexts') assert status == 200, (status, contexts) assert contexts.get('contexts'), contexts for context in contexts['contexts']: candidate_context_uuid = context['context_id']['context_uuid']['uuid'] if context_uuid is not None and candidate_context_uuid != context_uuid: continue for topology_id in context.get('topology_ids', []): candidate_topology_uuid = topology_id['topology_uuid']['uuid'] if topology_uuid is not None and candidate_topology_uuid != topology_uuid: continue return candidate_context_uuid, candidate_topology_uuid raise AssertionError('Requested context/topology not found') def optical_link_uuid(link): return link['link_id']['link_uuid']['uuid'] def endpoint_device_uuid(endpoint_id): return endpoint_id['device_id']['device_uuid']['uuid'] def endpoint_uuid(endpoint_id): return endpoint_id['endpoint_uuid']['uuid'] def list_optical_links(base_url): status, optical_links = http_request(base_url, 'GET', '/optical_links') assert status == 200, (status, optical_links) links = optical_links.get('optical_links', []) assert links, 'No optical links exposed by TFS-API' return links def topology_details(base_url, context_uuid, topology_uuid): path = '/context/{:s}/topology_details/{:s}'.format( urllib.parse.quote(context_uuid), urllib.parse.quote(topology_uuid) ) status, details = http_request(base_url, 'GET', path) assert status == 200, (status, details) return details def device_uuid(device): return device['device_id']['device_uuid']['uuid'] def device_name(device): return device.get('name') or device_uuid(device) def endpoint_name(endpoint): return endpoint.get('name') or endpoint['endpoint_id']['endpoint_uuid']['uuid'] def resolve_device_uuid(devices, device_name_or_uuid): for device in devices: if device_uuid(device) == device_name_or_uuid or device_name(device) == device_name_or_uuid: return device_uuid(device) raise AssertionError('Optical device not found: {:s}'.format(device_name_or_uuid)) def resolve_endpoint_uuid(devices, resolved_device_uuid, preferred_endpoint_name): for device in devices: if device_uuid(device) != resolved_device_uuid: continue endpoints = device.get('device_endpoints', []) for endpoint in endpoints: endpoint_uuid_value = endpoint['endpoint_id']['endpoint_uuid']['uuid'] if endpoint_uuid_value == preferred_endpoint_name or endpoint_name(endpoint) == preferred_endpoint_name: return endpoint_uuid_value if endpoints: return endpoints[0]['endpoint_id']['endpoint_uuid']['uuid'] return preferred_endpoint_name def endpoint_id(device_uuid, endpoint_uuid): return { 'device_id': {'device_uuid': {'uuid': device_uuid}}, 'endpoint_uuid': {'uuid': endpoint_uuid}, } def build_graph(optical_links): adjacency = {} devices = set() for link in optical_links: endpoints = link.get('link_endpoint_ids', []) if len(endpoints) < 2: continue src_device = endpoint_device_uuid(endpoints[0]) dst_device = endpoint_device_uuid(endpoints[-1]) devices.add(src_device) devices.add(dst_device) adjacency.setdefault(src_device, []).append(dst_device) return devices, adjacency def path_exists(adjacency, src_device_uuid, dst_device_uuid): queue = deque([src_device_uuid]) seen = {src_device_uuid} while queue: device_uuid = queue.popleft() if device_uuid == dst_device_uuid: return True for next_device_uuid in adjacency.get(device_uuid, []): if next_device_uuid in seen: continue seen.add(next_device_uuid) queue.append(next_device_uuid) return False def choose_endpoint_pair(optical_links, topology_devices, src_device=None, dst_device=None): named_devices = {device_name(device): device_uuid(device) for device in topology_devices} graph_devices, adjacency = build_graph(optical_links) if src_device is not None and dst_device is not None: src_device_uuid = resolve_device_uuid(topology_devices, src_device) dst_device_uuid = resolve_device_uuid(topology_devices, dst_device) assert src_device_uuid in graph_devices, 'Source optical device not found in optical graph: {:s}'.format( src_device ) assert dst_device_uuid in graph_devices, 'Destination optical device not found in optical graph: {:s}'.format( dst_device ) assert path_exists(adjacency, src_device_uuid, dst_device_uuid), ( 'No optical path found between {:s} and {:s}'.format(src_device, dst_device) ) src_endpoint_uuid = resolve_endpoint_uuid(topology_devices, src_device_uuid, 'CHANNEL') dst_endpoint_uuid = resolve_endpoint_uuid(topology_devices, dst_device_uuid, 'CHANNEL') return endpoint_id(src_device_uuid, src_endpoint_uuid), endpoint_id(dst_device_uuid, dst_endpoint_uuid) preferred_pairs = [ ('DC1-TP1', 'DOMAIN-B', 'CHANNEL', 'port-in'), ('DC1-TP1', 'DC2-TP1', 'CHANNEL', 'CHANNEL'), ('DC1-TP1', 'DC3-TP1', 'CHANNEL', 'CHANNEL'), ('DC1-TP1', 'DC4-TP1', 'CHANNEL', 'CHANNEL'), ] for src_device, dst_device, src_endpoint, dst_endpoint in preferred_pairs: if src_device not in named_devices or dst_device not in named_devices: continue src_device_uuid = named_devices[src_device] dst_device_uuid = named_devices[dst_device] if src_device_uuid in graph_devices and dst_device_uuid in graph_devices and path_exists( adjacency, src_device_uuid, dst_device_uuid ): src_endpoint_uuid = resolve_endpoint_uuid(topology_devices, src_device_uuid, src_endpoint) dst_endpoint_uuid = resolve_endpoint_uuid(topology_devices, dst_device_uuid, dst_endpoint) return endpoint_id(src_device_uuid, src_endpoint_uuid), endpoint_id(dst_device_uuid, dst_endpoint_uuid) for src_device in sorted(graph_devices): for dst_device in sorted(graph_devices): if src_device != dst_device and path_exists(adjacency, src_device, dst_device): return endpoint_id(src_device, 'CHANNEL'), endpoint_id(dst_device, 'CHANNEL') raise AssertionError('No optical endpoint pair with a directed path was found') def candidate_request(src_endpoint_id, dst_endpoint_id, channel_width_ghz, preferred_n_start=None, preferred_n_end=None): request = { 'src_endpoint_id': src_endpoint_id, 'dst_endpoint_id': dst_endpoint_id, 'channel_width_ghz': channel_width_ghz, 'preferred_band': 'c_slots', 'max_candidates': 3, } if preferred_n_start is not None: request['preferred_n_start'] = preferred_n_start if preferred_n_end is not None: request['preferred_n_end'] = preferred_n_end return request def reservation_payload(context_uuid, topology_uuid, reservation_uuid, optical_link_ids, band, n_start, n_end): return { 'reservation_id': { 'context_id': {'context_uuid': {'uuid': context_uuid}}, 'reservation_uuid': {'uuid': reservation_uuid}, }, 'topology_id': { 'context_id': {'context_uuid': {'uuid': context_uuid}}, 'topology_uuid': {'uuid': topology_uuid}, }, 'optical_link_ids': [{'link_uuid': {'uuid': link_uuid}} for link_uuid in optical_link_ids], 'band': band, 'n_start': n_start, 'n_end': n_end, 'required_slots': n_end - n_start + 1, 'owner_id': OWNER_ID, 'correlation_id': reservation_uuid, } def release_reservation(base_url, context_uuid, reservation_uuid): release_path = '/context/{:s}/optical_spectrum_reservation/{:s}/release'.format( urllib.parse.quote(context_uuid), urllib.parse.quote(reservation_uuid) ) status, reply = http_request(base_url, 'POST', release_path) assert status == 200, (status, reply) def active_test_reservations(reply): reservations = reply.get('reservations', []) return [ reservation for reservation in reservations if reservation.get('owner_id') == OWNER_ID and reservation.get('status') not in ('OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED', 'RELEASED') ] def selected_candidate(reply): assert reply.get('candidates'), reply candidate = reply['candidates'][0] assert candidate.get('validation_status') == 'VALID', candidate assert candidate.get('optical_link_ids'), candidate assert candidate['n_end'] >= candidate['n_start'], candidate return candidate def ranges_overlap(left, right): if left['band'] != right['band']: return False return not (left['n_end'] < right['n_start'] or left['n_start'] > right['n_end']) def assert_non_overlapping(candidate, blocked_candidates): for blocked_candidate in blocked_candidates: assert not ranges_overlap(candidate, blocked_candidate), (candidate, blocked_candidate) def assert_preferred_range_rejected(base_url, candidate_path, src_endpoint, dst_endpoint, channel_width_ghz, candidate): blocked_request = candidate_request( src_endpoint, dst_endpoint, channel_width_ghz, preferred_n_start=candidate['n_start'], preferred_n_end=candidate['n_end'] ) status, blocked_reply = http_request(base_url, 'POST', candidate_path, blocked_request) assert status == 409, (status, blocked_reply) assert blocked_reply.get('rejected_reasons'), blocked_reply def create_candidate_reservation(base_url, context_uuid, topology_uuid, list_path, reservation_uuid, candidate): reservation = reservation_payload( context_uuid, topology_uuid, reservation_uuid, candidate['optical_link_ids'], candidate['band'], candidate['n_start'], candidate['n_end'] ) status, create_reply = http_request(base_url, 'POST', list_path, reservation) assert status == 200, (status, create_reply) def main(): parser = argparse.ArgumentParser( description='Validate live optical path spectrum candidate behavior through the TFS-API facade.' ) parser.add_argument('--base-url', default=DEFAULT_BASE_URL, help='TFS-API base URL, default: %(default)s') parser.add_argument('--context', help='Context UUID/name. Auto-discovered by default.') parser.add_argument('--topology', help='Topology UUID/name. Auto-discovered by default.') parser.add_argument('--src-device', help='Source optical device UUID. Auto-selected by default.') parser.add_argument('--dst-device', help='Destination optical device UUID. Auto-selected by default.') parser.add_argument('--channel-width-ghz', type=float, default=50.0, help='Requested optical width.') args = parser.parse_args() context_uuid, topology_uuid = first_context_and_topology(args.base_url, args.context, args.topology) optical_links = list_optical_links(args.base_url) details = topology_details(args.base_url, context_uuid, topology_uuid) src_endpoint, dst_endpoint = choose_endpoint_pair( optical_links, details.get('devices', []), args.src_device, args.dst_device ) candidate_path = '/context/{:s}/topology/{:s}/optical_connectivity_candidates'.format( urllib.parse.quote(context_uuid), urllib.parse.quote(topology_uuid) ) request_payload = candidate_request(src_endpoint, dst_endpoint, args.channel_width_ghz) status, initial_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) assert status == 200, (status, initial_reply) first_candidate = selected_candidate(initial_reply) reservation_uuid = 'live-candidate-{:d}'.format(int(time.time())) second_reservation_uuid = reservation_uuid + '-second' list_path = '/context/{:s}/optical_spectrum_reservations'.format(urllib.parse.quote(context_uuid)) created_reservations = [] second_candidate = None third_candidate = None try: create_candidate_reservation( args.base_url, context_uuid, topology_uuid, list_path, reservation_uuid, first_candidate ) created_reservations.append(reservation_uuid) assert_preferred_range_rejected( args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, first_candidate ) status, next_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) assert status == 200, (status, next_reply) second_candidate = selected_candidate(next_reply) assert_non_overlapping(second_candidate, [first_candidate]) create_candidate_reservation( args.base_url, context_uuid, topology_uuid, list_path, second_reservation_uuid, second_candidate ) created_reservations.append(second_reservation_uuid) assert_preferred_range_rejected( args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, first_candidate ) assert_preferred_range_rejected( args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, second_candidate ) status, third_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) assert status == 200, (status, third_reply) third_candidate = selected_candidate(third_reply) assert_non_overlapping(third_candidate, [first_candidate, second_candidate]) finally: for created_reservation_uuid in reversed(created_reservations): release_reservation(args.base_url, context_uuid, created_reservation_uuid) status, after = http_request(args.base_url, 'GET', list_path) assert status == 200, (status, after) assert not active_test_reservations(after), after print('LIVE_CANDIDATE_VALIDATION_OK') print('context_uuid={:s}'.format(context_uuid)) print('topology_uuid={:s}'.format(topology_uuid)) print('src={:s}/{:s}'.format(endpoint_device_uuid(src_endpoint), endpoint_uuid(src_endpoint))) print('dst={:s}/{:s}'.format(endpoint_device_uuid(dst_endpoint), endpoint_uuid(dst_endpoint))) print('selected_band={:s}'.format(first_candidate['band'])) print('selected_slots={:d}-{:d}'.format(first_candidate['n_start'], first_candidate['n_end'])) if second_candidate is not None: print('second_slots={:d}-{:d}'.format(second_candidate['n_start'], second_candidate['n_end'])) if third_candidate is not None: print('third_slots={:d}-{:d}'.format(third_candidate['n_start'], third_candidate['n_end'])) print('selected_links={:s}'.format(','.join(first_candidate['optical_link_ids']))) if __name__ == '__main__': main()
src/tests/spectrum_negotiation/test_live_candidate_validation.py 0 → 100644 +42 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import subprocess import sys from pathlib import Path import pytest def test_live_candidate_validation(): base_url = os.environ.get('TFS_API_BASE_URL') if base_url is None: pytest.skip('Set TFS_API_BASE_URL to enable live optical candidate validation') script_path = Path(__file__).with_name('live_candidate_validation.py') command = [sys.executable, str(script_path), '--base-url', base_url] for env_name, option_name in ( ('TFS_CONTEXT_UUID', '--context'), ('TFS_TOPOLOGY_UUID', '--topology'), ('TFS_OPTICAL_SRC_DEVICE', '--src-device'), ('TFS_OPTICAL_DST_DEVICE', '--dst-device'), ('TFS_OPTICAL_CHANNEL_WIDTH_GHZ', '--channel-width-ghz'), ): value = os.environ.get(env_name) if value is not None: command.extend([option_name, value]) subprocess.run(command, check=True)