diff --git a/src/context/service/database/models/Slot.py b/src/context/service/database/models/Slot.py index c406cf158b8f889a494483ed941449f2e6c19850..6733642769b5d4c1ed8692fbb86a70095c76c8f5 100644 --- a/src/context/service/database/models/Slot.py +++ b/src/context/service/database/models/Slot.py @@ -12,69 +12,71 @@ # See the License for the specific language governing permissions and # limitations under the License. -from sqlalchemy.types import TypeDecorator, Integer +from sqlalchemy.types import String, TypeDecorator + class SlotType(TypeDecorator): - impl = Integer + impl = String + cache_ok = True + + start_point = 0 + width = 0 + + def _valid_key_range(self): + return range(self.start_point, self.start_point + self.width) + + def _normalize(self, value): + if value is None: + return None + + normalized = {} + for key, slot_value in value.items(): + key_int = int(key) + if key_int not in self._valid_key_range(): + msg = 'Slot key {:d} out of valid range [{:d}, {:d}]' + raise ValueError(msg.format(key_int, self.start_point, self.start_point + self.width - 1)) + + bit_value = int(slot_value) + if bit_value not in {0, 1}: + raise ValueError('Slot value must be 0 or 1, got {:d}'.format(bit_value)) + + normalized[key_int] = bit_value + return normalized def process_bind_param(self, value, dialect): - if value is not None: - bin_num = "0b" - for i,(key,val) in enumerate(value.items()): - bin_num =bin_num + f"{val}" - int_num = int(bin_num,2) - return int_num + normalized = self._normalize(value) + if normalized is None: + return None - def process_result_value(self, value, dialect): - if value is not None: - slot = dict() - bin_num = bin(value) - sliced_num = bin_num[2:] - for i in range(len(sliced_num)): - slot[str(i+1)]=int(sliced_num[i]) - return slot + int_num = 0 + for key in self._valid_key_range(): + bit_value = normalized.get(key, 0) + if bit_value == 1: + int_num |= 1 << (key - self.start_point) -class C_Slot(SlotType): - start_point = 0 + return str(int_num) def process_result_value(self, value, dialect): - if value is not None: - slot = dict() - bin_num = bin(value) - sliced_num = bin_num[2:] - if (len(sliced_num) != 20) : - for i in range(0,20 - len(sliced_num)): - sliced_num = '0' + sliced_num - for i in range(len(sliced_num)): - slot[str(self.start_point+i+1)]=int(sliced_num[i]) + if value is None: + return None + + int_num = int(value) + slot = {} + for key in self._valid_key_range(): + slot[str(key)] = (int_num >> (key - self.start_point)) & 1 return slot -class L_Slot (SlotType): - start_point = 100 - def process_result_value(self, value, dialect): - if value is not None: - slot = dict() - bin_num = bin(value) - sliced_num = bin_num[2:] - if (len(sliced_num) != 20) : - for i in range(0,20 - len(sliced_num)): - sliced_num='0'+sliced_num - for i in range(len(sliced_num)): - slot[str(self.start_point+i+1)]=int(sliced_num[i]) - return slot +class C_Slot(SlotType): + start_point = 1 + width = 320 -class S_Slot (SlotType): - start_point = 500 - def process_result_value(self, value, dialect): - if value is not None: - slot= dict() - bin_num = bin(value) - sliced_num=bin_num[2:] - if (len(sliced_num) != 20) : - for i in range(0,20 - len(sliced_num)): - sliced_num='0'+sliced_num - for i in range(len(sliced_num)): - slot[str(self.start_point+i+1)]=int(sliced_num[i]) - return slot +class L_Slot(SlotType): + start_point = 101 + width = 550 + + +class S_Slot(SlotType): + start_point = 501 + width = 720 diff --git a/src/context/tests/test_optical_link_slots.py b/src/context/tests/test_optical_link_slots.py new file mode 100644 index 0000000000000000000000000000000000000000..17f92c2bb8c18c3c26fb3194abdd1cb1219b9744 --- /dev/null +++ b/src/context/tests/test_optical_link_slots.py @@ -0,0 +1,108 @@ +# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import pytest +import sqlalchemy +from sqlalchemy import Column, Integer +from sqlalchemy.orm import Session, declarative_base + +from context.service.database.models.Slot import C_Slot, L_Slot, S_Slot + + +Base = declarative_base() + + +class SlotSmokeModel(Base): + __tablename__ = 'slot_smoke' + + id = Column(Integer, primary_key=True) + c_slots = Column(C_Slot, nullable=True) + l_slots = Column(L_Slot, nullable=True) + s_slots = Column(S_Slot, nullable=True) + + +def build_expected_slot_map(start_slot: int, width: int, active_slots): + active_slots = set(active_slots) + return {str(slot): (1 if slot in active_slots else 0) for slot in range(start_slot, start_slot + width)} + + +def build_sparse_slot_input(active_slots): + return {str(slot): 1 for slot in reversed(active_slots)} + + +@pytest.mark.parametrize( + 'slot_type,width,active_slots', + [ + (C_Slot(), 320, [1, 18, 320]), + (L_Slot(), 550, [101, 202, 650]), + (S_Slot(), 720, [501, 706, 1220]), + ], +) +def test_slot_type_roundtrip_preserves_positions(slot_type, width, active_slots) -> None: + sparse_input = build_sparse_slot_input(active_slots) + encoded = slot_type.process_bind_param(sparse_input, dialect=None) + decoded = slot_type.process_result_value(encoded, dialect=None) + + assert encoded is not None + assert decoded == build_expected_slot_map(slot_type.start_point, width, active_slots) + + +@pytest.mark.parametrize( + 'slot_type,invalid_key', + [ + (C_Slot(), 321), + (L_Slot(), 651), + (S_Slot(), 1221), + ], +) +def test_slot_type_rejects_out_of_range_keys(slot_type, invalid_key: int) -> None: + with pytest.raises(ValueError): + slot_type.process_bind_param({str(invalid_key): 1}, dialect=None) + + +def _run_slot_smoke_test(engine: sqlalchemy.engine.Engine) -> None: + Base.metadata.create_all(engine) + try: + c_slots = build_sparse_slot_input([1, 11, 320]) + l_slots = build_sparse_slot_input([101, 113, 650]) + s_slots = build_sparse_slot_input([501, 515, 1220]) + + with Session(engine) as session: + session.add(SlotSmokeModel(id=1, c_slots=c_slots, l_slots=l_slots, s_slots=s_slots)) + session.commit() + + with Session(engine) as session: + stored = session.query(SlotSmokeModel).filter_by(id=1).one() + assert stored.c_slots == build_expected_slot_map(1, 320, [1, 11, 320]) + assert stored.l_slots == build_expected_slot_map(101, 550, [101, 113, 650]) + assert stored.s_slots == build_expected_slot_map(501, 720, [501, 515, 1220]) + finally: + Base.metadata.drop_all(engine) + + +def test_slot_smoke_sqlite() -> None: + engine = sqlalchemy.create_engine('sqlite:///:memory:', future=True) + _run_slot_smoke_test(engine) + + +def test_slot_smoke_cockroachdb() -> None: + crdb_uri = os.environ.get('CRDB_URI') + if crdb_uri is None: + pytest.skip('CRDB_URI is not set') + engine = sqlalchemy.create_engine( + crdb_uri, connect_args={'application_name': 'tfs-slot-smoketest'}, future=True + ) + _run_slot_smoke_test(engine)