Loading proto/context.proto +40 −0 Original line number Diff line number Diff line Loading @@ -99,6 +99,11 @@ service ContextService { rpc GetOpticalBand (Empty ) returns (OpticalBandList) {} rpc SelectOpticalBand (OpticalBandId ) returns (OpticalBand) {} rpc SetOpticalSpectrumReservation (OpticalSpectrumReservation ) returns (OpticalSpectrumReservationId ) {} rpc GetOpticalSpectrumReservation (OpticalSpectrumReservationId) returns (OpticalSpectrumReservation ) {} rpc ListOpticalSpectrumReservations (ContextId ) returns (OpticalSpectrumReservationList) {} rpc ConsumeOpticalSpectrumReservation (OpticalSpectrumReservation ) returns (OpticalSpectrumReservationId ) {} rpc ReleaseOpticalSpectrumReservation (OpticalSpectrumReservationId) returns (Empty ) {} rpc DeleteServiceConfigRule(ServiceConfigRule) returns (Empty ) {} } Loading Loading @@ -813,6 +818,41 @@ message OpticalBandList { } enum OpticalSpectrumReservationStatusEnum { OPTICALSPECTRUMRESERVATIONSTATUS_UNDEFINED = 0; OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED = 1; OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED = 2; OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED = 3; OPTICALSPECTRUMRESERVATIONSTATUS_EXPIRED = 4; } message OpticalSpectrumReservationId { ContextId context_id = 1; Uuid reservation_uuid = 2; } message OpticalSpectrumReservation { OpticalSpectrumReservationId reservation_id = 1; TopologyId topology_id = 2; repeated LinkId optical_link_ids = 3; string band = 4; uint32 n_start = 5; uint32 n_end = 6; uint32 required_slots = 7; ServiceId service_id = 8; ConnectionId connection_id = 9; string owner_id = 10; string correlation_id = 11; OpticalSpectrumReservationStatusEnum status = 12; Timestamp created_at = 13; Timestamp updated_at = 14; Timestamp expires_at = 15; } message OpticalSpectrumReservationList { repeated OpticalSpectrumReservation reservations = 1; } ////////////////// Config Rule Delete //////////// Loading src/context/client/ContextClient.py +37 −1 Original line number Diff line number Diff line Loading @@ -28,7 +28,8 @@ from common.proto.context_pb2 import ( Service, ServiceConfigRule, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList, OpticalBand, OpticalBandId, OpticalBandList OpticalBand, OpticalBandId, OpticalBandList, OpticalSpectrumReservation, OpticalSpectrumReservationId, OpticalSpectrumReservationList ) from common.proto.context_pb2_grpc import ContextServiceStub from common.proto.context_policy_pb2_grpc import ContextPolicyServiceStub Loading Loading @@ -514,6 +515,41 @@ class ContextClient: LOGGER.debug('SetOpticalBand result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SetOpticalSpectrumReservation(self, request : OpticalSpectrumReservation) -> OpticalSpectrumReservationId: LOGGER.debug('SetOpticalSpectrumReservation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SetOpticalSpectrumReservation(request) LOGGER.debug('SetOpticalSpectrumReservation result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetOpticalSpectrumReservation(self, request : OpticalSpectrumReservationId) -> OpticalSpectrumReservation: LOGGER.debug('GetOpticalSpectrumReservation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetOpticalSpectrumReservation(request) LOGGER.debug('GetOpticalSpectrumReservation result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListOpticalSpectrumReservations(self, request : ContextId) -> OpticalSpectrumReservationList: LOGGER.debug('ListOpticalSpectrumReservations request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListOpticalSpectrumReservations(request) LOGGER.debug('ListOpticalSpectrumReservations result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ConsumeOpticalSpectrumReservation(self, request : OpticalSpectrumReservation) -> OpticalSpectrumReservationId: LOGGER.debug('ConsumeOpticalSpectrumReservation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ConsumeOpticalSpectrumReservation(request) LOGGER.debug('ConsumeOpticalSpectrumReservation result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ReleaseOpticalSpectrumReservation(self, request : OpticalSpectrumReservationId) -> Empty: LOGGER.debug('ReleaseOpticalSpectrumReservation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ReleaseOpticalSpectrumReservation(request) LOGGER.debug('ReleaseOpticalSpectrumReservation result: {:s}'.format(grpc_message_to_json_string(response))) return response #--------------------------- Optical Link ------------------------ def GetOpticalLinkList(self, request: Empty) -> OpticalLinkList: LOGGER.debug('ListOpticalLinks request: {:s}'.format(grpc_message_to_json_string(request))) Loading src/context/service/ContextServiceServicerImpl.py +37 −1 Original line number Diff line number Diff line Loading @@ -25,7 +25,8 @@ from common.proto.context_pb2 import ( Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList, OpticalConfigList, OpticalConfigId, OpticalConfig, OpticalLink, OpticalLinkList, OpticalBand, OpticalBandId, OpticalBandList OpticalBand, OpticalBandId, OpticalBandList, OpticalSpectrumReservation, OpticalSpectrumReservationId, OpticalSpectrumReservationList ) from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule from common.proto.context_pb2_grpc import ContextServiceServicer Loading Loading @@ -72,6 +73,11 @@ from .database.OpticalConfig import ( from .database.OpticalLink import ( optical_link_delete, optical_link_get, optical_link_list_objs, optical_link_set ) from .database.OpticalSpectrumReservation import ( optical_spectrum_reservation_consume, optical_spectrum_reservation_get, optical_spectrum_reservation_list, optical_spectrum_reservation_release, optical_spectrum_reservation_set ) LOGGER = logging.getLogger(__name__) Loading Loading @@ -381,6 +387,36 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer result = set_optical_band(self.db_engine, request) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetOpticalSpectrumReservation( self, request : OpticalSpectrumReservation, context : grpc.ServicerContext ) -> OpticalSpectrumReservationId: return optical_spectrum_reservation_set(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetOpticalSpectrumReservation( self, request : OpticalSpectrumReservationId, context : grpc.ServicerContext ) -> OpticalSpectrumReservation: return optical_spectrum_reservation_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListOpticalSpectrumReservations( self, request : ContextId, context : grpc.ServicerContext ) -> OpticalSpectrumReservationList: return optical_spectrum_reservation_list(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ConsumeOpticalSpectrumReservation( self, request : OpticalSpectrumReservation, context : grpc.ServicerContext ) -> OpticalSpectrumReservationId: return optical_spectrum_reservation_consume(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ReleaseOpticalSpectrumReservation( self, request : OpticalSpectrumReservationId, context : grpc.ServicerContext ) -> Empty: return optical_spectrum_reservation_release(self.db_engine, request) #--------------------- Experimental Optical Link ------------------- Loading src/context/service/database/OpticalSpectrumReservation.py 0 → 100644 +294 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime, json, logging from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Set from common.method_wrappers.ServiceExceptions import AlreadyExistsException, InvalidArgumentException, NotFoundException from common.proto.context_pb2 import ( ContextId, Empty, OpticalSpectrumReservation, OpticalSpectrumReservationId, OpticalSpectrumReservationList, OpticalSpectrumReservationStatusEnum ) from context.service.database.models.OpticalSpectrumReservationModel import OpticalSpectrumReservationModel from .uuids._Builder import get_uuid_from_string, get_uuid_random from .uuids.Context import context_get_uuid from .uuids.Link import link_get_uuid from .uuids.Service import service_get_uuid from .uuids.Topology import topology_get_uuid LOGGER = logging.getLogger(__name__) ACTIVE_STATUSES = { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, } TERMINAL_STATUSES = { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_EXPIRED, } VALID_BANDS = {'c_slots', 'l_slots', 's_slots'} def _now() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _timestamp_to_datetime(timestamp) -> Optional[datetime.datetime]: if timestamp.timestamp <= 0: return None return datetime.datetime.fromtimestamp(timestamp.timestamp, tz=datetime.timezone.utc) def _reservation_get_uuid(request : OpticalSpectrumReservationId, allow_random : bool = False) -> str: raw_reservation_uuid = request.reservation_uuid.uuid if len(raw_reservation_uuid) > 0: return get_uuid_from_string(raw_reservation_uuid) if allow_random: return get_uuid_random() raise InvalidArgumentException('reservation_id.reservation_uuid.uuid', raw_reservation_uuid) def _normalize_band(band : str) -> str: normalized_band = str(band).strip().lower().replace('-', '_') if normalized_band in {'c', 'c_band', 'c_slots'}: return 'c_slots' if normalized_band in {'l', 'l_band', 'l_slots'}: return 'l_slots' if normalized_band in {'s', 's_band', 's_slots'}: return 's_slots' raise InvalidArgumentException('band', str(band)) def _normalize_link_uuids(request : OpticalSpectrumReservation) -> List[str]: link_uuids = [ link_get_uuid(link_id, allow_random=False) for link_id in request.optical_link_ids ] if len(link_uuids) == 0: raise InvalidArgumentException('optical_link_ids', '[]') return sorted(set(link_uuids)) def _normalize_status(status : int) -> int: if status == OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_UNDEFINED: return OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED if status in { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_EXPIRED, }: return status raise InvalidArgumentException('status', str(status)) def _service_uuid(request : OpticalSpectrumReservation) -> Optional[str]: if not request.HasField('service_id'): return None if len(request.service_id.service_uuid.uuid) == 0: return None _, service_uuid = service_get_uuid(request.service_id, allow_random=False) return service_uuid def _connection_uuid(request : OpticalSpectrumReservation) -> Optional[str]: if not request.HasField('connection_id'): return None raw_connection_uuid = request.connection_id.connection_uuid.uuid if len(raw_connection_uuid) == 0: return None return get_uuid_from_string(raw_connection_uuid) def _ranges_overlap(a_start : int, a_end : int, b_start : int, b_end : int) -> bool: return a_start <= b_end and b_start <= a_end def _is_expired(obj : OpticalSpectrumReservationModel, now : datetime.datetime) -> bool: return obj.expires_at is not None and obj.expires_at <= now def _conflicts( obj : OpticalSpectrumReservationModel, reservation_uuid : str, context_uuid : str, topology_uuid : str, link_uuids : Set[str], band : str, n_start : int, n_end : int, now : datetime.datetime ) -> bool: if obj.reservation_uuid == reservation_uuid: return False if obj.context_uuid != context_uuid or obj.topology_uuid != topology_uuid: return False if obj.band != band: return False if obj.status not in ACTIVE_STATUSES or _is_expired(obj, now): return False if len(link_uuids.intersection(obj.get_optical_link_uuids())) == 0: return False return _ranges_overlap(n_start, n_end, obj.n_start, obj.n_end) def optical_spectrum_reservation_set( db_engine : Engine, request : OpticalSpectrumReservation ) -> OpticalSpectrumReservationId: context_uuid = context_get_uuid(request.reservation_id.context_id, allow_random=False, allow_default=True) reservation_uuid = _reservation_get_uuid(request.reservation_id, allow_random=True) topology_context_uuid, topology_uuid = topology_get_uuid(request.topology_id, allow_random=False, allow_default=True) if topology_context_uuid != context_uuid: raise InvalidArgumentException('topology_id.context_id', topology_context_uuid) band = _normalize_band(request.band) n_start = int(request.n_start) n_end = int(request.n_end) if n_end < n_start: raise InvalidArgumentException('n_end', str(n_end), extra_details='n_end must be >= n_start') required_slots = int(request.required_slots) if required_slots < 0: raise InvalidArgumentException('required_slots', str(required_slots)) link_uuids = _normalize_link_uuids(request) now = _now() expires_at = _timestamp_to_datetime(request.expires_at) status = _normalize_status(request.status) data = { 'reservation_uuid': reservation_uuid, 'context_uuid': context_uuid, 'topology_uuid': topology_uuid, 'optical_link_uuids': json.dumps(link_uuids), 'band': band, 'n_start': n_start, 'n_end': n_end, 'required_slots': required_slots, 'service_uuid': _service_uuid(request), 'connection_uuid': _connection_uuid(request), 'owner_id': request.owner_id or None, 'correlation_id': request.correlation_id or None, 'status': status, 'created_at': now, 'updated_at': now, 'expires_at': expires_at, } def callback(session : Session) -> Dict: active_objects : List[OpticalSpectrumReservationModel] = session.query(OpticalSpectrumReservationModel).all() for obj in active_objects: if _conflicts(obj, reservation_uuid, context_uuid, topology_uuid, set(link_uuids), band, n_start, n_end, now): raise AlreadyExistsException( 'OpticalSpectrumReservation', obj.reservation_uuid, extra_details='overlapping spectrum reservation' ) stmt = insert(OpticalSpectrumReservationModel).values([data]) stmt = stmt.on_conflict_do_update( index_elements=[OpticalSpectrumReservationModel.reservation_uuid], set_=dict( context_uuid=stmt.excluded.context_uuid, topology_uuid=stmt.excluded.topology_uuid, optical_link_uuids=stmt.excluded.optical_link_uuids, band=stmt.excluded.band, n_start=stmt.excluded.n_start, n_end=stmt.excluded.n_end, required_slots=stmt.excluded.required_slots, service_uuid=stmt.excluded.service_uuid, connection_uuid=stmt.excluded.connection_uuid, owner_id=stmt.excluded.owner_id, correlation_id=stmt.excluded.correlation_id, status=stmt.excluded.status, updated_at=stmt.excluded.updated_at, expires_at=stmt.excluded.expires_at, ) ) session.execute(stmt) return {'context_id': {'context_uuid': {'uuid': context_uuid}}, 'reservation_uuid': {'uuid': reservation_uuid}} reservation_id = run_transaction(sessionmaker(bind=db_engine), callback) return OpticalSpectrumReservationId(**reservation_id) def optical_spectrum_reservation_get( db_engine : Engine, request : OpticalSpectrumReservationId ) -> OpticalSpectrumReservation: context_uuid = context_get_uuid(request.context_id, allow_random=False, allow_default=True) reservation_uuid = _reservation_get_uuid(request) def callback(session : Session) -> Optional[Dict]: obj : Optional[OpticalSpectrumReservationModel] = session.query(OpticalSpectrumReservationModel)\ .filter_by(context_uuid=context_uuid, reservation_uuid=reservation_uuid).one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=db_engine), callback) if obj is None: raise NotFoundException('OpticalSpectrumReservation', reservation_uuid) return OpticalSpectrumReservation(**obj) def optical_spectrum_reservation_list( db_engine : Engine, request : ContextId ) -> OpticalSpectrumReservationList: context_uuid = context_get_uuid(request, allow_random=False, allow_default=True) def callback(session : Session) -> List[Dict]: obj_list : List[OpticalSpectrumReservationModel] = session.query(OpticalSpectrumReservationModel)\ .filter_by(context_uuid=context_uuid).all() return [obj.dump() for obj in obj_list] reservations = run_transaction(sessionmaker(bind=db_engine), callback) return OpticalSpectrumReservationList(reservations=reservations) def _set_status( db_engine : Engine, request : OpticalSpectrumReservationId, status : int, service_uuid : Optional[str] = None, connection_uuid : Optional[str] = None ) -> OpticalSpectrumReservationId: context_uuid = context_get_uuid(request.context_id, allow_random=False, allow_default=True) reservation_uuid = _reservation_get_uuid(request) now = _now() def callback(session : Session) -> Dict: obj : Optional[OpticalSpectrumReservationModel] = session.query(OpticalSpectrumReservationModel)\ .filter_by(context_uuid=context_uuid, reservation_uuid=reservation_uuid).one_or_none() if obj is None: raise NotFoundException('OpticalSpectrumReservation', reservation_uuid) obj.status = status obj.updated_at = now if service_uuid is not None: obj.service_uuid = service_uuid if connection_uuid is not None: obj.connection_uuid = connection_uuid return obj.dump_id() reservation_id = run_transaction(sessionmaker(bind=db_engine), callback) return OpticalSpectrumReservationId(**reservation_id) def optical_spectrum_reservation_consume( db_engine : Engine, request : OpticalSpectrumReservation ) -> OpticalSpectrumReservationId: return _set_status( db_engine, request.reservation_id, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, service_uuid=_service_uuid(request), connection_uuid=_connection_uuid(request) ) def optical_spectrum_reservation_release( db_engine : Engine, request : OpticalSpectrumReservationId ) -> Empty: _set_status( db_engine, request, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED ) return Empty() src/context/service/database/models/OpticalSpectrumReservationModel.py 0 → 100644 +93 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json from typing import Dict, List, Optional from sqlalchemy import CheckConstraint, Column, DateTime, ForeignKey, Integer, String from sqlalchemy.dialects.postgresql import UUID from ._Base import _Base class OpticalSpectrumReservationModel(_Base): __tablename__ = 'optical_spectrum_reservation' reservation_uuid = Column(UUID(as_uuid=False), primary_key=True) context_uuid = Column(ForeignKey('context.context_uuid', ondelete='CASCADE'), nullable=False, index=True) topology_uuid = Column(UUID(as_uuid=False), nullable=False, index=True) optical_link_uuids = Column(String, nullable=False) band = Column(String, nullable=False) n_start = Column(Integer, nullable=False) n_end = Column(Integer, nullable=False) required_slots = Column(Integer, nullable=False) service_uuid = Column(UUID(as_uuid=False), nullable=True, index=True) connection_uuid = Column(UUID(as_uuid=False), nullable=True, index=True) owner_id = Column(String, nullable=True) correlation_id = Column(String, nullable=True) status = Column(Integer, nullable=False) created_at = Column(DateTime, nullable=False) updated_at = Column(DateTime, nullable=False) expires_at = Column(DateTime, nullable=True) __table_args__ = ( CheckConstraint(n_start >= 0, name='check_n_start_value'), CheckConstraint(n_end >= n_start, name='check_n_end_value'), CheckConstraint(required_slots >= 0, name='check_required_slots_value'), ) def get_optical_link_uuids(self) -> List[str]: return json.loads(self.optical_link_uuids) def dump_id(self) -> Dict: return { 'context_id': {'context_uuid': {'uuid': self.context_uuid}}, 'reservation_uuid': {'uuid': self.reservation_uuid}, } def _dump_optional_timestamp(self, value) -> Optional[Dict]: if value is None: return None return {'timestamp': value.timestamp()} def dump(self) -> Dict: result = { 'reservation_id': self.dump_id(), 'topology_id': { 'context_id': {'context_uuid': {'uuid': self.context_uuid}}, 'topology_uuid': {'uuid': self.topology_uuid}, }, 'optical_link_ids': [ {'link_uuid': {'uuid': optical_link_uuid}} for optical_link_uuid in self.get_optical_link_uuids() ], 'band': self.band, 'n_start': self.n_start, 'n_end': self.n_end, 'required_slots': self.required_slots, 'owner_id': self.owner_id or '', 'correlation_id': self.correlation_id or '', 'status': self.status, 'created_at': {'timestamp': self.created_at.timestamp()}, 'updated_at': {'timestamp': self.updated_at.timestamp()}, } if self.service_uuid is not None: result['service_id'] = { 'context_id': {'context_uuid': {'uuid': self.context_uuid}}, 'service_uuid': {'uuid': self.service_uuid}, } if self.connection_uuid is not None: result['connection_id'] = {'connection_uuid': {'uuid': self.connection_uuid}} expires_at = self._dump_optional_timestamp(self.expires_at) if expires_at is not None: result['expires_at'] = expires_at return result Loading
proto/context.proto +40 −0 Original line number Diff line number Diff line Loading @@ -99,6 +99,11 @@ service ContextService { rpc GetOpticalBand (Empty ) returns (OpticalBandList) {} rpc SelectOpticalBand (OpticalBandId ) returns (OpticalBand) {} rpc SetOpticalSpectrumReservation (OpticalSpectrumReservation ) returns (OpticalSpectrumReservationId ) {} rpc GetOpticalSpectrumReservation (OpticalSpectrumReservationId) returns (OpticalSpectrumReservation ) {} rpc ListOpticalSpectrumReservations (ContextId ) returns (OpticalSpectrumReservationList) {} rpc ConsumeOpticalSpectrumReservation (OpticalSpectrumReservation ) returns (OpticalSpectrumReservationId ) {} rpc ReleaseOpticalSpectrumReservation (OpticalSpectrumReservationId) returns (Empty ) {} rpc DeleteServiceConfigRule(ServiceConfigRule) returns (Empty ) {} } Loading Loading @@ -813,6 +818,41 @@ message OpticalBandList { } enum OpticalSpectrumReservationStatusEnum { OPTICALSPECTRUMRESERVATIONSTATUS_UNDEFINED = 0; OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED = 1; OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED = 2; OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED = 3; OPTICALSPECTRUMRESERVATIONSTATUS_EXPIRED = 4; } message OpticalSpectrumReservationId { ContextId context_id = 1; Uuid reservation_uuid = 2; } message OpticalSpectrumReservation { OpticalSpectrumReservationId reservation_id = 1; TopologyId topology_id = 2; repeated LinkId optical_link_ids = 3; string band = 4; uint32 n_start = 5; uint32 n_end = 6; uint32 required_slots = 7; ServiceId service_id = 8; ConnectionId connection_id = 9; string owner_id = 10; string correlation_id = 11; OpticalSpectrumReservationStatusEnum status = 12; Timestamp created_at = 13; Timestamp updated_at = 14; Timestamp expires_at = 15; } message OpticalSpectrumReservationList { repeated OpticalSpectrumReservation reservations = 1; } ////////////////// Config Rule Delete //////////// Loading
src/context/client/ContextClient.py +37 −1 Original line number Diff line number Diff line Loading @@ -28,7 +28,8 @@ from common.proto.context_pb2 import ( Service, ServiceConfigRule, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList, OpticalBand, OpticalBandId, OpticalBandList OpticalBand, OpticalBandId, OpticalBandList, OpticalSpectrumReservation, OpticalSpectrumReservationId, OpticalSpectrumReservationList ) from common.proto.context_pb2_grpc import ContextServiceStub from common.proto.context_policy_pb2_grpc import ContextPolicyServiceStub Loading Loading @@ -514,6 +515,41 @@ class ContextClient: LOGGER.debug('SetOpticalBand result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SetOpticalSpectrumReservation(self, request : OpticalSpectrumReservation) -> OpticalSpectrumReservationId: LOGGER.debug('SetOpticalSpectrumReservation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SetOpticalSpectrumReservation(request) LOGGER.debug('SetOpticalSpectrumReservation result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetOpticalSpectrumReservation(self, request : OpticalSpectrumReservationId) -> OpticalSpectrumReservation: LOGGER.debug('GetOpticalSpectrumReservation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetOpticalSpectrumReservation(request) LOGGER.debug('GetOpticalSpectrumReservation result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListOpticalSpectrumReservations(self, request : ContextId) -> OpticalSpectrumReservationList: LOGGER.debug('ListOpticalSpectrumReservations request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListOpticalSpectrumReservations(request) LOGGER.debug('ListOpticalSpectrumReservations result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ConsumeOpticalSpectrumReservation(self, request : OpticalSpectrumReservation) -> OpticalSpectrumReservationId: LOGGER.debug('ConsumeOpticalSpectrumReservation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ConsumeOpticalSpectrumReservation(request) LOGGER.debug('ConsumeOpticalSpectrumReservation result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ReleaseOpticalSpectrumReservation(self, request : OpticalSpectrumReservationId) -> Empty: LOGGER.debug('ReleaseOpticalSpectrumReservation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ReleaseOpticalSpectrumReservation(request) LOGGER.debug('ReleaseOpticalSpectrumReservation result: {:s}'.format(grpc_message_to_json_string(response))) return response #--------------------------- Optical Link ------------------------ def GetOpticalLinkList(self, request: Empty) -> OpticalLinkList: LOGGER.debug('ListOpticalLinks request: {:s}'.format(grpc_message_to_json_string(request))) Loading
src/context/service/ContextServiceServicerImpl.py +37 −1 Original line number Diff line number Diff line Loading @@ -25,7 +25,8 @@ from common.proto.context_pb2 import ( Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList, OpticalConfigList, OpticalConfigId, OpticalConfig, OpticalLink, OpticalLinkList, OpticalBand, OpticalBandId, OpticalBandList OpticalBand, OpticalBandId, OpticalBandList, OpticalSpectrumReservation, OpticalSpectrumReservationId, OpticalSpectrumReservationList ) from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule from common.proto.context_pb2_grpc import ContextServiceServicer Loading Loading @@ -72,6 +73,11 @@ from .database.OpticalConfig import ( from .database.OpticalLink import ( optical_link_delete, optical_link_get, optical_link_list_objs, optical_link_set ) from .database.OpticalSpectrumReservation import ( optical_spectrum_reservation_consume, optical_spectrum_reservation_get, optical_spectrum_reservation_list, optical_spectrum_reservation_release, optical_spectrum_reservation_set ) LOGGER = logging.getLogger(__name__) Loading Loading @@ -381,6 +387,36 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer result = set_optical_band(self.db_engine, request) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetOpticalSpectrumReservation( self, request : OpticalSpectrumReservation, context : grpc.ServicerContext ) -> OpticalSpectrumReservationId: return optical_spectrum_reservation_set(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetOpticalSpectrumReservation( self, request : OpticalSpectrumReservationId, context : grpc.ServicerContext ) -> OpticalSpectrumReservation: return optical_spectrum_reservation_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListOpticalSpectrumReservations( self, request : ContextId, context : grpc.ServicerContext ) -> OpticalSpectrumReservationList: return optical_spectrum_reservation_list(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ConsumeOpticalSpectrumReservation( self, request : OpticalSpectrumReservation, context : grpc.ServicerContext ) -> OpticalSpectrumReservationId: return optical_spectrum_reservation_consume(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ReleaseOpticalSpectrumReservation( self, request : OpticalSpectrumReservationId, context : grpc.ServicerContext ) -> Empty: return optical_spectrum_reservation_release(self.db_engine, request) #--------------------- Experimental Optical Link ------------------- Loading
src/context/service/database/OpticalSpectrumReservation.py 0 → 100644 +294 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime, json, logging from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Set from common.method_wrappers.ServiceExceptions import AlreadyExistsException, InvalidArgumentException, NotFoundException from common.proto.context_pb2 import ( ContextId, Empty, OpticalSpectrumReservation, OpticalSpectrumReservationId, OpticalSpectrumReservationList, OpticalSpectrumReservationStatusEnum ) from context.service.database.models.OpticalSpectrumReservationModel import OpticalSpectrumReservationModel from .uuids._Builder import get_uuid_from_string, get_uuid_random from .uuids.Context import context_get_uuid from .uuids.Link import link_get_uuid from .uuids.Service import service_get_uuid from .uuids.Topology import topology_get_uuid LOGGER = logging.getLogger(__name__) ACTIVE_STATUSES = { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, } TERMINAL_STATUSES = { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_EXPIRED, } VALID_BANDS = {'c_slots', 'l_slots', 's_slots'} def _now() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _timestamp_to_datetime(timestamp) -> Optional[datetime.datetime]: if timestamp.timestamp <= 0: return None return datetime.datetime.fromtimestamp(timestamp.timestamp, tz=datetime.timezone.utc) def _reservation_get_uuid(request : OpticalSpectrumReservationId, allow_random : bool = False) -> str: raw_reservation_uuid = request.reservation_uuid.uuid if len(raw_reservation_uuid) > 0: return get_uuid_from_string(raw_reservation_uuid) if allow_random: return get_uuid_random() raise InvalidArgumentException('reservation_id.reservation_uuid.uuid', raw_reservation_uuid) def _normalize_band(band : str) -> str: normalized_band = str(band).strip().lower().replace('-', '_') if normalized_band in {'c', 'c_band', 'c_slots'}: return 'c_slots' if normalized_band in {'l', 'l_band', 'l_slots'}: return 'l_slots' if normalized_band in {'s', 's_band', 's_slots'}: return 's_slots' raise InvalidArgumentException('band', str(band)) def _normalize_link_uuids(request : OpticalSpectrumReservation) -> List[str]: link_uuids = [ link_get_uuid(link_id, allow_random=False) for link_id in request.optical_link_ids ] if len(link_uuids) == 0: raise InvalidArgumentException('optical_link_ids', '[]') return sorted(set(link_uuids)) def _normalize_status(status : int) -> int: if status == OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_UNDEFINED: return OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED if status in { OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RESERVED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_EXPIRED, }: return status raise InvalidArgumentException('status', str(status)) def _service_uuid(request : OpticalSpectrumReservation) -> Optional[str]: if not request.HasField('service_id'): return None if len(request.service_id.service_uuid.uuid) == 0: return None _, service_uuid = service_get_uuid(request.service_id, allow_random=False) return service_uuid def _connection_uuid(request : OpticalSpectrumReservation) -> Optional[str]: if not request.HasField('connection_id'): return None raw_connection_uuid = request.connection_id.connection_uuid.uuid if len(raw_connection_uuid) == 0: return None return get_uuid_from_string(raw_connection_uuid) def _ranges_overlap(a_start : int, a_end : int, b_start : int, b_end : int) -> bool: return a_start <= b_end and b_start <= a_end def _is_expired(obj : OpticalSpectrumReservationModel, now : datetime.datetime) -> bool: return obj.expires_at is not None and obj.expires_at <= now def _conflicts( obj : OpticalSpectrumReservationModel, reservation_uuid : str, context_uuid : str, topology_uuid : str, link_uuids : Set[str], band : str, n_start : int, n_end : int, now : datetime.datetime ) -> bool: if obj.reservation_uuid == reservation_uuid: return False if obj.context_uuid != context_uuid or obj.topology_uuid != topology_uuid: return False if obj.band != band: return False if obj.status not in ACTIVE_STATUSES or _is_expired(obj, now): return False if len(link_uuids.intersection(obj.get_optical_link_uuids())) == 0: return False return _ranges_overlap(n_start, n_end, obj.n_start, obj.n_end) def optical_spectrum_reservation_set( db_engine : Engine, request : OpticalSpectrumReservation ) -> OpticalSpectrumReservationId: context_uuid = context_get_uuid(request.reservation_id.context_id, allow_random=False, allow_default=True) reservation_uuid = _reservation_get_uuid(request.reservation_id, allow_random=True) topology_context_uuid, topology_uuid = topology_get_uuid(request.topology_id, allow_random=False, allow_default=True) if topology_context_uuid != context_uuid: raise InvalidArgumentException('topology_id.context_id', topology_context_uuid) band = _normalize_band(request.band) n_start = int(request.n_start) n_end = int(request.n_end) if n_end < n_start: raise InvalidArgumentException('n_end', str(n_end), extra_details='n_end must be >= n_start') required_slots = int(request.required_slots) if required_slots < 0: raise InvalidArgumentException('required_slots', str(required_slots)) link_uuids = _normalize_link_uuids(request) now = _now() expires_at = _timestamp_to_datetime(request.expires_at) status = _normalize_status(request.status) data = { 'reservation_uuid': reservation_uuid, 'context_uuid': context_uuid, 'topology_uuid': topology_uuid, 'optical_link_uuids': json.dumps(link_uuids), 'band': band, 'n_start': n_start, 'n_end': n_end, 'required_slots': required_slots, 'service_uuid': _service_uuid(request), 'connection_uuid': _connection_uuid(request), 'owner_id': request.owner_id or None, 'correlation_id': request.correlation_id or None, 'status': status, 'created_at': now, 'updated_at': now, 'expires_at': expires_at, } def callback(session : Session) -> Dict: active_objects : List[OpticalSpectrumReservationModel] = session.query(OpticalSpectrumReservationModel).all() for obj in active_objects: if _conflicts(obj, reservation_uuid, context_uuid, topology_uuid, set(link_uuids), band, n_start, n_end, now): raise AlreadyExistsException( 'OpticalSpectrumReservation', obj.reservation_uuid, extra_details='overlapping spectrum reservation' ) stmt = insert(OpticalSpectrumReservationModel).values([data]) stmt = stmt.on_conflict_do_update( index_elements=[OpticalSpectrumReservationModel.reservation_uuid], set_=dict( context_uuid=stmt.excluded.context_uuid, topology_uuid=stmt.excluded.topology_uuid, optical_link_uuids=stmt.excluded.optical_link_uuids, band=stmt.excluded.band, n_start=stmt.excluded.n_start, n_end=stmt.excluded.n_end, required_slots=stmt.excluded.required_slots, service_uuid=stmt.excluded.service_uuid, connection_uuid=stmt.excluded.connection_uuid, owner_id=stmt.excluded.owner_id, correlation_id=stmt.excluded.correlation_id, status=stmt.excluded.status, updated_at=stmt.excluded.updated_at, expires_at=stmt.excluded.expires_at, ) ) session.execute(stmt) return {'context_id': {'context_uuid': {'uuid': context_uuid}}, 'reservation_uuid': {'uuid': reservation_uuid}} reservation_id = run_transaction(sessionmaker(bind=db_engine), callback) return OpticalSpectrumReservationId(**reservation_id) def optical_spectrum_reservation_get( db_engine : Engine, request : OpticalSpectrumReservationId ) -> OpticalSpectrumReservation: context_uuid = context_get_uuid(request.context_id, allow_random=False, allow_default=True) reservation_uuid = _reservation_get_uuid(request) def callback(session : Session) -> Optional[Dict]: obj : Optional[OpticalSpectrumReservationModel] = session.query(OpticalSpectrumReservationModel)\ .filter_by(context_uuid=context_uuid, reservation_uuid=reservation_uuid).one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=db_engine), callback) if obj is None: raise NotFoundException('OpticalSpectrumReservation', reservation_uuid) return OpticalSpectrumReservation(**obj) def optical_spectrum_reservation_list( db_engine : Engine, request : ContextId ) -> OpticalSpectrumReservationList: context_uuid = context_get_uuid(request, allow_random=False, allow_default=True) def callback(session : Session) -> List[Dict]: obj_list : List[OpticalSpectrumReservationModel] = session.query(OpticalSpectrumReservationModel)\ .filter_by(context_uuid=context_uuid).all() return [obj.dump() for obj in obj_list] reservations = run_transaction(sessionmaker(bind=db_engine), callback) return OpticalSpectrumReservationList(reservations=reservations) def _set_status( db_engine : Engine, request : OpticalSpectrumReservationId, status : int, service_uuid : Optional[str] = None, connection_uuid : Optional[str] = None ) -> OpticalSpectrumReservationId: context_uuid = context_get_uuid(request.context_id, allow_random=False, allow_default=True) reservation_uuid = _reservation_get_uuid(request) now = _now() def callback(session : Session) -> Dict: obj : Optional[OpticalSpectrumReservationModel] = session.query(OpticalSpectrumReservationModel)\ .filter_by(context_uuid=context_uuid, reservation_uuid=reservation_uuid).one_or_none() if obj is None: raise NotFoundException('OpticalSpectrumReservation', reservation_uuid) obj.status = status obj.updated_at = now if service_uuid is not None: obj.service_uuid = service_uuid if connection_uuid is not None: obj.connection_uuid = connection_uuid return obj.dump_id() reservation_id = run_transaction(sessionmaker(bind=db_engine), callback) return OpticalSpectrumReservationId(**reservation_id) def optical_spectrum_reservation_consume( db_engine : Engine, request : OpticalSpectrumReservation ) -> OpticalSpectrumReservationId: return _set_status( db_engine, request.reservation_id, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_CONSUMED, service_uuid=_service_uuid(request), connection_uuid=_connection_uuid(request) ) def optical_spectrum_reservation_release( db_engine : Engine, request : OpticalSpectrumReservationId ) -> Empty: _set_status( db_engine, request, OpticalSpectrumReservationStatusEnum.OPTICALSPECTRUMRESERVATIONSTATUS_RELEASED ) return Empty()
src/context/service/database/models/OpticalSpectrumReservationModel.py 0 → 100644 +93 −0 Original line number Diff line number Diff line # Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json from typing import Dict, List, Optional from sqlalchemy import CheckConstraint, Column, DateTime, ForeignKey, Integer, String from sqlalchemy.dialects.postgresql import UUID from ._Base import _Base class OpticalSpectrumReservationModel(_Base): __tablename__ = 'optical_spectrum_reservation' reservation_uuid = Column(UUID(as_uuid=False), primary_key=True) context_uuid = Column(ForeignKey('context.context_uuid', ondelete='CASCADE'), nullable=False, index=True) topology_uuid = Column(UUID(as_uuid=False), nullable=False, index=True) optical_link_uuids = Column(String, nullable=False) band = Column(String, nullable=False) n_start = Column(Integer, nullable=False) n_end = Column(Integer, nullable=False) required_slots = Column(Integer, nullable=False) service_uuid = Column(UUID(as_uuid=False), nullable=True, index=True) connection_uuid = Column(UUID(as_uuid=False), nullable=True, index=True) owner_id = Column(String, nullable=True) correlation_id = Column(String, nullable=True) status = Column(Integer, nullable=False) created_at = Column(DateTime, nullable=False) updated_at = Column(DateTime, nullable=False) expires_at = Column(DateTime, nullable=True) __table_args__ = ( CheckConstraint(n_start >= 0, name='check_n_start_value'), CheckConstraint(n_end >= n_start, name='check_n_end_value'), CheckConstraint(required_slots >= 0, name='check_required_slots_value'), ) def get_optical_link_uuids(self) -> List[str]: return json.loads(self.optical_link_uuids) def dump_id(self) -> Dict: return { 'context_id': {'context_uuid': {'uuid': self.context_uuid}}, 'reservation_uuid': {'uuid': self.reservation_uuid}, } def _dump_optional_timestamp(self, value) -> Optional[Dict]: if value is None: return None return {'timestamp': value.timestamp()} def dump(self) -> Dict: result = { 'reservation_id': self.dump_id(), 'topology_id': { 'context_id': {'context_uuid': {'uuid': self.context_uuid}}, 'topology_uuid': {'uuid': self.topology_uuid}, }, 'optical_link_ids': [ {'link_uuid': {'uuid': optical_link_uuid}} for optical_link_uuid in self.get_optical_link_uuids() ], 'band': self.band, 'n_start': self.n_start, 'n_end': self.n_end, 'required_slots': self.required_slots, 'owner_id': self.owner_id or '', 'correlation_id': self.correlation_id or '', 'status': self.status, 'created_at': {'timestamp': self.created_at.timestamp()}, 'updated_at': {'timestamp': self.updated_at.timestamp()}, } if self.service_uuid is not None: result['service_id'] = { 'context_id': {'context_uuid': {'uuid': self.context_uuid}}, 'service_uuid': {'uuid': self.service_uuid}, } if self.connection_uuid is not None: result['connection_id'] = {'connection_uuid': {'uuid': self.connection_uuid}} expires_at = self._dump_optional_timestamp(self.expires_at) if expires_at is not None: result['expires_at'] = expires_at return result