Loading src/opticalcontroller/OpticalController.py +43 −1 Original line number Diff line number Diff line Loading @@ -17,10 +17,14 @@ from flask import Flask, request from flask import render_template from common.DeviceTypes import DeviceTypeEnum from flask_restplus import Resource, Api from context.client.ContextClient import ContextClient from opticalcontroller.tools import * from opticalcontroller.variables import * from opticalcontroller.RSA import RSA from common.proto.context_pb2 import TopologyId , OpticalLink from opticalcontroller.service.ConnectivityCandidates import ( OpticalCandidateError, compute_optical_connectivity_candidates ) from common.proto.context_pb2 import ContextId, TopologyId , OpticalLink import json from google.protobuf.message import Message from google.protobuf.json_format import MessageToDict Loading Loading @@ -512,5 +516,43 @@ class GetTopology(Resource): #print(f'err {e}') return 'Error', 400 @optical.route('/ComputeOpticalConnectivityCandidates/<path:context_id>/<path:topology_id>', methods=['POST']) @optical.response(200, 'Success') @optical.response(400, 'Invalid request') @optical.response(404, 'Error, not found') @optical.response(409, 'No feasible optical candidate') class ComputeOpticalConnectivityCandidates(Resource): @staticmethod def post(context_id: str, topology_id: str): candidate_request = request.get_json() if candidate_request is None: return { "candidates": [], "rejected_reasons": [{"code": "INVALID_REQUEST", "message": "Missing request body"}], }, 400 topog_id = TopologyId() topog_id.topology_uuid.uuid = topology_id topog_id.context_id.context_uuid.uuid = context_id ctx_client = ContextClient() ctx_client.connect() try: topo, nodes = readTopologyDataFromContext(topog_id) reservations = ctx_client.ListOpticalSpectrumReservations( ContextId(context_uuid={"uuid": context_id}) ).reservations return compute_optical_connectivity_candidates(candidate_request, nodes, topo, reservations) except OpticalCandidateError as exc: return exc.to_reply(), exc.http_status except Exception: LOGGER.exception('Error while computing optical connectivity candidates') return { "candidates": [], "rejected_reasons": [{"code": "INTERNAL", "message": "Candidate computation failed"}], }, 500 finally: ctx_client.close() if __name__ == '__main__': app.run(host='0.0.0.0', port=10060, debug=True) src/opticalcontroller/service/ConnectivityCandidates.py 0 → 100644 +399 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import math, uuid from collections import deque from typing import Dict, Iterable, List, Optional, Set, Tuple from common.proto.context_pb2 import OpticalSpectrumReservationStatusEnum from common.tools.grpc.Tools import grpc_message_to_json from common.tools.object_factory.OpticalLink import correct_slot ACTIVE_RESERVATION_STATUSES = { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, "OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED", "OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED", 1, 2, } BAND_WIDTHS = { "c_slots": 320, "l_slots": 550, "s_slots": 720, } BAND_ORDER = ("c_slots", "l_slots", "s_slots") class OpticalCandidateError(Exception): def __init__(self, code: str, message: str, http_status: int = 400) -> None: super().__init__(message) self.code = code self.message = message self.http_status = http_status def to_reply(self) -> Dict: return {"candidates": [], "rejected_reasons": [{"code": self.code, "message": self.message}]} def map_rate_to_slots(rate: int) -> int: if rate == 100: return 4 if rate == 400: return 8 if rate == 800: return 12 if rate == 1000: return 18 return 5 def _to_json(message_or_dict) -> Dict: if isinstance(message_or_dict, dict): return message_or_dict return grpc_message_to_json(message_or_dict, use_integers_for_enums=True) def _device_uuid(endpoint_or_device) -> str: if isinstance(endpoint_or_device, str): return endpoint_or_device if not isinstance(endpoint_or_device, dict): raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint must be a string or object") if "device_uuid" in endpoint_or_device: return str(endpoint_or_device["device_uuid"]) if "device_id" in endpoint_or_device: return str(endpoint_or_device["device_id"]["device_uuid"]["uuid"]) raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint does not contain device identifier") def _endpoint_uuid(endpoint_or_device) -> Optional[str]: if not isinstance(endpoint_or_device, dict): return None endpoint_uuid = endpoint_or_device.get("endpoint_uuid") if isinstance(endpoint_uuid, dict): return endpoint_uuid.get("uuid") if endpoint_uuid is not None: return str(endpoint_uuid) return None def _normalize_band(band: Optional[str]) -> Optional[str]: if band is None: return None normalized = str(band).strip().lower().replace("-", "_") if normalized in {"c", "c_band", "c_slots"}: return "c_slots" if normalized in {"l", "l_band", "l_slots"}: return "l_slots" if normalized in {"s", "s_band", "s_slots"}: return "s_slots" raise OpticalCandidateError("PREFERRED_BAND_UNAVAILABLE", "Unsupported optical band: {:s}".format(str(band))) def _required_slots(request_json: Dict) -> Tuple[int, float]: explicit_width = request_json.get("explicit_channel_width_ghz", request_json.get("channel_width_ghz")) if explicit_width is not None: width = float(explicit_width) if width <= 0: raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Channel width must be positive") slots = int(math.ceil(width / 12.5)) return slots, slots * 12.5 capacity = request_json.get("capacity_gbps", request_json.get("bitrate_gbps")) if capacity is None: raise OpticalCandidateError( "UNSUPPORTED_CAPACITY_OR_MODULATION", "Request must provide capacity_gbps/bitrate_gbps or explicit_channel_width_ghz", ) slots = map_rate_to_slots(int(capacity)) return int(slots), int(slots) * 12.5 def _device_index(devices: Iterable) -> Dict[str, Dict]: index = {} for device in devices: device_json = _to_json(device) device_uuid = device_json["device_id"]["device_uuid"]["uuid"] index[device_uuid] = device_json return index def _slot_map(optical_link: Dict, band: str) -> Dict[str, int]: details = optical_link.get("optical_details", {}) raw_slots = details.get(band, {}) if len(raw_slots) == 0: return {} return correct_slot(raw_slots, width=BAND_WIDTHS[band]) def _active_reservation_ranges(reservations: Iterable, band: str, link_uuid: str) -> List[Tuple[int, int]]: ranges = [] for reservation in reservations: reservation_json = _to_json(reservation) if reservation_json.get("status") not in ACTIVE_RESERVATION_STATUSES: continue if _normalize_band(reservation_json.get("band")) != band: continue link_uuids = { link_id["link_uuid"]["uuid"] for link_id in reservation_json.get("optical_link_ids", []) if "link_uuid" in link_id and "uuid" in link_id["link_uuid"] } if link_uuid not in link_uuids: continue ranges.append((int(reservation_json.get("n_start", 0)), int(reservation_json.get("n_end", -1)))) return ranges def _available_slots( optical_link: Dict, band: str, reservations: Iterable, include_reserved_slots: bool ) -> Set[int]: slots = { int(slot) for slot, state in _slot_map(optical_link, band).items() if int(state) == 1 } if include_reserved_slots: return slots link_uuid = optical_link["link_id"]["link_uuid"]["uuid"] for n_start, n_end in _active_reservation_ranges(reservations, band, link_uuid): slots.difference_update(range(n_start, n_end + 1)) return slots def _first_contiguous_range(slots: Set[int], required_slots: int) -> Optional[Tuple[int, int]]: if required_slots <= 0: return None ordered_slots = sorted(slots) if len(ordered_slots) < required_slots: return None streak_start = ordered_slots[0] previous = ordered_slots[0] streak_len = 1 if required_slots == 1: return streak_start, streak_start for slot in ordered_slots[1:]: if slot == previous + 1: streak_len += 1 else: streak_start = slot streak_len = 1 previous = slot if streak_len >= required_slots: return streak_start, slot return None def _preferred_slots(request_json: Dict, required_slots: int) -> Optional[Set[int]]: n_start = request_json.get("preferred_n_start") n_end = request_json.get("preferred_n_end") if n_start is None and n_end is None: return None if n_start is None or n_end is None: raise OpticalCandidateError( "PREFERRED_RANGE_OCCUPIED", "Both preferred_n_start and preferred_n_end are required" ) n_start = int(n_start) n_end = int(n_end) if n_end < n_start: raise OpticalCandidateError("PREFERRED_RANGE_OCCUPIED", "preferred_n_end must be >= preferred_n_start") if (n_end - n_start + 1) < required_slots: raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Preferred range is too small") return set(range(n_start, n_start + required_slots)) def _link_json_by_uuid(optical_links: Iterable, device_index: Dict[str, Dict]) -> Tuple[Dict[str, Dict], Dict[str, List]]: link_by_uuid = {} adjacency: Dict[str, List] = {} for optical_link in optical_links: link_json = _to_json(optical_link) endpoints = link_json.get("link_endpoint_ids", []) if len(endpoints) < 2: continue src_device = endpoints[0]["device_id"]["device_uuid"]["uuid"] dst_device = endpoints[-1]["device_id"]["device_uuid"]["uuid"] if src_device not in device_index or dst_device not in device_index: continue link_uuid = link_json["link_id"]["link_uuid"]["uuid"] link_by_uuid[link_uuid] = link_json adjacency.setdefault(src_device, []).append((dst_device, link_uuid)) return link_by_uuid, adjacency def _shortest_link_paths( adjacency: Dict[str, List], src_device_uuid: str, dst_device_uuid: str, max_candidates: int ) -> List[List[str]]: queue = deque([(src_device_uuid, [], {src_device_uuid})]) paths = [] while queue and len(paths) < max_candidates: node, link_path, seen_nodes = queue.popleft() if node == dst_device_uuid: paths.append(link_path) continue for next_node, link_uuid in adjacency.get(node, []): if next_node in seen_nodes: continue queue.append((next_node, link_path + [link_uuid], seen_nodes | {next_node})) return paths def _path_hops(link_path: List[str], link_by_uuid: Dict[str, Dict]) -> List[Dict]: hops = [] for index, link_uuid in enumerate(link_path): optical_link = link_by_uuid[link_uuid] endpoints = optical_link.get("link_endpoint_ids", []) ingress = endpoints[0] if len(endpoints) > 0 else None egress = endpoints[-1] if len(endpoints) > 0 else None hop = {"sequence": index, "optical_link_id": link_uuid} if ingress is not None: hop["ingress_endpoint_id"] = ingress hop["device_id"] = ingress.get("device_id") if egress is not None: hop["egress_endpoint_id"] = egress hops.append(hop) return hops def _candidate_for_path( link_path: List[str], link_by_uuid: Dict[str, Dict], reservations: Iterable, required_slots: int, preferred_band: Optional[str], preferred_slots: Optional[Set[int]], include_reserved_slots: bool, ) -> Tuple[Optional[Dict], List[Dict]]: rejected = [] band_order = (preferred_band,) if preferred_band is not None else BAND_ORDER for band in band_order: path_slots = None for link_uuid in link_path: available = _available_slots(link_by_uuid[link_uuid], band, reservations, include_reserved_slots) path_slots = available if path_slots is None else path_slots.intersection(available) path_slots = path_slots or set() if preferred_slots is not None: if not preferred_slots.issubset(path_slots): rejected.append({ "code": "PREFERRED_RANGE_OCCUPIED", "message": "Preferred range is unavailable on at least one optical link", "band": band, "optical_link_ids": link_path, }) continue n_start = min(preferred_slots) n_end = max(preferred_slots) else: contiguous = _first_contiguous_range(path_slots, required_slots) if contiguous is None: rejected.append({ "code": "INSUFFICIENT_CONTIGUOUS_SPECTRUM", "message": "No contiguous spectrum block is available on every optical link in path", "band": band, "optical_link_ids": link_path, }) continue n_start, n_end = contiguous candidate_uuid = str(uuid.uuid5( uuid.NAMESPACE_URL, "{:s}|{:s}|{:d}|{:d}".format(",".join(link_path), band, n_start, n_end) )) return { "candidate_uuid": candidate_uuid, "validation_status": "VALID", "band": band, "n_start": n_start, "n_end": n_end, "required_slots": required_slots, "optical_link_ids": link_path, "path_hops": _path_hops(link_path, link_by_uuid), "path_metric": len(link_path), "available_slots_summary": { "common_available_slots": len(path_slots), "selected_range": "{:d}-{:d}".format(n_start, n_end), }, }, rejected return None, rejected def compute_optical_connectivity_candidates( request_json: Dict, devices: Iterable, optical_links: Iterable, reservations: Iterable ) -> Tuple[Dict, int]: required_slots, effective_channel_width_ghz = _required_slots(request_json) src_endpoint = request_json.get("src_endpoint_id", request_json.get("src_device_uuid")) dst_endpoint = request_json.get("dst_endpoint_id", request_json.get("dst_device_uuid")) src_device_uuid = _device_uuid(src_endpoint) dst_device_uuid = _device_uuid(dst_endpoint) if src_device_uuid == dst_device_uuid: raise OpticalCandidateError("INVALID_ENDPOINT", "Source and destination must be different") device_index = _device_index(devices) if src_device_uuid not in device_index: raise OpticalCandidateError("INVALID_ENDPOINT", "Source optical device not found", http_status=404) if dst_device_uuid not in device_index: raise OpticalCandidateError("INVALID_ENDPOINT", "Destination optical device not found", http_status=404) link_by_uuid, adjacency = _link_json_by_uuid(optical_links, device_index) if len(link_by_uuid) == 0: raise OpticalCandidateError("NO_OPTICAL_TOPOLOGY", "No optical links are available", http_status=404) max_candidates = int(request_json.get("max_candidates", 3)) max_candidates = max(1, max_candidates) paths = _shortest_link_paths(adjacency, src_device_uuid, dst_device_uuid, max_candidates) if len(paths) == 0: return { "required_slots": required_slots, "effective_channel_width_ghz": effective_channel_width_ghz, "candidates": [], "rejected_reasons": [{"code": "NO_PATH", "message": "No optical path found"}], "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), }, 409 preferred_band = _normalize_band(request_json.get("preferred_band")) preferred_range = _preferred_slots(request_json, required_slots) include_reserved_slots = bool(request_json.get("include_reserved_slots", False)) candidates = [] rejected_reasons = [] for link_path in paths: candidate, rejected = _candidate_for_path( link_path, link_by_uuid, reservations, required_slots, preferred_band, preferred_range, include_reserved_slots ) rejected_reasons.extend(rejected) if candidate is not None: candidates.append(candidate) reply = { "required_slots": required_slots, "effective_channel_width_ghz": effective_channel_width_ghz, "candidates": candidates, "rejected_reasons": rejected_reasons, "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), } return reply, 200 if len(candidates) > 0 else 409 def _request_summary(request_json: Dict, src_endpoint, dst_endpoint) -> Dict: return { "src_endpoint_id": src_endpoint, "dst_endpoint_id": dst_endpoint, "src_endpoint_uuid": _endpoint_uuid(src_endpoint), "dst_endpoint_uuid": _endpoint_uuid(dst_endpoint), "capacity_gbps": request_json.get("capacity_gbps", request_json.get("bitrate_gbps")), "modulation_format": request_json.get("modulation_format"), "explicit_channel_width_ghz": request_json.get( "explicit_channel_width_ghz", request_json.get("channel_width_ghz") ), "preferred_band": request_json.get("preferred_band"), "preferred_n_start": request_json.get("preferred_n_start"), "preferred_n_end": request_json.get("preferred_n_end"), } src/opticalcontroller/service/__init__.py 0 → 100644 +13 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. src/opticalcontroller/tests/test_optical_connectivity_candidates.py 0 → 100644 +158 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.proto.context_pb2 import ( ContextId, Device, LinkId, OpticalLink, OpticalSpectrumReservation, OpticalSpectrumReservationId, TopologyId, ) from opticalcontroller.service.ConnectivityCandidates import compute_optical_connectivity_candidates def _device(device_uuid: str, device_type: str = 'emu-optical-transponder') -> Device: return Device( device_id={'device_uuid': {'uuid': device_uuid}}, name=device_uuid, device_type=device_type, ) def _topology_id() -> TopologyId: return TopologyId( context_id=ContextId(context_uuid={'uuid': DEFAULT_CONTEXT_NAME}), topology_uuid={'uuid': DEFAULT_TOPOLOGY_NAME}, ) def _endpoint(device_uuid: str, endpoint_uuid: str): return { 'device_id': {'device_uuid': {'uuid': device_uuid}}, 'endpoint_uuid': {'uuid': endpoint_uuid}, 'topology_id': { 'context_id': {'context_uuid': {'uuid': DEFAULT_CONTEXT_NAME}}, 'topology_uuid': {'uuid': DEFAULT_TOPOLOGY_NAME}, }, } def _optical_link(src: str, dst: str, c_slots=None) -> OpticalLink: if c_slots is None: c_slots = {'0': 0, '1': 0, '2': 0} return OpticalLink( name='{:s}-{:s}'.format(src, dst), link_id=LinkId(link_uuid={'uuid': 'OL:{:s}=={:s}'.format(src, dst)}), link_endpoint_ids=[_endpoint(src, 'LINE'), _endpoint(dst, 'LINE')], optical_details={ 'src_port': 'LINE', 'dst_port': 'LINE', 'local_peer_port': 'LINE', 'remote_peer_port': 'LINE', 'c_slots': c_slots, }, ) def _reservation(name: str, link_uuid: str, n_start: int, n_end: int) -> OpticalSpectrumReservation: return OpticalSpectrumReservation( reservation_id=OpticalSpectrumReservationId( context_id=ContextId(context_uuid={'uuid': DEFAULT_CONTEXT_NAME}), reservation_uuid={'uuid': name}, ), topology_id=_topology_id(), optical_link_ids=[LinkId(link_uuid={'uuid': link_uuid})], band='c_slots', n_start=n_start, n_end=n_end, required_slots=n_end - n_start + 1, status=1, ) def test_compute_optical_connectivity_candidates_returns_first_feasible_range(): reply, status = compute_optical_connectivity_candidates( { 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}, 'endpoint_uuid': {'uuid': 'LINE'}}, 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}, 'endpoint_uuid': {'uuid': 'LINE'}}, 'capacity_gbps': 100, 'preferred_band': 'c_slots', }, [_device('A'), _device('B')], [_optical_link('A', 'B')], [], ) assert status == 200 assert reply['required_slots'] == 4 assert len(reply['candidates']) == 1 candidate = reply['candidates'][0] assert candidate['band'] == 'c_slots' assert candidate['n_start'] == 3 assert candidate['n_end'] == 6 assert candidate['optical_link_ids'] == ['OL:A==B'] def test_compute_optical_connectivity_candidates_filters_active_reservations(): link_uuid = 'OL:A==B' reply, status = compute_optical_connectivity_candidates( { 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}}, 'capacity_gbps': 100, 'preferred_band': 'c_slots', }, [_device('A'), _device('B')], [_optical_link('A', 'B')], [_reservation('held', link_uuid, 3, 6)], ) assert status == 200 assert reply['candidates'][0]['n_start'] == 7 assert reply['candidates'][0]['n_end'] == 10 def test_compute_optical_connectivity_candidates_rejects_unavailable_preferred_range(): reply, status = compute_optical_connectivity_candidates( { 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}}, 'capacity_gbps': 100, 'preferred_band': 'c_slots', 'preferred_n_start': 0, 'preferred_n_end': 3, }, [_device('A'), _device('B')], [_optical_link('A', 'B')], [], ) assert status == 409 assert reply['candidates'] == [] assert reply['rejected_reasons'][0]['code'] == 'PREFERRED_RANGE_OCCUPIED' def test_compute_optical_connectivity_candidates_rejects_missing_path(): reply, status = compute_optical_connectivity_candidates( { 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'C'}}}, 'capacity_gbps': 100, }, [_device('A'), _device('B'), _device('C')], [_optical_link('A', 'B')], [], ) assert status == 409 assert reply['candidates'] == [] assert reply['rejected_reasons'][0]['code'] == 'NO_PATH' Loading
src/opticalcontroller/OpticalController.py +43 −1 Original line number Diff line number Diff line Loading @@ -17,10 +17,14 @@ from flask import Flask, request from flask import render_template from common.DeviceTypes import DeviceTypeEnum from flask_restplus import Resource, Api from context.client.ContextClient import ContextClient from opticalcontroller.tools import * from opticalcontroller.variables import * from opticalcontroller.RSA import RSA from common.proto.context_pb2 import TopologyId , OpticalLink from opticalcontroller.service.ConnectivityCandidates import ( OpticalCandidateError, compute_optical_connectivity_candidates ) from common.proto.context_pb2 import ContextId, TopologyId , OpticalLink import json from google.protobuf.message import Message from google.protobuf.json_format import MessageToDict Loading Loading @@ -512,5 +516,43 @@ class GetTopology(Resource): #print(f'err {e}') return 'Error', 400 @optical.route('/ComputeOpticalConnectivityCandidates/<path:context_id>/<path:topology_id>', methods=['POST']) @optical.response(200, 'Success') @optical.response(400, 'Invalid request') @optical.response(404, 'Error, not found') @optical.response(409, 'No feasible optical candidate') class ComputeOpticalConnectivityCandidates(Resource): @staticmethod def post(context_id: str, topology_id: str): candidate_request = request.get_json() if candidate_request is None: return { "candidates": [], "rejected_reasons": [{"code": "INVALID_REQUEST", "message": "Missing request body"}], }, 400 topog_id = TopologyId() topog_id.topology_uuid.uuid = topology_id topog_id.context_id.context_uuid.uuid = context_id ctx_client = ContextClient() ctx_client.connect() try: topo, nodes = readTopologyDataFromContext(topog_id) reservations = ctx_client.ListOpticalSpectrumReservations( ContextId(context_uuid={"uuid": context_id}) ).reservations return compute_optical_connectivity_candidates(candidate_request, nodes, topo, reservations) except OpticalCandidateError as exc: return exc.to_reply(), exc.http_status except Exception: LOGGER.exception('Error while computing optical connectivity candidates') return { "candidates": [], "rejected_reasons": [{"code": "INTERNAL", "message": "Candidate computation failed"}], }, 500 finally: ctx_client.close() if __name__ == '__main__': app.run(host='0.0.0.0', port=10060, debug=True)
src/opticalcontroller/service/ConnectivityCandidates.py 0 → 100644 +399 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import math, uuid from collections import deque from typing import Dict, Iterable, List, Optional, Set, Tuple from common.proto.context_pb2 import OpticalSpectrumReservationStatusEnum from common.tools.grpc.Tools import grpc_message_to_json from common.tools.object_factory.OpticalLink import correct_slot ACTIVE_RESERVATION_STATUSES = { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, "OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED", "OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED", 1, 2, } BAND_WIDTHS = { "c_slots": 320, "l_slots": 550, "s_slots": 720, } BAND_ORDER = ("c_slots", "l_slots", "s_slots") class OpticalCandidateError(Exception): def __init__(self, code: str, message: str, http_status: int = 400) -> None: super().__init__(message) self.code = code self.message = message self.http_status = http_status def to_reply(self) -> Dict: return {"candidates": [], "rejected_reasons": [{"code": self.code, "message": self.message}]} def map_rate_to_slots(rate: int) -> int: if rate == 100: return 4 if rate == 400: return 8 if rate == 800: return 12 if rate == 1000: return 18 return 5 def _to_json(message_or_dict) -> Dict: if isinstance(message_or_dict, dict): return message_or_dict return grpc_message_to_json(message_or_dict, use_integers_for_enums=True) def _device_uuid(endpoint_or_device) -> str: if isinstance(endpoint_or_device, str): return endpoint_or_device if not isinstance(endpoint_or_device, dict): raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint must be a string or object") if "device_uuid" in endpoint_or_device: return str(endpoint_or_device["device_uuid"]) if "device_id" in endpoint_or_device: return str(endpoint_or_device["device_id"]["device_uuid"]["uuid"]) raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint does not contain device identifier") def _endpoint_uuid(endpoint_or_device) -> Optional[str]: if not isinstance(endpoint_or_device, dict): return None endpoint_uuid = endpoint_or_device.get("endpoint_uuid") if isinstance(endpoint_uuid, dict): return endpoint_uuid.get("uuid") if endpoint_uuid is not None: return str(endpoint_uuid) return None def _normalize_band(band: Optional[str]) -> Optional[str]: if band is None: return None normalized = str(band).strip().lower().replace("-", "_") if normalized in {"c", "c_band", "c_slots"}: return "c_slots" if normalized in {"l", "l_band", "l_slots"}: return "l_slots" if normalized in {"s", "s_band", "s_slots"}: return "s_slots" raise OpticalCandidateError("PREFERRED_BAND_UNAVAILABLE", "Unsupported optical band: {:s}".format(str(band))) def _required_slots(request_json: Dict) -> Tuple[int, float]: explicit_width = request_json.get("explicit_channel_width_ghz", request_json.get("channel_width_ghz")) if explicit_width is not None: width = float(explicit_width) if width <= 0: raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Channel width must be positive") slots = int(math.ceil(width / 12.5)) return slots, slots * 12.5 capacity = request_json.get("capacity_gbps", request_json.get("bitrate_gbps")) if capacity is None: raise OpticalCandidateError( "UNSUPPORTED_CAPACITY_OR_MODULATION", "Request must provide capacity_gbps/bitrate_gbps or explicit_channel_width_ghz", ) slots = map_rate_to_slots(int(capacity)) return int(slots), int(slots) * 12.5 def _device_index(devices: Iterable) -> Dict[str, Dict]: index = {} for device in devices: device_json = _to_json(device) device_uuid = device_json["device_id"]["device_uuid"]["uuid"] index[device_uuid] = device_json return index def _slot_map(optical_link: Dict, band: str) -> Dict[str, int]: details = optical_link.get("optical_details", {}) raw_slots = details.get(band, {}) if len(raw_slots) == 0: return {} return correct_slot(raw_slots, width=BAND_WIDTHS[band]) def _active_reservation_ranges(reservations: Iterable, band: str, link_uuid: str) -> List[Tuple[int, int]]: ranges = [] for reservation in reservations: reservation_json = _to_json(reservation) if reservation_json.get("status") not in ACTIVE_RESERVATION_STATUSES: continue if _normalize_band(reservation_json.get("band")) != band: continue link_uuids = { link_id["link_uuid"]["uuid"] for link_id in reservation_json.get("optical_link_ids", []) if "link_uuid" in link_id and "uuid" in link_id["link_uuid"] } if link_uuid not in link_uuids: continue ranges.append((int(reservation_json.get("n_start", 0)), int(reservation_json.get("n_end", -1)))) return ranges def _available_slots( optical_link: Dict, band: str, reservations: Iterable, include_reserved_slots: bool ) -> Set[int]: slots = { int(slot) for slot, state in _slot_map(optical_link, band).items() if int(state) == 1 } if include_reserved_slots: return slots link_uuid = optical_link["link_id"]["link_uuid"]["uuid"] for n_start, n_end in _active_reservation_ranges(reservations, band, link_uuid): slots.difference_update(range(n_start, n_end + 1)) return slots def _first_contiguous_range(slots: Set[int], required_slots: int) -> Optional[Tuple[int, int]]: if required_slots <= 0: return None ordered_slots = sorted(slots) if len(ordered_slots) < required_slots: return None streak_start = ordered_slots[0] previous = ordered_slots[0] streak_len = 1 if required_slots == 1: return streak_start, streak_start for slot in ordered_slots[1:]: if slot == previous + 1: streak_len += 1 else: streak_start = slot streak_len = 1 previous = slot if streak_len >= required_slots: return streak_start, slot return None def _preferred_slots(request_json: Dict, required_slots: int) -> Optional[Set[int]]: n_start = request_json.get("preferred_n_start") n_end = request_json.get("preferred_n_end") if n_start is None and n_end is None: return None if n_start is None or n_end is None: raise OpticalCandidateError( "PREFERRED_RANGE_OCCUPIED", "Both preferred_n_start and preferred_n_end are required" ) n_start = int(n_start) n_end = int(n_end) if n_end < n_start: raise OpticalCandidateError("PREFERRED_RANGE_OCCUPIED", "preferred_n_end must be >= preferred_n_start") if (n_end - n_start + 1) < required_slots: raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Preferred range is too small") return set(range(n_start, n_start + required_slots)) def _link_json_by_uuid(optical_links: Iterable, device_index: Dict[str, Dict]) -> Tuple[Dict[str, Dict], Dict[str, List]]: link_by_uuid = {} adjacency: Dict[str, List] = {} for optical_link in optical_links: link_json = _to_json(optical_link) endpoints = link_json.get("link_endpoint_ids", []) if len(endpoints) < 2: continue src_device = endpoints[0]["device_id"]["device_uuid"]["uuid"] dst_device = endpoints[-1]["device_id"]["device_uuid"]["uuid"] if src_device not in device_index or dst_device not in device_index: continue link_uuid = link_json["link_id"]["link_uuid"]["uuid"] link_by_uuid[link_uuid] = link_json adjacency.setdefault(src_device, []).append((dst_device, link_uuid)) return link_by_uuid, adjacency def _shortest_link_paths( adjacency: Dict[str, List], src_device_uuid: str, dst_device_uuid: str, max_candidates: int ) -> List[List[str]]: queue = deque([(src_device_uuid, [], {src_device_uuid})]) paths = [] while queue and len(paths) < max_candidates: node, link_path, seen_nodes = queue.popleft() if node == dst_device_uuid: paths.append(link_path) continue for next_node, link_uuid in adjacency.get(node, []): if next_node in seen_nodes: continue queue.append((next_node, link_path + [link_uuid], seen_nodes | {next_node})) return paths def _path_hops(link_path: List[str], link_by_uuid: Dict[str, Dict]) -> List[Dict]: hops = [] for index, link_uuid in enumerate(link_path): optical_link = link_by_uuid[link_uuid] endpoints = optical_link.get("link_endpoint_ids", []) ingress = endpoints[0] if len(endpoints) > 0 else None egress = endpoints[-1] if len(endpoints) > 0 else None hop = {"sequence": index, "optical_link_id": link_uuid} if ingress is not None: hop["ingress_endpoint_id"] = ingress hop["device_id"] = ingress.get("device_id") if egress is not None: hop["egress_endpoint_id"] = egress hops.append(hop) return hops def _candidate_for_path( link_path: List[str], link_by_uuid: Dict[str, Dict], reservations: Iterable, required_slots: int, preferred_band: Optional[str], preferred_slots: Optional[Set[int]], include_reserved_slots: bool, ) -> Tuple[Optional[Dict], List[Dict]]: rejected = [] band_order = (preferred_band,) if preferred_band is not None else BAND_ORDER for band in band_order: path_slots = None for link_uuid in link_path: available = _available_slots(link_by_uuid[link_uuid], band, reservations, include_reserved_slots) path_slots = available if path_slots is None else path_slots.intersection(available) path_slots = path_slots or set() if preferred_slots is not None: if not preferred_slots.issubset(path_slots): rejected.append({ "code": "PREFERRED_RANGE_OCCUPIED", "message": "Preferred range is unavailable on at least one optical link", "band": band, "optical_link_ids": link_path, }) continue n_start = min(preferred_slots) n_end = max(preferred_slots) else: contiguous = _first_contiguous_range(path_slots, required_slots) if contiguous is None: rejected.append({ "code": "INSUFFICIENT_CONTIGUOUS_SPECTRUM", "message": "No contiguous spectrum block is available on every optical link in path", "band": band, "optical_link_ids": link_path, }) continue n_start, n_end = contiguous candidate_uuid = str(uuid.uuid5( uuid.NAMESPACE_URL, "{:s}|{:s}|{:d}|{:d}".format(",".join(link_path), band, n_start, n_end) )) return { "candidate_uuid": candidate_uuid, "validation_status": "VALID", "band": band, "n_start": n_start, "n_end": n_end, "required_slots": required_slots, "optical_link_ids": link_path, "path_hops": _path_hops(link_path, link_by_uuid), "path_metric": len(link_path), "available_slots_summary": { "common_available_slots": len(path_slots), "selected_range": "{:d}-{:d}".format(n_start, n_end), }, }, rejected return None, rejected def compute_optical_connectivity_candidates( request_json: Dict, devices: Iterable, optical_links: Iterable, reservations: Iterable ) -> Tuple[Dict, int]: required_slots, effective_channel_width_ghz = _required_slots(request_json) src_endpoint = request_json.get("src_endpoint_id", request_json.get("src_device_uuid")) dst_endpoint = request_json.get("dst_endpoint_id", request_json.get("dst_device_uuid")) src_device_uuid = _device_uuid(src_endpoint) dst_device_uuid = _device_uuid(dst_endpoint) if src_device_uuid == dst_device_uuid: raise OpticalCandidateError("INVALID_ENDPOINT", "Source and destination must be different") device_index = _device_index(devices) if src_device_uuid not in device_index: raise OpticalCandidateError("INVALID_ENDPOINT", "Source optical device not found", http_status=404) if dst_device_uuid not in device_index: raise OpticalCandidateError("INVALID_ENDPOINT", "Destination optical device not found", http_status=404) link_by_uuid, adjacency = _link_json_by_uuid(optical_links, device_index) if len(link_by_uuid) == 0: raise OpticalCandidateError("NO_OPTICAL_TOPOLOGY", "No optical links are available", http_status=404) max_candidates = int(request_json.get("max_candidates", 3)) max_candidates = max(1, max_candidates) paths = _shortest_link_paths(adjacency, src_device_uuid, dst_device_uuid, max_candidates) if len(paths) == 0: return { "required_slots": required_slots, "effective_channel_width_ghz": effective_channel_width_ghz, "candidates": [], "rejected_reasons": [{"code": "NO_PATH", "message": "No optical path found"}], "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), }, 409 preferred_band = _normalize_band(request_json.get("preferred_band")) preferred_range = _preferred_slots(request_json, required_slots) include_reserved_slots = bool(request_json.get("include_reserved_slots", False)) candidates = [] rejected_reasons = [] for link_path in paths: candidate, rejected = _candidate_for_path( link_path, link_by_uuid, reservations, required_slots, preferred_band, preferred_range, include_reserved_slots ) rejected_reasons.extend(rejected) if candidate is not None: candidates.append(candidate) reply = { "required_slots": required_slots, "effective_channel_width_ghz": effective_channel_width_ghz, "candidates": candidates, "rejected_reasons": rejected_reasons, "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), } return reply, 200 if len(candidates) > 0 else 409 def _request_summary(request_json: Dict, src_endpoint, dst_endpoint) -> Dict: return { "src_endpoint_id": src_endpoint, "dst_endpoint_id": dst_endpoint, "src_endpoint_uuid": _endpoint_uuid(src_endpoint), "dst_endpoint_uuid": _endpoint_uuid(dst_endpoint), "capacity_gbps": request_json.get("capacity_gbps", request_json.get("bitrate_gbps")), "modulation_format": request_json.get("modulation_format"), "explicit_channel_width_ghz": request_json.get( "explicit_channel_width_ghz", request_json.get("channel_width_ghz") ), "preferred_band": request_json.get("preferred_band"), "preferred_n_start": request_json.get("preferred_n_start"), "preferred_n_end": request_json.get("preferred_n_end"), }
src/opticalcontroller/service/__init__.py 0 → 100644 +13 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
src/opticalcontroller/tests/test_optical_connectivity_candidates.py 0 → 100644 +158 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.proto.context_pb2 import ( ContextId, Device, LinkId, OpticalLink, OpticalSpectrumReservation, OpticalSpectrumReservationId, TopologyId, ) from opticalcontroller.service.ConnectivityCandidates import compute_optical_connectivity_candidates def _device(device_uuid: str, device_type: str = 'emu-optical-transponder') -> Device: return Device( device_id={'device_uuid': {'uuid': device_uuid}}, name=device_uuid, device_type=device_type, ) def _topology_id() -> TopologyId: return TopologyId( context_id=ContextId(context_uuid={'uuid': DEFAULT_CONTEXT_NAME}), topology_uuid={'uuid': DEFAULT_TOPOLOGY_NAME}, ) def _endpoint(device_uuid: str, endpoint_uuid: str): return { 'device_id': {'device_uuid': {'uuid': device_uuid}}, 'endpoint_uuid': {'uuid': endpoint_uuid}, 'topology_id': { 'context_id': {'context_uuid': {'uuid': DEFAULT_CONTEXT_NAME}}, 'topology_uuid': {'uuid': DEFAULT_TOPOLOGY_NAME}, }, } def _optical_link(src: str, dst: str, c_slots=None) -> OpticalLink: if c_slots is None: c_slots = {'0': 0, '1': 0, '2': 0} return OpticalLink( name='{:s}-{:s}'.format(src, dst), link_id=LinkId(link_uuid={'uuid': 'OL:{:s}=={:s}'.format(src, dst)}), link_endpoint_ids=[_endpoint(src, 'LINE'), _endpoint(dst, 'LINE')], optical_details={ 'src_port': 'LINE', 'dst_port': 'LINE', 'local_peer_port': 'LINE', 'remote_peer_port': 'LINE', 'c_slots': c_slots, }, ) def _reservation(name: str, link_uuid: str, n_start: int, n_end: int) -> OpticalSpectrumReservation: return OpticalSpectrumReservation( reservation_id=OpticalSpectrumReservationId( context_id=ContextId(context_uuid={'uuid': DEFAULT_CONTEXT_NAME}), reservation_uuid={'uuid': name}, ), topology_id=_topology_id(), optical_link_ids=[LinkId(link_uuid={'uuid': link_uuid})], band='c_slots', n_start=n_start, n_end=n_end, required_slots=n_end - n_start + 1, status=1, ) def test_compute_optical_connectivity_candidates_returns_first_feasible_range(): reply, status = compute_optical_connectivity_candidates( { 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}, 'endpoint_uuid': {'uuid': 'LINE'}}, 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}, 'endpoint_uuid': {'uuid': 'LINE'}}, 'capacity_gbps': 100, 'preferred_band': 'c_slots', }, [_device('A'), _device('B')], [_optical_link('A', 'B')], [], ) assert status == 200 assert reply['required_slots'] == 4 assert len(reply['candidates']) == 1 candidate = reply['candidates'][0] assert candidate['band'] == 'c_slots' assert candidate['n_start'] == 3 assert candidate['n_end'] == 6 assert candidate['optical_link_ids'] == ['OL:A==B'] def test_compute_optical_connectivity_candidates_filters_active_reservations(): link_uuid = 'OL:A==B' reply, status = compute_optical_connectivity_candidates( { 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}}, 'capacity_gbps': 100, 'preferred_band': 'c_slots', }, [_device('A'), _device('B')], [_optical_link('A', 'B')], [_reservation('held', link_uuid, 3, 6)], ) assert status == 200 assert reply['candidates'][0]['n_start'] == 7 assert reply['candidates'][0]['n_end'] == 10 def test_compute_optical_connectivity_candidates_rejects_unavailable_preferred_range(): reply, status = compute_optical_connectivity_candidates( { 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'B'}}}, 'capacity_gbps': 100, 'preferred_band': 'c_slots', 'preferred_n_start': 0, 'preferred_n_end': 3, }, [_device('A'), _device('B')], [_optical_link('A', 'B')], [], ) assert status == 409 assert reply['candidates'] == [] assert reply['rejected_reasons'][0]['code'] == 'PREFERRED_RANGE_OCCUPIED' def test_compute_optical_connectivity_candidates_rejects_missing_path(): reply, status = compute_optical_connectivity_candidates( { 'src_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'A'}}}, 'dst_endpoint_id': {'device_id': {'device_uuid': {'uuid': 'C'}}}, 'capacity_gbps': 100, }, [_device('A'), _device('B'), _device('C')], [_optical_link('A', 'B')], [], ) assert status == 409 assert reply['candidates'] == [] assert reply['rejected_reasons'][0]['code'] == 'NO_PATH'