Commit 27216e0b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/391-cttc-fix-optical-slot-codec-in-context-db' into 'develop'

Resolve "(CTTC) Fix optical slot codec in Context DB"

See merge request !453
parents f962b570 21ecc7b7
Loading
Loading
Loading
Loading
+55 −53
Original line number Diff line number Diff line
@@ -12,69 +12,71 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy.types import  TypeDecorator, Integer
from sqlalchemy.types import String, TypeDecorator


class SlotType(TypeDecorator):
    impl = Integer
    impl = String
    cache_ok = True

    start_point = 0
    width = 0

    def _valid_key_range(self):
        return range(self.start_point, self.start_point + self.width)

    def _normalize(self, value):
        if value is None:
            return None

        normalized = {}
        for key, slot_value in value.items():
            key_int = int(key)
            if key_int not in self._valid_key_range():
                msg = 'Slot key {:d} out of valid range [{:d}, {:d}]'
                raise ValueError(msg.format(key_int, self.start_point, self.start_point + self.width - 1))

            bit_value = int(slot_value)
            if bit_value not in {0, 1}:
                raise ValueError('Slot value must be 0 or 1, got {:d}'.format(bit_value))

            normalized[key_int] = bit_value
        return normalized

    def process_bind_param(self, value, dialect):
        if value is not None:
            bin_num = "0b"
            for i,(key,val) in enumerate(value.items()):
                bin_num =bin_num + f"{val}"
            int_num = int(bin_num,2)  
        return int_num
        normalized = self._normalize(value)
        if normalized is None:
            return None

        int_num = 0
        for key in self._valid_key_range():
            bit_value = normalized.get(key, 0)
            if bit_value == 1:
                int_num |= 1 << (key - self.start_point)

        return str(int_num)

    def process_result_value(self, value, dialect):
        if value is not None:
            slot = dict()
            bin_num = bin(value)
            sliced_num = bin_num[2:]
            for i in range(len(sliced_num)):
                slot[str(i+1)]=int(sliced_num[i])
        if value is None:
            return None

        int_num = int(value)
        slot = {}
        for key in self._valid_key_range():
            slot[str(key)] = (int_num >> (key - self.start_point)) & 1
        return slot


class C_Slot(SlotType):
    start_point = 0
    start_point = 1
    width = 320

    def process_result_value(self, value, dialect):
        if value is not None:
            slot = dict()
            bin_num = bin(value)
            sliced_num = bin_num[2:]
            if (len(sliced_num) != 20) :
                for i in range(0,20 - len(sliced_num)):
                    sliced_num = '0' + sliced_num
            for i in range(len(sliced_num)):
                slot[str(self.start_point+i+1)]=int(sliced_num[i])
        return slot

class L_Slot(SlotType):
    start_point = 100
    start_point = 101
    width = 550

    def process_result_value(self, value, dialect):
        if value is not None:
            slot = dict()
            bin_num = bin(value)
            sliced_num = bin_num[2:]
            if (len(sliced_num) != 20) :
                for i in range(0,20 - len(sliced_num)):
                    sliced_num='0'+sliced_num
            for i in range(len(sliced_num)):
                slot[str(self.start_point+i+1)]=int(sliced_num[i])
        return slot

class S_Slot(SlotType):
    start_point = 500

    def process_result_value(self, value, dialect):
        if value is not None:
            slot= dict()
            bin_num = bin(value)
            sliced_num=bin_num[2:]
            if (len(sliced_num) != 20) :
                for i in range(0,20 - len(sliced_num)):
                    sliced_num='0'+sliced_num
            for i in range(len(sliced_num)):
                slot[str(self.start_point+i+1)]=int(sliced_num[i])
        return slot
    start_point = 501
    width = 720
+108 −0
Original line number Diff line number Diff line
# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest
import sqlalchemy
from sqlalchemy import Column, Integer
from sqlalchemy.orm import Session, declarative_base

from context.service.database.models.Slot import C_Slot, L_Slot, S_Slot


Base = declarative_base()


class SlotSmokeModel(Base):
    __tablename__ = 'slot_smoke'

    id = Column(Integer, primary_key=True)
    c_slots = Column(C_Slot, nullable=True)
    l_slots = Column(L_Slot, nullable=True)
    s_slots = Column(S_Slot, nullable=True)


def build_expected_slot_map(start_slot: int, width: int, active_slots):
    active_slots = set(active_slots)
    return {str(slot): (1 if slot in active_slots else 0) for slot in range(start_slot, start_slot + width)}


def build_sparse_slot_input(active_slots):
    return {str(slot): 1 for slot in reversed(active_slots)}


@pytest.mark.parametrize(
    'slot_type,width,active_slots',
    [
        (C_Slot(), 320, [1, 18, 320]),
        (L_Slot(), 550, [101, 202, 650]),
        (S_Slot(), 720, [501, 706, 1220]),
    ],
)
def test_slot_type_roundtrip_preserves_positions(slot_type, width, active_slots) -> None:
    sparse_input = build_sparse_slot_input(active_slots)
    encoded = slot_type.process_bind_param(sparse_input, dialect=None)
    decoded = slot_type.process_result_value(encoded, dialect=None)

    assert encoded is not None
    assert decoded == build_expected_slot_map(slot_type.start_point, width, active_slots)


@pytest.mark.parametrize(
    'slot_type,invalid_key',
    [
        (C_Slot(), 321),
        (L_Slot(), 651),
        (S_Slot(), 1221),
    ],
)
def test_slot_type_rejects_out_of_range_keys(slot_type, invalid_key: int) -> None:
    with pytest.raises(ValueError):
        slot_type.process_bind_param({str(invalid_key): 1}, dialect=None)


def _run_slot_smoke_test(engine: sqlalchemy.engine.Engine) -> None:
    Base.metadata.create_all(engine)
    try:
        c_slots = build_sparse_slot_input([1, 11, 320])
        l_slots = build_sparse_slot_input([101, 113, 650])
        s_slots = build_sparse_slot_input([501, 515, 1220])

        with Session(engine) as session:
            session.add(SlotSmokeModel(id=1, c_slots=c_slots, l_slots=l_slots, s_slots=s_slots))
            session.commit()

        with Session(engine) as session:
            stored = session.query(SlotSmokeModel).filter_by(id=1).one()
            assert stored.c_slots == build_expected_slot_map(1, 320, [1, 11, 320])
            assert stored.l_slots == build_expected_slot_map(101, 550, [101, 113, 650])
            assert stored.s_slots == build_expected_slot_map(501, 720, [501, 515, 1220])
    finally:
        Base.metadata.drop_all(engine)


def test_slot_smoke_sqlite() -> None:
    engine = sqlalchemy.create_engine('sqlite:///:memory:', future=True)
    _run_slot_smoke_test(engine)


def test_slot_smoke_cockroachdb() -> None:
    crdb_uri = os.environ.get('CRDB_URI')
    if crdb_uri is None:
        pytest.skip('CRDB_URI is not set')
    engine = sqlalchemy.create_engine(
        crdb_uri, connect_args={'application_name': 'tfs-slot-smoketest'}, future=True
    )
    _run_slot_smoke_test(engine)