diff --git a/src/nbi/service/tfs_api/Resources.py b/src/nbi/service/tfs_api/Resources.py index 536daca8dbc4851202149aed88091bd944336b4c..3e183220073c72d883240df9ce8a31413d56efd6 100644 --- a/src/nbi/service/tfs_api/Resources.py +++ b/src/nbi/service/tfs_api/Resources.py @@ -14,12 +14,16 @@ import json import logging +import urllib.parse from typing import Dict, List import grpc +import requests from flask.json import jsonify from flask_restful import Resource, request from werkzeug.exceptions import BadRequest +from common.Constants import ServiceNameEnum from common.proto.context_pb2 import Empty, LinkTypeEnum, Service, ServiceTypeEnum, ConfigActionEnum, ConfigRule +from common.Settings import get_service_host, get_service_port_grpc from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest from common.tools.descriptor.Tools import format_device_custom_config_rules, format_service_custom_config_rules from common.tools.grpc.Tools import grpc_message_to_json @@ -455,6 +459,34 @@ class OpticalSpectrumReservationRelease(_Resource): except grpc.RpcError as exc: return _format_grpc_error(exc) +class OpticalConnectivityCandidates(_Resource): + def post(self, context_uuid : str, topology_uuid : str): + candidate_request = request.get_json() + if candidate_request is None: + raise BadRequest('Missing optical connectivity candidate request') + + try: + optical_host = get_service_host(ServiceNameEnum.OPTICALCONTROLLER) + optical_port = get_service_port_grpc(ServiceNameEnum.OPTICALCONTROLLER) + url = 'http://{:s}:{:d}/OpticalTFS/ComputeOpticalConnectivityCandidates/{:s}/{:s}'.format( + optical_host, optical_port, + urllib.parse.quote(context_uuid, safe=''), + urllib.parse.quote(topology_uuid, safe=''), + ) + reply = requests.post(url, json=candidate_request, timeout=30) + if reply.content and len(reply.content) > 0: + return reply.json(), reply.status_code + return None, reply.status_code + except requests.exceptions.RequestException as exc: + LOGGER.warning('Optical controller candidate computation request failed: %s', str(exc)) + return { + 'candidates': [], + 'rejected_reasons': [{ + 'code': 'OPTICAL_CONTROLLER_UNAVAILABLE', + 'message': str(exc), + }], + }, 503 + class ConnectionIds(_Resource): def get(self, context_uuid : str, service_uuid : str): return format_grpc_to_json(self.context_client.ListConnectionIds(grpc_service_id(context_uuid, service_uuid))) diff --git a/src/nbi/service/tfs_api/__init__.py b/src/nbi/service/tfs_api/__init__.py index 2bcf431ec102e5a83fdea8df85307e97066ebad3..2e666f252cc77e41d1bb331d3eff4412c1c12287 100644 --- a/src/nbi/service/tfs_api/__init__.py +++ b/src/nbi/service/tfs_api/__init__.py @@ -22,6 +22,7 @@ from .Resources import ( OpticalLink, OpticalLinks, OpticalSpectrumReservation, OpticalSpectrumReservationConsume, OpticalSpectrumReservationRelease, OpticalSpectrumReservations, + OpticalConnectivityCandidates, PolicyRule, PolicyRuleIds, PolicyRules, Service, ServiceIds, Services, Slice, SliceIds, Slices, @@ -72,6 +73,8 @@ _RESOURCES = [ '/context//optical_spectrum_reservation//consume'), ('api.optical_spectrum_reservation_release', OpticalSpectrumReservationRelease, '/context//optical_spectrum_reservation//release'), + ('api.optical_connectivity_candidates', OpticalConnectivityCandidates, + '/context//topology//optical_connectivity_candidates'), ('api.connection_ids', ConnectionIds, '/context//service//connection_ids'), ('api.connections', Connections, '/context//service//connections'), diff --git a/src/opticalcontroller/OpticalController.py b/src/opticalcontroller/OpticalController.py index da65ad81cf007dff3840ea33edc130c28e165bbd..fd6b7f5a12ad061b8a41c41b4a65cea6396ad84d 100644 --- a/src/opticalcontroller/OpticalController.py +++ b/src/opticalcontroller/OpticalController.py @@ -17,10 +17,14 @@ from flask import Flask, request from flask import render_template from common.DeviceTypes import DeviceTypeEnum from flask_restplus import Resource, Api +from context.client.ContextClient import ContextClient from opticalcontroller.tools import * from opticalcontroller.variables import * from opticalcontroller.RSA import RSA -from common.proto.context_pb2 import TopologyId , OpticalLink +from opticalcontroller.service.ConnectivityCandidates import ( + OpticalCandidateError, compute_optical_connectivity_candidates +) +from common.proto.context_pb2 import ContextId, TopologyId , OpticalLink import json from google.protobuf.message import Message from google.protobuf.json_format import MessageToDict @@ -512,5 +516,43 @@ class GetTopology(Resource): #print(f'err {e}') return 'Error', 400 +@optical.route('/ComputeOpticalConnectivityCandidates//', methods=['POST']) +@optical.response(200, 'Success') +@optical.response(400, 'Invalid request') +@optical.response(404, 'Error, not found') +@optical.response(409, 'No feasible optical candidate') +class ComputeOpticalConnectivityCandidates(Resource): + @staticmethod + def post(context_id: str, topology_id: str): + candidate_request = request.get_json() + if candidate_request is None: + return { + "candidates": [], + "rejected_reasons": [{"code": "INVALID_REQUEST", "message": "Missing request body"}], + }, 400 + + topog_id = TopologyId() + topog_id.topology_uuid.uuid = topology_id + topog_id.context_id.context_uuid.uuid = context_id + + ctx_client = ContextClient() + ctx_client.connect() + try: + topo, nodes = readTopologyDataFromContext(topog_id) + reservations = ctx_client.ListOpticalSpectrumReservations( + ContextId(context_uuid={"uuid": context_id}) + ).reservations + return compute_optical_connectivity_candidates(candidate_request, nodes, topo, reservations) + except OpticalCandidateError as exc: + return exc.to_reply(), exc.http_status + except Exception: + LOGGER.exception('Error while computing optical connectivity candidates') + return { + "candidates": [], + "rejected_reasons": [{"code": "INTERNAL", "message": "Candidate computation failed"}], + }, 500 + finally: + ctx_client.close() + if __name__ == '__main__': app.run(host='0.0.0.0', port=10060, debug=True) diff --git a/src/opticalcontroller/service/ConnectivityCandidates.py b/src/opticalcontroller/service/ConnectivityCandidates.py new file mode 100644 index 0000000000000000000000000000000000000000..4feec9ae7000b54f6a6b379b44e0c72086a96ae9 --- /dev/null +++ b/src/opticalcontroller/service/ConnectivityCandidates.py @@ -0,0 +1,416 @@ +# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import math, uuid +from collections import deque +from typing import Dict, Iterable, List, Optional, Set, Tuple + +from common.proto.context_pb2 import OpticalSpectrumReservationStatusEnum +from common.tools.grpc.Tools import grpc_message_to_json +from common.tools.object_factory.OpticalLink import correct_slot + + +ACTIVE_RESERVATION_STATUSES = { + OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, + OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, + "OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED", + "OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED", + 1, + 2, +} + +BAND_WIDTHS = { + "c_slots": 320, + "l_slots": 550, + "s_slots": 720, +} + +BAND_ORDER = ("c_slots", "l_slots", "s_slots") + + +class OpticalCandidateError(Exception): + def __init__(self, code: str, message: str, http_status: int = 400) -> None: + super().__init__(message) + self.code = code + self.message = message + self.http_status = http_status + + def to_reply(self) -> Dict: + return {"candidates": [], "rejected_reasons": [{"code": self.code, "message": self.message}]} + + +def map_rate_to_slots(rate: int) -> int: + if rate == 100: + return 4 + if rate == 400: + return 8 + if rate == 800: + return 12 + if rate == 1000: + return 18 + return 5 + + +def _to_json(message_or_dict) -> Dict: + if isinstance(message_or_dict, dict): + return message_or_dict + return grpc_message_to_json(message_or_dict, use_integers_for_enums=True) + + +def _device_uuid(endpoint_or_device) -> str: + if isinstance(endpoint_or_device, str): + return endpoint_or_device + if not isinstance(endpoint_or_device, dict): + raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint must be a string or object") + if "device_uuid" in endpoint_or_device: + return str(endpoint_or_device["device_uuid"]) + if "device_id" in endpoint_or_device: + return str(endpoint_or_device["device_id"]["device_uuid"]["uuid"]) + raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint does not contain device identifier") + + +def _endpoint_uuid(endpoint_or_device) -> Optional[str]: + if not isinstance(endpoint_or_device, dict): + return None + endpoint_uuid = endpoint_or_device.get("endpoint_uuid") + if isinstance(endpoint_uuid, dict): + return endpoint_uuid.get("uuid") + if endpoint_uuid is not None: + return str(endpoint_uuid) + return None + + +def _normalize_band(band: Optional[str]) -> Optional[str]: + if band is None: + return None + normalized = str(band).strip().lower().replace("-", "_") + if normalized in {"c", "c_band", "c_slots"}: + return "c_slots" + if normalized in {"l", "l_band", "l_slots"}: + return "l_slots" + if normalized in {"s", "s_band", "s_slots"}: + return "s_slots" + raise OpticalCandidateError("PREFERRED_BAND_UNAVAILABLE", "Unsupported optical band: {:s}".format(str(band))) + + +def _required_slots(request_json: Dict) -> Tuple[int, float]: + explicit_width = request_json.get("explicit_channel_width_ghz", request_json.get("channel_width_ghz")) + if explicit_width is not None: + width = float(explicit_width) + if width <= 0: + raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Channel width must be positive") + slots = int(math.ceil(width / 12.5)) + return slots, slots * 12.5 + + capacity = request_json.get("capacity_gbps", request_json.get("bitrate_gbps")) + if capacity is None: + raise OpticalCandidateError( + "UNSUPPORTED_CAPACITY_OR_MODULATION", + "Request must provide capacity_gbps/bitrate_gbps or explicit_channel_width_ghz", + ) + slots = map_rate_to_slots(int(capacity)) + return int(slots), int(slots) * 12.5 + + +def _device_index(devices: Iterable) -> Dict[str, Dict]: + index = {} + for device in devices: + device_json = _to_json(device) + device_uuid = device_json["device_id"]["device_uuid"]["uuid"] + index[device_uuid] = device_json + return index + + +def _slot_map(optical_link: Dict, band: str) -> Dict[str, int]: + details = optical_link.get("optical_details", {}) + raw_slots = details.get(band, {}) + if len(raw_slots) == 0: + return {} + return correct_slot(raw_slots, width=BAND_WIDTHS[band]) + + +def _active_reservation_ranges(reservations: Iterable, band: str, link_uuid: str) -> List[Tuple[int, int]]: + ranges = [] + for reservation in reservations: + reservation_json = _to_json(reservation) + if reservation_json.get("status") not in ACTIVE_RESERVATION_STATUSES: + continue + if _normalize_band(reservation_json.get("band")) != band: + continue + link_uuids = { + link_id["link_uuid"]["uuid"] + for link_id in reservation_json.get("optical_link_ids", []) + if "link_uuid" in link_id and "uuid" in link_id["link_uuid"] + } + if link_uuid not in link_uuids: + continue + ranges.append((int(reservation_json.get("n_start", 0)), int(reservation_json.get("n_end", -1)))) + return ranges + + +def _available_slots( + optical_link: Dict, band: str, reservations: Iterable, include_reserved_slots: bool +) -> Set[int]: + slots = { + int(slot) + for slot, state in _slot_map(optical_link, band).items() + if int(state) == 1 + } + if include_reserved_slots: + return slots + + link_uuid = optical_link["link_id"]["link_uuid"]["uuid"] + for n_start, n_end in _active_reservation_ranges(reservations, band, link_uuid): + slots.difference_update(range(n_start, n_end + 1)) + return slots + + +def _first_contiguous_range(slots: Set[int], required_slots: int) -> Optional[Tuple[int, int]]: + ranges = _contiguous_ranges(slots, required_slots) + if len(ranges) == 0: + return None + return ranges[0] + + +def _contiguous_ranges(slots: Set[int], required_slots: int) -> List[Tuple[int, int]]: + if required_slots <= 0: + return [] + ordered_slots = sorted(slots) + if len(ordered_slots) < required_slots: + return [] + ranges = [] + streak_start = ordered_slots[0] + previous = ordered_slots[0] + for slot in ordered_slots[1:]: + if slot == previous + 1: + previous = slot + else: + if previous - streak_start + 1 >= required_slots: + ranges.append((streak_start, previous)) + streak_start = slot + previous = slot + if previous - streak_start + 1 >= required_slots: + ranges.append((streak_start, previous)) + return ranges + + +def _range_dicts(ranges: Iterable[Tuple[int, int]]) -> List[Dict[str, int]]: + return [{"n_start": n_start, "n_end": n_end} for n_start, n_end in ranges] + + +def _preferred_slots(request_json: Dict, required_slots: int) -> Optional[Set[int]]: + n_start = request_json.get("preferred_n_start") + n_end = request_json.get("preferred_n_end") + if n_start is None and n_end is None: + return None + if n_start is None or n_end is None: + raise OpticalCandidateError( + "PREFERRED_RANGE_OCCUPIED", "Both preferred_n_start and preferred_n_end are required" + ) + n_start = int(n_start) + n_end = int(n_end) + if n_end < n_start: + raise OpticalCandidateError("PREFERRED_RANGE_OCCUPIED", "preferred_n_end must be >= preferred_n_start") + if (n_end - n_start + 1) < required_slots: + raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Preferred range is too small") + return set(range(n_start, n_start + required_slots)) + + +def _link_json_by_uuid(optical_links: Iterable, device_index: Dict[str, Dict]) -> Tuple[Dict[str, Dict], Dict[str, List]]: + link_by_uuid = {} + adjacency: Dict[str, List] = {} + for optical_link in optical_links: + link_json = _to_json(optical_link) + endpoints = link_json.get("link_endpoint_ids", []) + if len(endpoints) < 2: + continue + src_device = endpoints[0]["device_id"]["device_uuid"]["uuid"] + dst_device = endpoints[-1]["device_id"]["device_uuid"]["uuid"] + if src_device not in device_index or dst_device not in device_index: + continue + link_uuid = link_json["link_id"]["link_uuid"]["uuid"] + link_by_uuid[link_uuid] = link_json + adjacency.setdefault(src_device, []).append((dst_device, link_uuid)) + return link_by_uuid, adjacency + + +def _shortest_link_paths( + adjacency: Dict[str, List], src_device_uuid: str, dst_device_uuid: str, max_candidates: int +) -> List[List[str]]: + queue = deque([(src_device_uuid, [], {src_device_uuid})]) + paths = [] + while queue and len(paths) < max_candidates: + node, link_path, seen_nodes = queue.popleft() + if node == dst_device_uuid: + paths.append(link_path) + continue + for next_node, link_uuid in adjacency.get(node, []): + if next_node in seen_nodes: + continue + queue.append((next_node, link_path + [link_uuid], seen_nodes | {next_node})) + return paths + + +def _path_hops(link_path: List[str], link_by_uuid: Dict[str, Dict]) -> List[Dict]: + hops = [] + for index, link_uuid in enumerate(link_path): + optical_link = link_by_uuid[link_uuid] + endpoints = optical_link.get("link_endpoint_ids", []) + ingress = endpoints[0] if len(endpoints) > 0 else None + egress = endpoints[-1] if len(endpoints) > 0 else None + hop = {"sequence": index, "optical_link_id": link_uuid} + if ingress is not None: + hop["ingress_endpoint_id"] = ingress + hop["device_id"] = ingress.get("device_id") + if egress is not None: + hop["egress_endpoint_id"] = egress + hops.append(hop) + return hops + + +def _candidate_for_path( + link_path: List[str], link_by_uuid: Dict[str, Dict], reservations: Iterable, + required_slots: int, preferred_band: Optional[str], preferred_slots: Optional[Set[int]], + include_reserved_slots: bool, modulation_format: Optional[str], +) -> Tuple[Optional[Dict], List[Dict]]: + rejected = [] + band_order = (preferred_band,) if preferred_band is not None else BAND_ORDER + for band in band_order: + path_slots = None + for link_uuid in link_path: + available = _available_slots(link_by_uuid[link_uuid], band, reservations, include_reserved_slots) + path_slots = available if path_slots is None else path_slots.intersection(available) + path_slots = path_slots or set() + if preferred_slots is not None: + if not preferred_slots.issubset(path_slots): + rejected.append({ + "code": "PREFERRED_RANGE_OCCUPIED", + "message": "Preferred range is unavailable on at least one optical link", + "band": band, + "optical_link_ids": link_path, + }) + continue + n_start = min(preferred_slots) + n_end = max(preferred_slots) + available_ranges = [(n_start, n_end)] + else: + available_ranges = _contiguous_ranges(path_slots, required_slots) + if len(available_ranges) == 0: + rejected.append({ + "code": "INSUFFICIENT_CONTIGUOUS_SPECTRUM", + "message": "No contiguous spectrum block is available on every optical link in path", + "band": band, + "optical_link_ids": link_path, + }) + continue + n_start = available_ranges[0][0] + n_end = n_start + required_slots - 1 + candidate_uuid = str(uuid.uuid5( + uuid.NAMESPACE_URL, "{:s}|{:s}|{:d}|{:d}".format(",".join(link_path), band, n_start, n_end) + )) + return { + "candidate_uuid": candidate_uuid, + "validation_status": "VALID", + "band": band, + "n_start": n_start, + "n_end": n_end, + "required_slots": required_slots, + "modulation_format": modulation_format, + "available_slot_ranges": _range_dicts(available_ranges), + "optical_link_ids": link_path, + "path_hops": _path_hops(link_path, link_by_uuid), + "path_metric": len(link_path), + "available_slots_summary": { + "common_available_slots": len(path_slots), + "selected_range": "{:d}-{:d}".format(n_start, n_end), + "available_range_count": len(available_ranges), + }, + }, rejected + return None, rejected + + +def compute_optical_connectivity_candidates( + request_json: Dict, devices: Iterable, optical_links: Iterable, reservations: Iterable +) -> Tuple[Dict, int]: + required_slots, effective_channel_width_ghz = _required_slots(request_json) + src_endpoint = request_json.get("src_endpoint_id", request_json.get("src_device_uuid")) + dst_endpoint = request_json.get("dst_endpoint_id", request_json.get("dst_device_uuid")) + src_device_uuid = _device_uuid(src_endpoint) + dst_device_uuid = _device_uuid(dst_endpoint) + if src_device_uuid == dst_device_uuid: + raise OpticalCandidateError("INVALID_ENDPOINT", "Source and destination must be different") + + device_index = _device_index(devices) + if src_device_uuid not in device_index: + raise OpticalCandidateError("INVALID_ENDPOINT", "Source optical device not found", http_status=404) + if dst_device_uuid not in device_index: + raise OpticalCandidateError("INVALID_ENDPOINT", "Destination optical device not found", http_status=404) + + link_by_uuid, adjacency = _link_json_by_uuid(optical_links, device_index) + if len(link_by_uuid) == 0: + raise OpticalCandidateError("NO_OPTICAL_TOPOLOGY", "No optical links are available", http_status=404) + + max_candidates = int(request_json.get("max_candidates", 3)) + max_candidates = max(1, max_candidates) + paths = _shortest_link_paths(adjacency, src_device_uuid, dst_device_uuid, max_candidates) + if len(paths) == 0: + return { + "required_slots": required_slots, + "effective_channel_width_ghz": effective_channel_width_ghz, + "candidates": [], + "rejected_reasons": [{"code": "NO_PATH", "message": "No optical path found"}], + "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), + }, 409 + + preferred_band = _normalize_band(request_json.get("preferred_band")) + preferred_range = _preferred_slots(request_json, required_slots) + include_reserved_slots = bool(request_json.get("include_reserved_slots", False)) + modulation_format = request_json.get("modulation_format") + candidates = [] + rejected_reasons = [] + for link_path in paths: + candidate, rejected = _candidate_for_path( + link_path, link_by_uuid, reservations, required_slots, preferred_band, + preferred_range, include_reserved_slots, modulation_format + ) + rejected_reasons.extend(rejected) + if candidate is not None: + candidates.append(candidate) + + reply = { + "required_slots": required_slots, + "effective_channel_width_ghz": effective_channel_width_ghz, + "candidates": candidates, + "rejected_reasons": rejected_reasons, + "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), + } + return reply, 200 if len(candidates) > 0 else 409 + + +def _request_summary(request_json: Dict, src_endpoint, dst_endpoint) -> Dict: + return { + "src_endpoint_id": src_endpoint, + "dst_endpoint_id": dst_endpoint, + "src_endpoint_uuid": _endpoint_uuid(src_endpoint), + "dst_endpoint_uuid": _endpoint_uuid(dst_endpoint), + "capacity_gbps": request_json.get("capacity_gbps", request_json.get("bitrate_gbps")), + "modulation_format": request_json.get("modulation_format"), + "explicit_channel_width_ghz": request_json.get( + "explicit_channel_width_ghz", request_json.get("channel_width_ghz") + ), + "requested_modulation_format": request_json.get("modulation_format"), + "preferred_band": request_json.get("preferred_band"), + "preferred_n_start": request_json.get("preferred_n_start"), + "preferred_n_end": request_json.get("preferred_n_end"), + } diff --git a/src/opticalcontroller/service/__init__.py b/src/opticalcontroller/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..b53987a4eae1aed245eba5c7ddd8cd10e35919c2 --- /dev/null +++ b/src/opticalcontroller/service/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/opticalcontroller/tests/test_optical_connectivity_candidates.py b/src/opticalcontroller/tests/test_optical_connectivity_candidates.py new file mode 100644 index 0000000000000000000000000000000000000000..261b6ed3f847f393046b88fd99d3c2968fac89f7 --- /dev/null +++ b/src/opticalcontroller/tests/test_optical_connectivity_candidates.py @@ -0,0 +1,189 @@ +# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME +from common.proto.context_pb2 import ( + ContextId, Device, LinkId, OpticalLink, OpticalSpectrumReservation, + OpticalSpectrumReservationId, TopologyId, +) +from opticalcontroller.service.ConnectivityCandidates import compute_optical_connectivity_candidates + + +def _device(device_uuid: str, device_type: str = 'emu-optical-transponder') -> Device: + return Device( + device_id={'device_uuid': {'uuid': device_uuid}}, + name=device_uuid, + device_type=device_type, + ) + + +def _topology_id() -> TopologyId: + return TopologyId( + context_id=ContextId(context_uuid={'uuid': DEFAULT_CONTEXT_NAME}), + topology_uuid={'uuid': DEFAULT_TOPOLOGY_NAME}, + ) + + +def _endpoint(device_uuid: str, endpoint_uuid: str): + return { + 'device_id': {'device_uuid': {'uuid': device_uuid}}, + 'endpoint_uuid': {'uuid': endpoint_uuid}, + 'topology_id': { + 'context_id': {'context_uuid': {'uuid': DEFAULT_CONTEXT_NAME}}, + 'topology_uuid': {'uuid': DEFAULT_TOPOLOGY_NAME}, + }, + } + + +def _optical_link(src: str, dst: str, c_slots=None) -> OpticalLink: + if c_slots is None: + c_slots = {'0': 0, '1': 0, '2': 0} + return OpticalLink( + name='{:s}-{:s}'.format(src, dst), + link_id=LinkId(link_uuid={'uuid': 'OL:{:s}=={:s}'.format(src, dst)}), + link_endpoint_ids=[_endpoint(src, 'LINE'), _endpoint(dst, 'LINE')], + optical_details={ + 'src_port': 'LINE', + 'dst_port': 'LINE', + 'local_peer_port': 'LINE', + 'remote_peer_port': 'LINE', + 'c_slots': c_slots, + }, + ) + + +def _reservation(name: str, link_uuid: str, n_start: int, n_end: int) -> OpticalSpectrumReservation: + return OpticalSpectrumReservation( + reservation_id=OpticalSpectrumReservationId( + context_id=ContextId(context_uuid={'uuid': DEFAULT_CONTEXT_NAME}), + reservation_uuid={'uuid': name}, + ), + topology_id=_topology_id(), + optical_link_ids=[LinkId(link_uuid={'uuid': link_uuid})], + band='c_slots', + n_start=n_start, + n_end=n_end, + required_slots=n_end - n_start + 1, + status=1, + ) + + +def test_compute_optical_connectivity_candidates_returns_first_feasible_range(): + reply, status = compute_optical_connectivity_candidates( + { + 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}, 'endpoint_uuid': {'uuid': 'LINE'}}, + 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}, 'endpoint_uuid': {'uuid': 'LINE'}}, + 'capacity_gbps': 100, + 'preferred_band': 'c_slots', + }, + [_device('A'), _device('B')], + [_optical_link('A', 'B')], + [], + ) + + assert status == 200 + assert reply['required_slots'] == 4 + assert len(reply['candidates']) == 1 + candidate = reply['candidates'][0] + assert candidate['band'] == 'c_slots' + assert candidate['n_start'] == 3 + assert candidate['n_end'] == 6 + assert candidate['optical_link_ids'] == ['OL:A==B'] + + +def test_compute_optical_connectivity_candidates_filters_active_reservations(): + link_uuid = 'OL:A==B' + reply, status = compute_optical_connectivity_candidates( + { + 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, + 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}}, + 'capacity_gbps': 100, + 'preferred_band': 'c_slots', + }, + [_device('A'), _device('B')], + [_optical_link('A', 'B')], + [_reservation('held', link_uuid, 3, 6)], + ) + + assert status == 200 + assert reply['candidates'][0]['n_start'] == 7 + assert reply['candidates'][0]['n_end'] == 10 + assert reply['candidates'][0]['available_slot_ranges'][0] == {'n_start': 7, 'n_end': 319} + + +def test_compute_optical_connectivity_candidates_returns_all_feasible_ranges(): + link_uuid = 'OL:A==B' + reply, status = compute_optical_connectivity_candidates( + { + 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, + 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}}, + 'capacity_gbps': 100, + 'modulation_format': 'dp-qpsk', + 'preferred_band': 'c_slots', + }, + [_device('A'), _device('B')], + [_optical_link('A', 'B')], + [ + _reservation('held-a', link_uuid, 3, 6), + _reservation('held-b', link_uuid, 11, 14), + ], + ) + + assert status == 200 + candidate = reply['candidates'][0] + assert candidate['n_start'] == 7 + assert candidate['n_end'] == 10 + assert candidate['modulation_format'] == 'dp-qpsk' + assert reply['request_summary']['requested_modulation_format'] == 'dp-qpsk' + assert candidate['available_slot_ranges'][:2] == [ + {'n_start': 7, 'n_end': 10}, + {'n_start': 15, 'n_end': 319}, + ] + + +def test_compute_optical_connectivity_candidates_rejects_unavailable_preferred_range(): + reply, status = compute_optical_connectivity_candidates( + { + 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, + 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}}, + 'capacity_gbps': 100, + 'preferred_band': 'c_slots', + 'preferred_n_start': 0, + 'preferred_n_end': 3, + }, + [_device('A'), _device('B')], + [_optical_link('A', 'B')], + [], + ) + + assert status == 409 + assert reply['candidates'] == [] + assert reply['rejected_reasons'][0]['code'] == 'PREFERRED_RANGE_OCCUPIED' + + +def test_compute_optical_connectivity_candidates_rejects_missing_path(): + reply, status = compute_optical_connectivity_candidates( + { + 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, + 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'C'}}}, + 'capacity_gbps': 100, + }, + [_device('A'), _device('B'), _device('C')], + [_optical_link('A', 'B')], + [], + ) + + assert status == 409 + assert reply['candidates'] == [] + assert reply['rejected_reasons'][0]['code'] == 'NO_PATH' diff --git a/src/tests/spectrum_negotiation/README.md b/src/tests/spectrum_negotiation/README.md index 7b439b1a8cf6ad15814918c5a730d7988dcd65ad..cf560fecc5ba917e43581a0fe0ec3a46dc29d201 100644 --- a/src/tests/spectrum_negotiation/README.md +++ b/src/tests/spectrum_negotiation/README.md @@ -39,3 +39,67 @@ The script validates: Expected rejection status for overlapping or occupied-slot requests is HTTP `409 Conflict`, mapped from gRPC `ALREADY_EXISTS`. + +## Live Candidate Validation + +Run against a local TFS deployment: + +```bash +python3 src/tests/spectrum_negotiation/live_candidate_validation.py +``` + +Run against a specific testbed node: + +```bash +python3 src/tests/spectrum_negotiation/live_candidate_validation.py \ + --base-url http://172.16.0.101/tfs-api +``` + +Run against a specific optical pair: + +```bash +python3 src/tests/spectrum_negotiation/live_candidate_validation.py \ + --base-url http://172.16.0.101/tfs-api \ + --context admin \ + --topology admin \ + --src-device DC1-TP1 \ + --dst-device DOMAIN-B \ + --channel-width-ghz 50 +``` + +The script validates: + +- TFS-API context and topology discovery. +- Optical link inventory discovery. +- Descriptor device-name to TFS UUID resolution through topology details. +- Candidate computation through the NBI facade + `/tfs-api/context//topology//optical_connectivity_candidates`. +- Candidate feasibility for a directed optical endpoint pair. +- Reservation of the returned candidate slot range on all selected optical links. +- Rejection of the same range when requested as a preferred range. +- Re-computation that selects a second non-overlapping candidate after the + first range is reserved. +- Reservation of the second candidate range on all selected optical links. +- Rejection of both reserved ranges when each is requested as a preferred + range. +- Re-computation that selects a third candidate avoiding both active + reservations. +- Release of all temporary reservations. +- Final cleanup verification that no active `codex-live-candidate-test` + reservation remains. + +This script is intended as a future CI seed once the CI deployment includes the +optical controller component and a loaded optical topology. + +An optional pytest wrapper is available and is skipped unless `TFS_API_BASE_URL` +is defined: + +```bash +TFS_API_BASE_URL=http://172.16.0.101/tfs-api \ +TFS_CONTEXT_UUID=admin \ +TFS_TOPOLOGY_UUID=admin \ +TFS_OPTICAL_SRC_DEVICE=DC1-TP1 \ +TFS_OPTICAL_DST_DEVICE=DOMAIN-B \ +TFS_OPTICAL_CHANNEL_WIDTH_GHZ=50 \ +pytest -q src/tests/spectrum_negotiation/test_live_candidate_validation.py +``` diff --git a/src/tests/spectrum_negotiation/live_candidate_validation.py b/src/tests/spectrum_negotiation/live_candidate_validation.py new file mode 100644 index 0000000000000000000000000000000000000000..386a46499d92715196b65d8fc3d380e1621cd581 --- /dev/null +++ b/src/tests/spectrum_negotiation/live_candidate_validation.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python3 +# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import json +import time +import urllib.error +import urllib.parse +import urllib.request +from collections import deque + +DEFAULT_BASE_URL = 'http://127.0.0.1/tfs-api' +OWNER_ID = 'codex-live-candidate-test' + + +def http_request(base_url, method, path, payload=None, timeout=30): + data = None + headers = {} + if payload is not None: + data = json.dumps(payload).encode('utf-8') + headers['Content-Type'] = 'application/json' + request = urllib.request.Request(base_url + path, data=data, headers=headers, method=method) + try: + with urllib.request.urlopen(request, timeout=timeout) as response: + body = response.read().decode('utf-8') + return response.status, json.loads(body) if body else None + except urllib.error.HTTPError as exc: + body = exc.read().decode('utf-8') + try: + payload = json.loads(body) if body else None + except json.JSONDecodeError: + payload = body + return exc.code, payload + + +def first_context_and_topology(base_url, context_uuid=None, topology_uuid=None): + if context_uuid is not None and topology_uuid is not None: + return context_uuid, topology_uuid + + status, contexts = http_request(base_url, 'GET', '/contexts') + assert status == 200, (status, contexts) + assert contexts.get('contexts'), contexts + + for context in contexts['contexts']: + candidate_context_uuid = context['context_id']['context_uuid']['uuid'] + if context_uuid is not None and candidate_context_uuid != context_uuid: + continue + for topology_id in context.get('topology_ids', []): + candidate_topology_uuid = topology_id['topology_uuid']['uuid'] + if topology_uuid is not None and candidate_topology_uuid != topology_uuid: + continue + return candidate_context_uuid, candidate_topology_uuid + + raise AssertionError('Requested context/topology not found') + + +def optical_link_uuid(link): + return link['link_id']['link_uuid']['uuid'] + + +def endpoint_device_uuid(endpoint_id): + return endpoint_id['device_id']['device_uuid']['uuid'] + + +def endpoint_uuid(endpoint_id): + return endpoint_id['endpoint_uuid']['uuid'] + + +def list_optical_links(base_url): + status, optical_links = http_request(base_url, 'GET', '/optical_links') + assert status == 200, (status, optical_links) + links = optical_links.get('optical_links', []) + assert links, 'No optical links exposed by TFS-API' + return links + + +def topology_details(base_url, context_uuid, topology_uuid): + path = '/context/{:s}/topology_details/{:s}'.format( + urllib.parse.quote(context_uuid), urllib.parse.quote(topology_uuid) + ) + status, details = http_request(base_url, 'GET', path) + assert status == 200, (status, details) + return details + + +def device_uuid(device): + return device['device_id']['device_uuid']['uuid'] + + +def device_name(device): + return device.get('name') or device_uuid(device) + + +def endpoint_name(endpoint): + return endpoint.get('name') or endpoint['endpoint_id']['endpoint_uuid']['uuid'] + + +def resolve_device_uuid(devices, device_name_or_uuid): + for device in devices: + if device_uuid(device) == device_name_or_uuid or device_name(device) == device_name_or_uuid: + return device_uuid(device) + raise AssertionError('Optical device not found: {:s}'.format(device_name_or_uuid)) + + +def resolve_endpoint_uuid(devices, resolved_device_uuid, preferred_endpoint_name): + for device in devices: + if device_uuid(device) != resolved_device_uuid: + continue + endpoints = device.get('device_endpoints', []) + for endpoint in endpoints: + endpoint_uuid_value = endpoint['endpoint_id']['endpoint_uuid']['uuid'] + if endpoint_uuid_value == preferred_endpoint_name or endpoint_name(endpoint) == preferred_endpoint_name: + return endpoint_uuid_value + if endpoints: + return endpoints[0]['endpoint_id']['endpoint_uuid']['uuid'] + return preferred_endpoint_name + + +def endpoint_id(device_uuid, endpoint_uuid): + return { + 'device_id': {'device_uuid': {'uuid': device_uuid}}, + 'endpoint_uuid': {'uuid': endpoint_uuid}, + } + + +def build_graph(optical_links): + adjacency = {} + devices = set() + for link in optical_links: + endpoints = link.get('link_endpoint_ids', []) + if len(endpoints) < 2: + continue + src_device = endpoint_device_uuid(endpoints[0]) + dst_device = endpoint_device_uuid(endpoints[-1]) + devices.add(src_device) + devices.add(dst_device) + adjacency.setdefault(src_device, []).append(dst_device) + return devices, adjacency + + +def path_exists(adjacency, src_device_uuid, dst_device_uuid): + queue = deque([src_device_uuid]) + seen = {src_device_uuid} + while queue: + device_uuid = queue.popleft() + if device_uuid == dst_device_uuid: + return True + for next_device_uuid in adjacency.get(device_uuid, []): + if next_device_uuid in seen: + continue + seen.add(next_device_uuid) + queue.append(next_device_uuid) + return False + + +def choose_endpoint_pair(optical_links, topology_devices, src_device=None, dst_device=None): + named_devices = {device_name(device): device_uuid(device) for device in topology_devices} + graph_devices, adjacency = build_graph(optical_links) + if src_device is not None and dst_device is not None: + src_device_uuid = resolve_device_uuid(topology_devices, src_device) + dst_device_uuid = resolve_device_uuid(topology_devices, dst_device) + assert src_device_uuid in graph_devices, 'Source optical device not found in optical graph: {:s}'.format( + src_device + ) + assert dst_device_uuid in graph_devices, 'Destination optical device not found in optical graph: {:s}'.format( + dst_device + ) + assert path_exists(adjacency, src_device_uuid, dst_device_uuid), ( + 'No optical path found between {:s} and {:s}'.format(src_device, dst_device) + ) + src_endpoint_uuid = resolve_endpoint_uuid(topology_devices, src_device_uuid, 'CHANNEL') + dst_endpoint_uuid = resolve_endpoint_uuid(topology_devices, dst_device_uuid, 'CHANNEL') + return endpoint_id(src_device_uuid, src_endpoint_uuid), endpoint_id(dst_device_uuid, dst_endpoint_uuid) + + preferred_pairs = [ + ('DC1-TP1', 'DOMAIN-B', 'CHANNEL', 'port-in'), + ('DC1-TP1', 'DC2-TP1', 'CHANNEL', 'CHANNEL'), + ('DC1-TP1', 'DC3-TP1', 'CHANNEL', 'CHANNEL'), + ('DC1-TP1', 'DC4-TP1', 'CHANNEL', 'CHANNEL'), + ] + for src_device, dst_device, src_endpoint, dst_endpoint in preferred_pairs: + if src_device not in named_devices or dst_device not in named_devices: + continue + src_device_uuid = named_devices[src_device] + dst_device_uuid = named_devices[dst_device] + if src_device_uuid in graph_devices and dst_device_uuid in graph_devices and path_exists( + adjacency, src_device_uuid, dst_device_uuid + ): + src_endpoint_uuid = resolve_endpoint_uuid(topology_devices, src_device_uuid, src_endpoint) + dst_endpoint_uuid = resolve_endpoint_uuid(topology_devices, dst_device_uuid, dst_endpoint) + return endpoint_id(src_device_uuid, src_endpoint_uuid), endpoint_id(dst_device_uuid, dst_endpoint_uuid) + + for src_device in sorted(graph_devices): + for dst_device in sorted(graph_devices): + if src_device != dst_device and path_exists(adjacency, src_device, dst_device): + return endpoint_id(src_device, 'CHANNEL'), endpoint_id(dst_device, 'CHANNEL') + + raise AssertionError('No optical endpoint pair with a directed path was found') + + +def candidate_request(src_endpoint_id, dst_endpoint_id, channel_width_ghz, preferred_n_start=None, preferred_n_end=None): + request = { + 'src_endpoint_id': src_endpoint_id, + 'dst_endpoint_id': dst_endpoint_id, + 'channel_width_ghz': channel_width_ghz, + 'preferred_band': 'c_slots', + 'max_candidates': 3, + } + if preferred_n_start is not None: + request['preferred_n_start'] = preferred_n_start + if preferred_n_end is not None: + request['preferred_n_end'] = preferred_n_end + return request + + +def reservation_payload(context_uuid, topology_uuid, reservation_uuid, optical_link_ids, band, n_start, n_end): + return { + 'reservation_id': { + 'context_id': {'context_uuid': {'uuid': context_uuid}}, + 'reservation_uuid': {'uuid': reservation_uuid}, + }, + 'topology_id': { + 'context_id': {'context_uuid': {'uuid': context_uuid}}, + 'topology_uuid': {'uuid': topology_uuid}, + }, + 'optical_link_ids': [{'link_uuid': {'uuid': link_uuid}} for link_uuid in optical_link_ids], + 'band': band, + 'n_start': n_start, + 'n_end': n_end, + 'required_slots': n_end - n_start + 1, + 'owner_id': OWNER_ID, + 'correlation_id': reservation_uuid, + } + + +def release_reservation(base_url, context_uuid, reservation_uuid): + release_path = '/context/{:s}/optical_spectrum_reservation/{:s}/release'.format( + urllib.parse.quote(context_uuid), urllib.parse.quote(reservation_uuid) + ) + status, reply = http_request(base_url, 'POST', release_path) + assert status == 200, (status, reply) + + +def active_test_reservations(reply): + reservations = reply.get('reservations', []) + return [ + reservation for reservation in reservations + if reservation.get('owner_id') == OWNER_ID + and reservation.get('status') not in ('OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED', 'RELEASED') + ] + + +def selected_candidate(reply): + assert reply.get('candidates'), reply + candidate = reply['candidates'][0] + assert candidate.get('validation_status') == 'VALID', candidate + assert candidate.get('optical_link_ids'), candidate + assert candidate['n_end'] >= candidate['n_start'], candidate + return candidate + + +def ranges_overlap(left, right): + if left['band'] != right['band']: + return False + return not (left['n_end'] < right['n_start'] or left['n_start'] > right['n_end']) + + +def assert_non_overlapping(candidate, blocked_candidates): + for blocked_candidate in blocked_candidates: + assert not ranges_overlap(candidate, blocked_candidate), (candidate, blocked_candidate) + + +def assert_preferred_range_rejected(base_url, candidate_path, src_endpoint, dst_endpoint, channel_width_ghz, candidate): + blocked_request = candidate_request( + src_endpoint, dst_endpoint, channel_width_ghz, + preferred_n_start=candidate['n_start'], preferred_n_end=candidate['n_end'] + ) + status, blocked_reply = http_request(base_url, 'POST', candidate_path, blocked_request) + assert status == 409, (status, blocked_reply) + assert blocked_reply.get('rejected_reasons'), blocked_reply + + +def create_candidate_reservation(base_url, context_uuid, topology_uuid, list_path, reservation_uuid, candidate): + reservation = reservation_payload( + context_uuid, topology_uuid, reservation_uuid, candidate['optical_link_ids'], + candidate['band'], candidate['n_start'], candidate['n_end'] + ) + status, create_reply = http_request(base_url, 'POST', list_path, reservation) + assert status == 200, (status, create_reply) + + +def main(): + parser = argparse.ArgumentParser( + description='Validate live optical path spectrum candidate behavior through the TFS-API facade.' + ) + parser.add_argument('--base-url', default=DEFAULT_BASE_URL, help='TFS-API base URL, default: %(default)s') + parser.add_argument('--context', help='Context UUID/name. Auto-discovered by default.') + parser.add_argument('--topology', help='Topology UUID/name. Auto-discovered by default.') + parser.add_argument('--src-device', help='Source optical device UUID. Auto-selected by default.') + parser.add_argument('--dst-device', help='Destination optical device UUID. Auto-selected by default.') + parser.add_argument('--channel-width-ghz', type=float, default=50.0, help='Requested optical width.') + args = parser.parse_args() + + context_uuid, topology_uuid = first_context_and_topology(args.base_url, args.context, args.topology) + optical_links = list_optical_links(args.base_url) + details = topology_details(args.base_url, context_uuid, topology_uuid) + src_endpoint, dst_endpoint = choose_endpoint_pair( + optical_links, details.get('devices', []), args.src_device, args.dst_device + ) + + candidate_path = '/context/{:s}/topology/{:s}/optical_connectivity_candidates'.format( + urllib.parse.quote(context_uuid), urllib.parse.quote(topology_uuid) + ) + request_payload = candidate_request(src_endpoint, dst_endpoint, args.channel_width_ghz) + status, initial_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) + assert status == 200, (status, initial_reply) + first_candidate = selected_candidate(initial_reply) + + reservation_uuid = 'live-candidate-{:d}'.format(int(time.time())) + second_reservation_uuid = reservation_uuid + '-second' + list_path = '/context/{:s}/optical_spectrum_reservations'.format(urllib.parse.quote(context_uuid)) + created_reservations = [] + second_candidate = None + third_candidate = None + try: + create_candidate_reservation( + args.base_url, context_uuid, topology_uuid, list_path, reservation_uuid, first_candidate + ) + created_reservations.append(reservation_uuid) + + assert_preferred_range_rejected( + args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, first_candidate + ) + + status, next_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) + assert status == 200, (status, next_reply) + second_candidate = selected_candidate(next_reply) + assert_non_overlapping(second_candidate, [first_candidate]) + + create_candidate_reservation( + args.base_url, context_uuid, topology_uuid, list_path, second_reservation_uuid, second_candidate + ) + created_reservations.append(second_reservation_uuid) + + assert_preferred_range_rejected( + args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, first_candidate + ) + assert_preferred_range_rejected( + args.base_url, candidate_path, src_endpoint, dst_endpoint, args.channel_width_ghz, second_candidate + ) + + status, third_reply = http_request(args.base_url, 'POST', candidate_path, request_payload) + assert status == 200, (status, third_reply) + third_candidate = selected_candidate(third_reply) + assert_non_overlapping(third_candidate, [first_candidate, second_candidate]) + + finally: + for created_reservation_uuid in reversed(created_reservations): + release_reservation(args.base_url, context_uuid, created_reservation_uuid) + + status, after = http_request(args.base_url, 'GET', list_path) + assert status == 200, (status, after) + assert not active_test_reservations(after), after + + print('LIVE_CANDIDATE_VALIDATION_OK') + print('context_uuid={:s}'.format(context_uuid)) + print('topology_uuid={:s}'.format(topology_uuid)) + print('src={:s}/{:s}'.format(endpoint_device_uuid(src_endpoint), endpoint_uuid(src_endpoint))) + print('dst={:s}/{:s}'.format(endpoint_device_uuid(dst_endpoint), endpoint_uuid(dst_endpoint))) + print('selected_band={:s}'.format(first_candidate['band'])) + print('selected_slots={:d}-{:d}'.format(first_candidate['n_start'], first_candidate['n_end'])) + if second_candidate is not None: + print('second_slots={:d}-{:d}'.format(second_candidate['n_start'], second_candidate['n_end'])) + if third_candidate is not None: + print('third_slots={:d}-{:d}'.format(third_candidate['n_start'], third_candidate['n_end'])) + print('selected_links={:s}'.format(','.join(first_candidate['optical_link_ids']))) + + +if __name__ == '__main__': + main() diff --git a/src/tests/spectrum_negotiation/test_live_candidate_validation.py b/src/tests/spectrum_negotiation/test_live_candidate_validation.py new file mode 100644 index 0000000000000000000000000000000000000000..b9db072a625170b629f7e89d69997793db3e8187 --- /dev/null +++ b/src/tests/spectrum_negotiation/test_live_candidate_validation.py @@ -0,0 +1,42 @@ +# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import subprocess +import sys +from pathlib import Path + +import pytest + + +def test_live_candidate_validation(): + base_url = os.environ.get('TFS_API_BASE_URL') + if base_url is None: + pytest.skip('Set TFS_API_BASE_URL to enable live optical candidate validation') + + script_path = Path(__file__).with_name('live_candidate_validation.py') + command = [sys.executable, str(script_path), '--base-url', base_url] + + for env_name, option_name in ( + ('TFS_CONTEXT_UUID', '--context'), + ('TFS_TOPOLOGY_UUID', '--topology'), + ('TFS_OPTICAL_SRC_DEVICE', '--src-device'), + ('TFS_OPTICAL_DST_DEVICE', '--dst-device'), + ('TFS_OPTICAL_CHANNEL_WIDTH_GHZ', '--channel-width-ghz'), + ): + value = os.environ.get(env_name) + if value is not None: + command.extend([option_name, value]) + + subprocess.run(command, check=True)