Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
ConfigRuleModel.py 3.81 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import enum, json
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from sqlalchemy import CheckConstraint, Column, DateTime, Enum, ForeignKey, Integer, String
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from sqlalchemy.dialects.postgresql import UUID
    from typing import Dict
    from .enums.ConfigAction import ORM_ConfigActionEnum
    from ._Base import _Base
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Enum values should match name of field in ConfigRule message
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class ConfigRuleKindEnum(enum.Enum):
        CUSTOM = 'custom'
        ACL    = 'acl'
    
    Leandro Campos's avatar
    Leandro Campos committed
        PON_ACCESS = 'pon_access'
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class DeviceConfigRuleModel(_Base):
        __tablename__ = 'device_configrule'
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        configrule_uuid = Column(UUID(as_uuid=False), primary_key=True)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        device_uuid     = Column(ForeignKey('device.device_uuid', ondelete='CASCADE'), nullable=False) #, index=True
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        position        = Column(Integer, nullable=False)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        kind            = Column(Enum(ConfigRuleKindEnum), nullable=False)
        action          = Column(Enum(ORM_ConfigActionEnum), nullable=False)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        data            = Column(String, nullable=False)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        created_at      = Column(DateTime, nullable=False)
        updated_at      = Column(DateTime, nullable=False)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        __table_args__ = (
            CheckConstraint(position >= 0, name='check_position_value'),
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            #UniqueConstraint('device_uuid',  'position', name='unique_per_device' ),
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        )
    
        def dump(self) -> Dict:
            return {
                'action': self.action.value,
                self.kind.value: json.loads(self.data),
            }
    
    class ServiceConfigRuleModel(_Base):
        __tablename__ = 'service_configrule'
    
        configrule_uuid = Column(UUID(as_uuid=False), primary_key=True)
        service_uuid    = Column(ForeignKey('service.service_uuid', ondelete='CASCADE'), nullable=False) #, index=True
        position        = Column(Integer, nullable=False)
        kind            = Column(Enum(ConfigRuleKindEnum), nullable=False)
        action          = Column(Enum(ORM_ConfigActionEnum), nullable=False)
        data            = Column(String, nullable=False)
        created_at      = Column(DateTime, nullable=False)
        updated_at      = Column(DateTime, nullable=False)
    
        __table_args__ = (
            CheckConstraint(position >= 0, name='check_position_value'),
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            #UniqueConstraint('service_uuid', 'position', name='unique_per_service'),
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        )
    
        def dump(self) -> Dict:
            return {
                'action': self.action.value,
                self.kind.value: json.loads(self.data),
            }
    
    class SliceConfigRuleModel(_Base):
        __tablename__ = 'slice_configrule'
    
        configrule_uuid = Column(UUID(as_uuid=False), primary_key=True)
        slice_uuid      = Column(ForeignKey('slice.slice_uuid', ondelete='CASCADE'), nullable=False) #, index=True
        position        = Column(Integer, nullable=False)
        kind            = Column(Enum(ConfigRuleKindEnum), nullable=False)
        action          = Column(Enum(ORM_ConfigActionEnum), nullable=False)
        data            = Column(String, nullable=False)
        created_at      = Column(DateTime, nullable=False)
        updated_at      = Column(DateTime, nullable=False)
    
        __table_args__ = (
            CheckConstraint(position >= 0, name='check_position_value'),
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            #UniqueConstraint('slice_uuid',   'position', name='unique_per_slice'  ),
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        )
    
        def dump(self) -> Dict:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            return {
                'action': self.action.value,
                self.kind.value: json.loads(self.data),
            }