Loading src/nbi/service/tfs_api/Resources.py +32 −0 Original line number Diff line number Diff line Loading @@ -14,12 +14,16 @@ import json import logging import urllib.parse from typing import Dict, List import grpc import requests from flask.json import jsonify from flask_restful import Resource, request from werkzeug.exceptions import BadRequest from common.Constants import ServiceNameEnum from common.proto.context_pb2 import Empty, LinkTypeEnum, Service, ServiceTypeEnum, ConfigActionEnum, ConfigRule from common.Settings import get_service_host, get_service_port_grpc from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest from common.tools.descriptor.Tools import format_device_custom_config_rules, format_service_custom_config_rules from common.tools.grpc.Tools import grpc_message_to_json Loading Loading @@ -455,6 +459,34 @@ class OpticalSpectrumReservationRelease(_Resource): except grpc.RpcError as exc: return _format_grpc_error(exc) class OpticalConnectivityCandidates(_Resource): def post(self, context_uuid : str, topology_uuid : str): candidate_request = request.get_json() if candidate_request is None: raise BadRequest('Missing optical connectivity candidate request') try: optical_host = get_service_host(ServiceNameEnum.OPTICALCONTROLLER) optical_port = get_service_port_grpc(ServiceNameEnum.OPTICALCONTROLLER) url = 'http://{:s}:{:d}/OpticalTFS/ComputeOpticalConnectivityCandidates/{:s}/{:s}'.format( optical_host, optical_port, urllib.parse.quote(context_uuid, safe=''), urllib.parse.quote(topology_uuid, safe=''), ) reply = requests.post(url, json=candidate_request, timeout=30) if reply.content and len(reply.content) > 0: return reply.json(), reply.status_code return None, reply.status_code except requests.exceptions.RequestException as exc: LOGGER.warning('Optical controller candidate computation request failed: %s', str(exc)) return { 'candidates': [], 'rejected_reasons': [{ 'code': 'OPTICAL_CONTROLLER_UNAVAILABLE', 'message': str(exc), }], }, 503 class ConnectionIds(_Resource): def get(self, context_uuid : str, service_uuid : str): return format_grpc_to_json(self.context_client.ListConnectionIds(grpc_service_id(context_uuid, service_uuid))) Loading src/nbi/service/tfs_api/__init__.py +3 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ from .Resources import ( OpticalLink, OpticalLinks, OpticalSpectrumReservation, OpticalSpectrumReservationConsume, OpticalSpectrumReservationRelease, OpticalSpectrumReservations, OpticalConnectivityCandidates, PolicyRule, PolicyRuleIds, PolicyRules, Service, ServiceIds, Services, Slice, SliceIds, Slices, Loading Loading @@ -72,6 +73,8 @@ _RESOURCES = [ '/context/<path:context_uuid>/optical_spectrum_reservation/<path:reservation_uuid>/consume'), ('api.optical_spectrum_reservation_release', OpticalSpectrumReservationRelease, '/context/<path:context_uuid>/optical_spectrum_reservation/<path:reservation_uuid>/release'), ('api.optical_connectivity_candidates', OpticalConnectivityCandidates, '/context/<path:context_uuid>/topology/<path:topology_uuid>/optical_connectivity_candidates'), ('api.connection_ids', ConnectionIds, '/context/<path:context_uuid>/service/<path:service_uuid>/connection_ids'), ('api.connections', Connections, '/context/<path:context_uuid>/service/<path:service_uuid>/connections'), Loading src/opticalcontroller/OpticalController.py +43 −1 Original line number Diff line number Diff line Loading @@ -17,10 +17,14 @@ from flask import Flask, request from flask import render_template from common.DeviceTypes import DeviceTypeEnum from flask_restplus import Resource, Api from context.client.ContextClient import ContextClient from opticalcontroller.tools import * from opticalcontroller.variables import * from opticalcontroller.RSA import RSA from common.proto.context_pb2 import TopologyId , OpticalLink from opticalcontroller.service.ConnectivityCandidates import ( OpticalCandidateError, compute_optical_connectivity_candidates ) from common.proto.context_pb2 import ContextId, TopologyId , OpticalLink import json from google.protobuf.message import Message from google.protobuf.json_format import MessageToDict Loading Loading @@ -512,5 +516,43 @@ class GetTopology(Resource): #print(f'err {e}') return 'Error', 400 @optical.route('/ComputeOpticalConnectivityCandidates/<path:context_id>/<path:topology_id>', methods=['POST']) @optical.response(200, 'Success') @optical.response(400, 'Invalid request') @optical.response(404, 'Error, not found') @optical.response(409, 'No feasible optical candidate') class ComputeOpticalConnectivityCandidates(Resource): @staticmethod def post(context_id: str, topology_id: str): candidate_request = request.get_json() if candidate_request is None: return { "candidates": [], "rejected_reasons": [{"code": "INVALID_REQUEST", "message": "Missing request body"}], }, 400 topog_id = TopologyId() topog_id.topology_uuid.uuid = topology_id topog_id.context_id.context_uuid.uuid = context_id ctx_client = ContextClient() ctx_client.connect() try: topo, nodes = readTopologyDataFromContext(topog_id) reservations = ctx_client.ListOpticalSpectrumReservations( ContextId(context_uuid={"uuid": context_id}) ).reservations return compute_optical_connectivity_candidates(candidate_request, nodes, topo, reservations) except OpticalCandidateError as exc: return exc.to_reply(), exc.http_status except Exception: LOGGER.exception('Error while computing optical connectivity candidates') return { "candidates": [], "rejected_reasons": [{"code": "INTERNAL", "message": "Candidate computation failed"}], }, 500 finally: ctx_client.close() if __name__ == '__main__': app.run(host='0.0.0.0', port=10060, debug=True) src/opticalcontroller/service/ConnectivityCandidates.py 0 → 100644 +416 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import math, uuid from collections import deque from typing import Dict, Iterable, List, Optional, Set, Tuple from common.proto.context_pb2 import OpticalSpectrumReservationStatusEnum from common.tools.grpc.Tools import grpc_message_to_json from common.tools.object_factory.OpticalLink import correct_slot ACTIVE_RESERVATION_STATUSES = { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, "OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED", "OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED", 1, 2, } BAND_WIDTHS = { "c_slots": 320, "l_slots": 550, "s_slots": 720, } BAND_ORDER = ("c_slots", "l_slots", "s_slots") class OpticalCandidateError(Exception): def __init__(self, code: str, message: str, http_status: int = 400) -> None: super().__init__(message) self.code = code self.message = message self.http_status = http_status def to_reply(self) -> Dict: return {"candidates": [], "rejected_reasons": [{"code": self.code, "message": self.message}]} def map_rate_to_slots(rate: int) -> int: if rate == 100: return 4 if rate == 400: return 8 if rate == 800: return 12 if rate == 1000: return 18 return 5 def _to_json(message_or_dict) -> Dict: if isinstance(message_or_dict, dict): return message_or_dict return grpc_message_to_json(message_or_dict, use_integers_for_enums=True) def _device_uuid(endpoint_or_device) -> str: if isinstance(endpoint_or_device, str): return endpoint_or_device if not isinstance(endpoint_or_device, dict): raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint must be a string or object") if "device_uuid" in endpoint_or_device: return str(endpoint_or_device["device_uuid"]) if "device_id" in endpoint_or_device: return str(endpoint_or_device["device_id"]["device_uuid"]["uuid"]) raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint does not contain device identifier") def _endpoint_uuid(endpoint_or_device) -> Optional[str]: if not isinstance(endpoint_or_device, dict): return None endpoint_uuid = endpoint_or_device.get("endpoint_uuid") if isinstance(endpoint_uuid, dict): return endpoint_uuid.get("uuid") if endpoint_uuid is not None: return str(endpoint_uuid) return None def _normalize_band(band: Optional[str]) -> Optional[str]: if band is None: return None normalized = str(band).strip().lower().replace("-", "_") if normalized in {"c", "c_band", "c_slots"}: return "c_slots" if normalized in {"l", "l_band", "l_slots"}: return "l_slots" if normalized in {"s", "s_band", "s_slots"}: return "s_slots" raise OpticalCandidateError("PREFERRED_BAND_UNAVAILABLE", "Unsupported optical band: {:s}".format(str(band))) def _required_slots(request_json: Dict) -> Tuple[int, float]: explicit_width = request_json.get("explicit_channel_width_ghz", request_json.get("channel_width_ghz")) if explicit_width is not None: width = float(explicit_width) if width <= 0: raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Channel width must be positive") slots = int(math.ceil(width / 12.5)) return slots, slots * 12.5 capacity = request_json.get("capacity_gbps", request_json.get("bitrate_gbps")) if capacity is None: raise OpticalCandidateError( "UNSUPPORTED_CAPACITY_OR_MODULATION", "Request must provide capacity_gbps/bitrate_gbps or explicit_channel_width_ghz", ) slots = map_rate_to_slots(int(capacity)) return int(slots), int(slots) * 12.5 def _device_index(devices: Iterable) -> Dict[str, Dict]: index = {} for device in devices: device_json = _to_json(device) device_uuid = device_json["device_id"]["device_uuid"]["uuid"] index[device_uuid] = device_json return index def _slot_map(optical_link: Dict, band: str) -> Dict[str, int]: details = optical_link.get("optical_details", {}) raw_slots = details.get(band, {}) if len(raw_slots) == 0: return {} return correct_slot(raw_slots, width=BAND_WIDTHS[band]) def _active_reservation_ranges(reservations: Iterable, band: str, link_uuid: str) -> List[Tuple[int, int]]: ranges = [] for reservation in reservations: reservation_json = _to_json(reservation) if reservation_json.get("status") not in ACTIVE_RESERVATION_STATUSES: continue if _normalize_band(reservation_json.get("band")) != band: continue link_uuids = { link_id["link_uuid"]["uuid"] for link_id in reservation_json.get("optical_link_ids", []) if "link_uuid" in link_id and "uuid" in link_id["link_uuid"] } if link_uuid not in link_uuids: continue ranges.append((int(reservation_json.get("n_start", 0)), int(reservation_json.get("n_end", -1)))) return ranges def _available_slots( optical_link: Dict, band: str, reservations: Iterable, include_reserved_slots: bool ) -> Set[int]: slots = { int(slot) for slot, state in _slot_map(optical_link, band).items() if int(state) == 1 } if include_reserved_slots: return slots link_uuid = optical_link["link_id"]["link_uuid"]["uuid"] for n_start, n_end in _active_reservation_ranges(reservations, band, link_uuid): slots.difference_update(range(n_start, n_end + 1)) return slots def _first_contiguous_range(slots: Set[int], required_slots: int) -> Optional[Tuple[int, int]]: ranges = _contiguous_ranges(slots, required_slots) if len(ranges) == 0: return None return ranges[0] def _contiguous_ranges(slots: Set[int], required_slots: int) -> List[Tuple[int, int]]: if required_slots <= 0: return [] ordered_slots = sorted(slots) if len(ordered_slots) < required_slots: return [] ranges = [] streak_start = ordered_slots[0] previous = ordered_slots[0] for slot in ordered_slots[1:]: if slot == previous + 1: previous = slot else: if previous - streak_start + 1 >= required_slots: ranges.append((streak_start, previous)) streak_start = slot previous = slot if previous - streak_start + 1 >= required_slots: ranges.append((streak_start, previous)) return ranges def _range_dicts(ranges: Iterable[Tuple[int, int]]) -> List[Dict[str, int]]: return [{"n_start": n_start, "n_end": n_end} for n_start, n_end in ranges] def _preferred_slots(request_json: Dict, required_slots: int) -> Optional[Set[int]]: n_start = request_json.get("preferred_n_start") n_end = request_json.get("preferred_n_end") if n_start is None and n_end is None: return None if n_start is None or n_end is None: raise OpticalCandidateError( "PREFERRED_RANGE_OCCUPIED", "Both preferred_n_start and preferred_n_end are required" ) n_start = int(n_start) n_end = int(n_end) if n_end < n_start: raise OpticalCandidateError("PREFERRED_RANGE_OCCUPIED", "preferred_n_end must be >= preferred_n_start") if (n_end - n_start + 1) < required_slots: raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Preferred range is too small") return set(range(n_start, n_start + required_slots)) def _link_json_by_uuid(optical_links: Iterable, device_index: Dict[str, Dict]) -> Tuple[Dict[str, Dict], Dict[str, List]]: link_by_uuid = {} adjacency: Dict[str, List] = {} for optical_link in optical_links: link_json = _to_json(optical_link) endpoints = link_json.get("link_endpoint_ids", []) if len(endpoints) < 2: continue src_device = endpoints[0]["device_id"]["device_uuid"]["uuid"] dst_device = endpoints[-1]["device_id"]["device_uuid"]["uuid"] if src_device not in device_index or dst_device not in device_index: continue link_uuid = link_json["link_id"]["link_uuid"]["uuid"] link_by_uuid[link_uuid] = link_json adjacency.setdefault(src_device, []).append((dst_device, link_uuid)) return link_by_uuid, adjacency def _shortest_link_paths( adjacency: Dict[str, List], src_device_uuid: str, dst_device_uuid: str, max_candidates: int ) -> List[List[str]]: queue = deque([(src_device_uuid, [], {src_device_uuid})]) paths = [] while queue and len(paths) < max_candidates: node, link_path, seen_nodes = queue.popleft() if node == dst_device_uuid: paths.append(link_path) continue for next_node, link_uuid in adjacency.get(node, []): if next_node in seen_nodes: continue queue.append((next_node, link_path + [link_uuid], seen_nodes | {next_node})) return paths def _path_hops(link_path: List[str], link_by_uuid: Dict[str, Dict]) -> List[Dict]: hops = [] for index, link_uuid in enumerate(link_path): optical_link = link_by_uuid[link_uuid] endpoints = optical_link.get("link_endpoint_ids", []) ingress = endpoints[0] if len(endpoints) > 0 else None egress = endpoints[-1] if len(endpoints) > 0 else None hop = {"sequence": index, "optical_link_id": link_uuid} if ingress is not None: hop["ingress_endpoint_id"] = ingress hop["device_id"] = ingress.get("device_id") if egress is not None: hop["egress_endpoint_id"] = egress hops.append(hop) return hops def _candidate_for_path( link_path: List[str], link_by_uuid: Dict[str, Dict], reservations: Iterable, required_slots: int, preferred_band: Optional[str], preferred_slots: Optional[Set[int]], include_reserved_slots: bool, modulation_format: Optional[str], ) -> Tuple[Optional[Dict], List[Dict]]: rejected = [] band_order = (preferred_band,) if preferred_band is not None else BAND_ORDER for band in band_order: path_slots = None for link_uuid in link_path: available = _available_slots(link_by_uuid[link_uuid], band, reservations, include_reserved_slots) path_slots = available if path_slots is None else path_slots.intersection(available) path_slots = path_slots or set() if preferred_slots is not None: if not preferred_slots.issubset(path_slots): rejected.append({ "code": "PREFERRED_RANGE_OCCUPIED", "message": "Preferred range is unavailable on at least one optical link", "band": band, "optical_link_ids": link_path, }) continue n_start = min(preferred_slots) n_end = max(preferred_slots) available_ranges = [(n_start, n_end)] else: available_ranges = _contiguous_ranges(path_slots, required_slots) if len(available_ranges) == 0: rejected.append({ "code": "INSUFFICIENT_CONTIGUOUS_SPECTRUM", "message": "No contiguous spectrum block is available on every optical link in path", "band": band, "optical_link_ids": link_path, }) continue n_start = available_ranges[0][0] n_end = n_start + required_slots - 1 candidate_uuid = str(uuid.uuid5( uuid.NAMESPACE_URL, "{:s}|{:s}|{:d}|{:d}".format(",".join(link_path), band, n_start, n_end) )) return { "candidate_uuid": candidate_uuid, "validation_status": "VALID", "band": band, "n_start": n_start, "n_end": n_end, "required_slots": required_slots, "modulation_format": modulation_format, "available_slot_ranges": _range_dicts(available_ranges), "optical_link_ids": link_path, "path_hops": _path_hops(link_path, link_by_uuid), "path_metric": len(link_path), "available_slots_summary": { "common_available_slots": len(path_slots), "selected_range": "{:d}-{:d}".format(n_start, n_end), "available_range_count": len(available_ranges), }, }, rejected return None, rejected def compute_optical_connectivity_candidates( request_json: Dict, devices: Iterable, optical_links: Iterable, reservations: Iterable ) -> Tuple[Dict, int]: required_slots, effective_channel_width_ghz = _required_slots(request_json) src_endpoint = request_json.get("src_endpoint_id", request_json.get("src_device_uuid")) dst_endpoint = request_json.get("dst_endpoint_id", request_json.get("dst_device_uuid")) src_device_uuid = _device_uuid(src_endpoint) dst_device_uuid = _device_uuid(dst_endpoint) if src_device_uuid == dst_device_uuid: raise OpticalCandidateError("INVALID_ENDPOINT", "Source and destination must be different") device_index = _device_index(devices) if src_device_uuid not in device_index: raise OpticalCandidateError("INVALID_ENDPOINT", "Source optical device not found", http_status=404) if dst_device_uuid not in device_index: raise OpticalCandidateError("INVALID_ENDPOINT", "Destination optical device not found", http_status=404) link_by_uuid, adjacency = _link_json_by_uuid(optical_links, device_index) if len(link_by_uuid) == 0: raise OpticalCandidateError("NO_OPTICAL_TOPOLOGY", "No optical links are available", http_status=404) max_candidates = int(request_json.get("max_candidates", 3)) max_candidates = max(1, max_candidates) paths = _shortest_link_paths(adjacency, src_device_uuid, dst_device_uuid, max_candidates) if len(paths) == 0: return { "required_slots": required_slots, "effective_channel_width_ghz": effective_channel_width_ghz, "candidates": [], "rejected_reasons": [{"code": "NO_PATH", "message": "No optical path found"}], "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), }, 409 preferred_band = _normalize_band(request_json.get("preferred_band")) preferred_range = _preferred_slots(request_json, required_slots) include_reserved_slots = bool(request_json.get("include_reserved_slots", False)) modulation_format = request_json.get("modulation_format") candidates = [] rejected_reasons = [] for link_path in paths: candidate, rejected = _candidate_for_path( link_path, link_by_uuid, reservations, required_slots, preferred_band, preferred_range, include_reserved_slots, modulation_format ) rejected_reasons.extend(rejected) if candidate is not None: candidates.append(candidate) reply = { "required_slots": required_slots, "effective_channel_width_ghz": effective_channel_width_ghz, "candidates": candidates, "rejected_reasons": rejected_reasons, "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), } return reply, 200 if len(candidates) > 0 else 409 def _request_summary(request_json: Dict, src_endpoint, dst_endpoint) -> Dict: return { "src_endpoint_id": src_endpoint, "dst_endpoint_id": dst_endpoint, "src_endpoint_uuid": _endpoint_uuid(src_endpoint), "dst_endpoint_uuid": _endpoint_uuid(dst_endpoint), "capacity_gbps": request_json.get("capacity_gbps", request_json.get("bitrate_gbps")), "modulation_format": request_json.get("modulation_format"), "explicit_channel_width_ghz": request_json.get( "explicit_channel_width_ghz", request_json.get("channel_width_ghz") ), "requested_modulation_format": request_json.get("modulation_format"), "preferred_band": request_json.get("preferred_band"), "preferred_n_start": request_json.get("preferred_n_start"), "preferred_n_end": request_json.get("preferred_n_end"), } src/opticalcontroller/service/__init__.py 0 → 100644 +13 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. Loading
src/nbi/service/tfs_api/Resources.py +32 −0 Original line number Diff line number Diff line Loading @@ -14,12 +14,16 @@ import json import logging import urllib.parse from typing import Dict, List import grpc import requests from flask.json import jsonify from flask_restful import Resource, request from werkzeug.exceptions import BadRequest from common.Constants import ServiceNameEnum from common.proto.context_pb2 import Empty, LinkTypeEnum, Service, ServiceTypeEnum, ConfigActionEnum, ConfigRule from common.Settings import get_service_host, get_service_port_grpc from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest from common.tools.descriptor.Tools import format_device_custom_config_rules, format_service_custom_config_rules from common.tools.grpc.Tools import grpc_message_to_json Loading Loading @@ -455,6 +459,34 @@ class OpticalSpectrumReservationRelease(_Resource): except grpc.RpcError as exc: return _format_grpc_error(exc) class OpticalConnectivityCandidates(_Resource): def post(self, context_uuid : str, topology_uuid : str): candidate_request = request.get_json() if candidate_request is None: raise BadRequest('Missing optical connectivity candidate request') try: optical_host = get_service_host(ServiceNameEnum.OPTICALCONTROLLER) optical_port = get_service_port_grpc(ServiceNameEnum.OPTICALCONTROLLER) url = 'http://{:s}:{:d}/OpticalTFS/ComputeOpticalConnectivityCandidates/{:s}/{:s}'.format( optical_host, optical_port, urllib.parse.quote(context_uuid, safe=''), urllib.parse.quote(topology_uuid, safe=''), ) reply = requests.post(url, json=candidate_request, timeout=30) if reply.content and len(reply.content) > 0: return reply.json(), reply.status_code return None, reply.status_code except requests.exceptions.RequestException as exc: LOGGER.warning('Optical controller candidate computation request failed: %s', str(exc)) return { 'candidates': [], 'rejected_reasons': [{ 'code': 'OPTICAL_CONTROLLER_UNAVAILABLE', 'message': str(exc), }], }, 503 class ConnectionIds(_Resource): def get(self, context_uuid : str, service_uuid : str): return format_grpc_to_json(self.context_client.ListConnectionIds(grpc_service_id(context_uuid, service_uuid))) Loading
src/nbi/service/tfs_api/__init__.py +3 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ from .Resources import ( OpticalLink, OpticalLinks, OpticalSpectrumReservation, OpticalSpectrumReservationConsume, OpticalSpectrumReservationRelease, OpticalSpectrumReservations, OpticalConnectivityCandidates, PolicyRule, PolicyRuleIds, PolicyRules, Service, ServiceIds, Services, Slice, SliceIds, Slices, Loading Loading @@ -72,6 +73,8 @@ _RESOURCES = [ '/context/<path:context_uuid>/optical_spectrum_reservation/<path:reservation_uuid>/consume'), ('api.optical_spectrum_reservation_release', OpticalSpectrumReservationRelease, '/context/<path:context_uuid>/optical_spectrum_reservation/<path:reservation_uuid>/release'), ('api.optical_connectivity_candidates', OpticalConnectivityCandidates, '/context/<path:context_uuid>/topology/<path:topology_uuid>/optical_connectivity_candidates'), ('api.connection_ids', ConnectionIds, '/context/<path:context_uuid>/service/<path:service_uuid>/connection_ids'), ('api.connections', Connections, '/context/<path:context_uuid>/service/<path:service_uuid>/connections'), Loading
src/opticalcontroller/OpticalController.py +43 −1 Original line number Diff line number Diff line Loading @@ -17,10 +17,14 @@ from flask import Flask, request from flask import render_template from common.DeviceTypes import DeviceTypeEnum from flask_restplus import Resource, Api from context.client.ContextClient import ContextClient from opticalcontroller.tools import * from opticalcontroller.variables import * from opticalcontroller.RSA import RSA from common.proto.context_pb2 import TopologyId , OpticalLink from opticalcontroller.service.ConnectivityCandidates import ( OpticalCandidateError, compute_optical_connectivity_candidates ) from common.proto.context_pb2 import ContextId, TopologyId , OpticalLink import json from google.protobuf.message import Message from google.protobuf.json_format import MessageToDict Loading Loading @@ -512,5 +516,43 @@ class GetTopology(Resource): #print(f'err {e}') return 'Error', 400 @optical.route('/ComputeOpticalConnectivityCandidates/<path:context_id>/<path:topology_id>', methods=['POST']) @optical.response(200, 'Success') @optical.response(400, 'Invalid request') @optical.response(404, 'Error, not found') @optical.response(409, 'No feasible optical candidate') class ComputeOpticalConnectivityCandidates(Resource): @staticmethod def post(context_id: str, topology_id: str): candidate_request = request.get_json() if candidate_request is None: return { "candidates": [], "rejected_reasons": [{"code": "INVALID_REQUEST", "message": "Missing request body"}], }, 400 topog_id = TopologyId() topog_id.topology_uuid.uuid = topology_id topog_id.context_id.context_uuid.uuid = context_id ctx_client = ContextClient() ctx_client.connect() try: topo, nodes = readTopologyDataFromContext(topog_id) reservations = ctx_client.ListOpticalSpectrumReservations( ContextId(context_uuid={"uuid": context_id}) ).reservations return compute_optical_connectivity_candidates(candidate_request, nodes, topo, reservations) except OpticalCandidateError as exc: return exc.to_reply(), exc.http_status except Exception: LOGGER.exception('Error while computing optical connectivity candidates') return { "candidates": [], "rejected_reasons": [{"code": "INTERNAL", "message": "Candidate computation failed"}], }, 500 finally: ctx_client.close() if __name__ == '__main__': app.run(host='0.0.0.0', port=10060, debug=True)
src/opticalcontroller/service/ConnectivityCandidates.py 0 → 100644 +416 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import math, uuid from collections import deque from typing import Dict, Iterable, List, Optional, Set, Tuple from common.proto.context_pb2 import OpticalSpectrumReservationStatusEnum from common.tools.grpc.Tools import grpc_message_to_json from common.tools.object_factory.OpticalLink import correct_slot ACTIVE_RESERVATION_STATUSES = { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, "OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED", "OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED", 1, 2, } BAND_WIDTHS = { "c_slots": 320, "l_slots": 550, "s_slots": 720, } BAND_ORDER = ("c_slots", "l_slots", "s_slots") class OpticalCandidateError(Exception): def __init__(self, code: str, message: str, http_status: int = 400) -> None: super().__init__(message) self.code = code self.message = message self.http_status = http_status def to_reply(self) -> Dict: return {"candidates": [], "rejected_reasons": [{"code": self.code, "message": self.message}]} def map_rate_to_slots(rate: int) -> int: if rate == 100: return 4 if rate == 400: return 8 if rate == 800: return 12 if rate == 1000: return 18 return 5 def _to_json(message_or_dict) -> Dict: if isinstance(message_or_dict, dict): return message_or_dict return grpc_message_to_json(message_or_dict, use_integers_for_enums=True) def _device_uuid(endpoint_or_device) -> str: if isinstance(endpoint_or_device, str): return endpoint_or_device if not isinstance(endpoint_or_device, dict): raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint must be a string or object") if "device_uuid" in endpoint_or_device: return str(endpoint_or_device["device_uuid"]) if "device_id" in endpoint_or_device: return str(endpoint_or_device["device_id"]["device_uuid"]["uuid"]) raise OpticalCandidateError("INVALID_ENDPOINT", "Endpoint does not contain device identifier") def _endpoint_uuid(endpoint_or_device) -> Optional[str]: if not isinstance(endpoint_or_device, dict): return None endpoint_uuid = endpoint_or_device.get("endpoint_uuid") if isinstance(endpoint_uuid, dict): return endpoint_uuid.get("uuid") if endpoint_uuid is not None: return str(endpoint_uuid) return None def _normalize_band(band: Optional[str]) -> Optional[str]: if band is None: return None normalized = str(band).strip().lower().replace("-", "_") if normalized in {"c", "c_band", "c_slots"}: return "c_slots" if normalized in {"l", "l_band", "l_slots"}: return "l_slots" if normalized in {"s", "s_band", "s_slots"}: return "s_slots" raise OpticalCandidateError("PREFERRED_BAND_UNAVAILABLE", "Unsupported optical band: {:s}".format(str(band))) def _required_slots(request_json: Dict) -> Tuple[int, float]: explicit_width = request_json.get("explicit_channel_width_ghz", request_json.get("channel_width_ghz")) if explicit_width is not None: width = float(explicit_width) if width <= 0: raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Channel width must be positive") slots = int(math.ceil(width / 12.5)) return slots, slots * 12.5 capacity = request_json.get("capacity_gbps", request_json.get("bitrate_gbps")) if capacity is None: raise OpticalCandidateError( "UNSUPPORTED_CAPACITY_OR_MODULATION", "Request must provide capacity_gbps/bitrate_gbps or explicit_channel_width_ghz", ) slots = map_rate_to_slots(int(capacity)) return int(slots), int(slots) * 12.5 def _device_index(devices: Iterable) -> Dict[str, Dict]: index = {} for device in devices: device_json = _to_json(device) device_uuid = device_json["device_id"]["device_uuid"]["uuid"] index[device_uuid] = device_json return index def _slot_map(optical_link: Dict, band: str) -> Dict[str, int]: details = optical_link.get("optical_details", {}) raw_slots = details.get(band, {}) if len(raw_slots) == 0: return {} return correct_slot(raw_slots, width=BAND_WIDTHS[band]) def _active_reservation_ranges(reservations: Iterable, band: str, link_uuid: str) -> List[Tuple[int, int]]: ranges = [] for reservation in reservations: reservation_json = _to_json(reservation) if reservation_json.get("status") not in ACTIVE_RESERVATION_STATUSES: continue if _normalize_band(reservation_json.get("band")) != band: continue link_uuids = { link_id["link_uuid"]["uuid"] for link_id in reservation_json.get("optical_link_ids", []) if "link_uuid" in link_id and "uuid" in link_id["link_uuid"] } if link_uuid not in link_uuids: continue ranges.append((int(reservation_json.get("n_start", 0)), int(reservation_json.get("n_end", -1)))) return ranges def _available_slots( optical_link: Dict, band: str, reservations: Iterable, include_reserved_slots: bool ) -> Set[int]: slots = { int(slot) for slot, state in _slot_map(optical_link, band).items() if int(state) == 1 } if include_reserved_slots: return slots link_uuid = optical_link["link_id"]["link_uuid"]["uuid"] for n_start, n_end in _active_reservation_ranges(reservations, band, link_uuid): slots.difference_update(range(n_start, n_end + 1)) return slots def _first_contiguous_range(slots: Set[int], required_slots: int) -> Optional[Tuple[int, int]]: ranges = _contiguous_ranges(slots, required_slots) if len(ranges) == 0: return None return ranges[0] def _contiguous_ranges(slots: Set[int], required_slots: int) -> List[Tuple[int, int]]: if required_slots <= 0: return [] ordered_slots = sorted(slots) if len(ordered_slots) < required_slots: return [] ranges = [] streak_start = ordered_slots[0] previous = ordered_slots[0] for slot in ordered_slots[1:]: if slot == previous + 1: previous = slot else: if previous - streak_start + 1 >= required_slots: ranges.append((streak_start, previous)) streak_start = slot previous = slot if previous - streak_start + 1 >= required_slots: ranges.append((streak_start, previous)) return ranges def _range_dicts(ranges: Iterable[Tuple[int, int]]) -> List[Dict[str, int]]: return [{"n_start": n_start, "n_end": n_end} for n_start, n_end in ranges] def _preferred_slots(request_json: Dict, required_slots: int) -> Optional[Set[int]]: n_start = request_json.get("preferred_n_start") n_end = request_json.get("preferred_n_end") if n_start is None and n_end is None: return None if n_start is None or n_end is None: raise OpticalCandidateError( "PREFERRED_RANGE_OCCUPIED", "Both preferred_n_start and preferred_n_end are required" ) n_start = int(n_start) n_end = int(n_end) if n_end < n_start: raise OpticalCandidateError("PREFERRED_RANGE_OCCUPIED", "preferred_n_end must be >= preferred_n_start") if (n_end - n_start + 1) < required_slots: raise OpticalCandidateError("UNSUPPORTED_CAPACITY_OR_MODULATION", "Preferred range is too small") return set(range(n_start, n_start + required_slots)) def _link_json_by_uuid(optical_links: Iterable, device_index: Dict[str, Dict]) -> Tuple[Dict[str, Dict], Dict[str, List]]: link_by_uuid = {} adjacency: Dict[str, List] = {} for optical_link in optical_links: link_json = _to_json(optical_link) endpoints = link_json.get("link_endpoint_ids", []) if len(endpoints) < 2: continue src_device = endpoints[0]["device_id"]["device_uuid"]["uuid"] dst_device = endpoints[-1]["device_id"]["device_uuid"]["uuid"] if src_device not in device_index or dst_device not in device_index: continue link_uuid = link_json["link_id"]["link_uuid"]["uuid"] link_by_uuid[link_uuid] = link_json adjacency.setdefault(src_device, []).append((dst_device, link_uuid)) return link_by_uuid, adjacency def _shortest_link_paths( adjacency: Dict[str, List], src_device_uuid: str, dst_device_uuid: str, max_candidates: int ) -> List[List[str]]: queue = deque([(src_device_uuid, [], {src_device_uuid})]) paths = [] while queue and len(paths) < max_candidates: node, link_path, seen_nodes = queue.popleft() if node == dst_device_uuid: paths.append(link_path) continue for next_node, link_uuid in adjacency.get(node, []): if next_node in seen_nodes: continue queue.append((next_node, link_path + [link_uuid], seen_nodes | {next_node})) return paths def _path_hops(link_path: List[str], link_by_uuid: Dict[str, Dict]) -> List[Dict]: hops = [] for index, link_uuid in enumerate(link_path): optical_link = link_by_uuid[link_uuid] endpoints = optical_link.get("link_endpoint_ids", []) ingress = endpoints[0] if len(endpoints) > 0 else None egress = endpoints[-1] if len(endpoints) > 0 else None hop = {"sequence": index, "optical_link_id": link_uuid} if ingress is not None: hop["ingress_endpoint_id"] = ingress hop["device_id"] = ingress.get("device_id") if egress is not None: hop["egress_endpoint_id"] = egress hops.append(hop) return hops def _candidate_for_path( link_path: List[str], link_by_uuid: Dict[str, Dict], reservations: Iterable, required_slots: int, preferred_band: Optional[str], preferred_slots: Optional[Set[int]], include_reserved_slots: bool, modulation_format: Optional[str], ) -> Tuple[Optional[Dict], List[Dict]]: rejected = [] band_order = (preferred_band,) if preferred_band is not None else BAND_ORDER for band in band_order: path_slots = None for link_uuid in link_path: available = _available_slots(link_by_uuid[link_uuid], band, reservations, include_reserved_slots) path_slots = available if path_slots is None else path_slots.intersection(available) path_slots = path_slots or set() if preferred_slots is not None: if not preferred_slots.issubset(path_slots): rejected.append({ "code": "PREFERRED_RANGE_OCCUPIED", "message": "Preferred range is unavailable on at least one optical link", "band": band, "optical_link_ids": link_path, }) continue n_start = min(preferred_slots) n_end = max(preferred_slots) available_ranges = [(n_start, n_end)] else: available_ranges = _contiguous_ranges(path_slots, required_slots) if len(available_ranges) == 0: rejected.append({ "code": "INSUFFICIENT_CONTIGUOUS_SPECTRUM", "message": "No contiguous spectrum block is available on every optical link in path", "band": band, "optical_link_ids": link_path, }) continue n_start = available_ranges[0][0] n_end = n_start + required_slots - 1 candidate_uuid = str(uuid.uuid5( uuid.NAMESPACE_URL, "{:s}|{:s}|{:d}|{:d}".format(",".join(link_path), band, n_start, n_end) )) return { "candidate_uuid": candidate_uuid, "validation_status": "VALID", "band": band, "n_start": n_start, "n_end": n_end, "required_slots": required_slots, "modulation_format": modulation_format, "available_slot_ranges": _range_dicts(available_ranges), "optical_link_ids": link_path, "path_hops": _path_hops(link_path, link_by_uuid), "path_metric": len(link_path), "available_slots_summary": { "common_available_slots": len(path_slots), "selected_range": "{:d}-{:d}".format(n_start, n_end), "available_range_count": len(available_ranges), }, }, rejected return None, rejected def compute_optical_connectivity_candidates( request_json: Dict, devices: Iterable, optical_links: Iterable, reservations: Iterable ) -> Tuple[Dict, int]: required_slots, effective_channel_width_ghz = _required_slots(request_json) src_endpoint = request_json.get("src_endpoint_id", request_json.get("src_device_uuid")) dst_endpoint = request_json.get("dst_endpoint_id", request_json.get("dst_device_uuid")) src_device_uuid = _device_uuid(src_endpoint) dst_device_uuid = _device_uuid(dst_endpoint) if src_device_uuid == dst_device_uuid: raise OpticalCandidateError("INVALID_ENDPOINT", "Source and destination must be different") device_index = _device_index(devices) if src_device_uuid not in device_index: raise OpticalCandidateError("INVALID_ENDPOINT", "Source optical device not found", http_status=404) if dst_device_uuid not in device_index: raise OpticalCandidateError("INVALID_ENDPOINT", "Destination optical device not found", http_status=404) link_by_uuid, adjacency = _link_json_by_uuid(optical_links, device_index) if len(link_by_uuid) == 0: raise OpticalCandidateError("NO_OPTICAL_TOPOLOGY", "No optical links are available", http_status=404) max_candidates = int(request_json.get("max_candidates", 3)) max_candidates = max(1, max_candidates) paths = _shortest_link_paths(adjacency, src_device_uuid, dst_device_uuid, max_candidates) if len(paths) == 0: return { "required_slots": required_slots, "effective_channel_width_ghz": effective_channel_width_ghz, "candidates": [], "rejected_reasons": [{"code": "NO_PATH", "message": "No optical path found"}], "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), }, 409 preferred_band = _normalize_band(request_json.get("preferred_band")) preferred_range = _preferred_slots(request_json, required_slots) include_reserved_slots = bool(request_json.get("include_reserved_slots", False)) modulation_format = request_json.get("modulation_format") candidates = [] rejected_reasons = [] for link_path in paths: candidate, rejected = _candidate_for_path( link_path, link_by_uuid, reservations, required_slots, preferred_band, preferred_range, include_reserved_slots, modulation_format ) rejected_reasons.extend(rejected) if candidate is not None: candidates.append(candidate) reply = { "required_slots": required_slots, "effective_channel_width_ghz": effective_channel_width_ghz, "candidates": candidates, "rejected_reasons": rejected_reasons, "request_summary": _request_summary(request_json, src_endpoint, dst_endpoint), } return reply, 200 if len(candidates) > 0 else 409 def _request_summary(request_json: Dict, src_endpoint, dst_endpoint) -> Dict: return { "src_endpoint_id": src_endpoint, "dst_endpoint_id": dst_endpoint, "src_endpoint_uuid": _endpoint_uuid(src_endpoint), "dst_endpoint_uuid": _endpoint_uuid(dst_endpoint), "capacity_gbps": request_json.get("capacity_gbps", request_json.get("bitrate_gbps")), "modulation_format": request_json.get("modulation_format"), "explicit_channel_width_ghz": request_json.get( "explicit_channel_width_ghz", request_json.get("channel_width_ghz") ), "requested_modulation_format": request_json.get("modulation_format"), "preferred_band": request_json.get("preferred_band"), "preferred_n_start": request_json.get("preferred_n_start"), "preferred_n_end": request_json.get("preferred_n_end"), }
src/opticalcontroller/service/__init__.py 0 → 100644 +13 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.