Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
DeviceModel.py 2.41 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import operator
    from typing import Dict
    from sqlalchemy import Column, Float, String, Enum
    from sqlalchemy.dialects.postgresql import UUID, ARRAY
    from sqlalchemy.orm import relationship
    from ._Base import _Base
    from .enums.DeviceDriver import ORM_DeviceDriverEnum
    from .enums.DeviceOperationalStatus import ORM_DeviceOperationalStatusEnum
    
    class DeviceModel(_Base):
        """ORM mapping for the ``device`` table.

        Besides its own scalar columns, a device is linked to topology
        membership rows, configuration rules and endpoints. ``dump_id()`` and
        ``dump()`` convert a loaded row into plain nested dictionaries.
        """

        __tablename__ = 'device'

        # --- columns -------------------------------------------------------
        # Primary key. ``as_uuid=False`` is passed to the PostgreSQL UUID type.
        device_uuid = Column(UUID(as_uuid=False), primary_key=True)
        # Name and type are mandatory strings.
        device_name = Column(String, nullable=False)
        device_type = Column(String, nullable=False)
        # NOTE(review): neither of the two enum-based columns below is declared
        # nullable=False; dump() dereferences both unconditionally, so a NULL
        # in either would make dump() raise -- confirm writers always set them.
        device_operational_status = Column(Enum(ORM_DeviceOperationalStatusEnum))
        # One-dimensional array of driver enum members.
        device_drivers = Column(ARRAY(Enum(ORM_DeviceDriverEnum), dimensions=1))
        # Stored as a float; not included in dump().
        created_at = Column(Float)

        # --- relationships -------------------------------------------------
        # Each related model is expected to declare the matching ``device``
        # attribute (back_populates='device').
        topology_devices = relationship('TopologyDeviceModel', back_populates='device')
        # Config rules and endpoints are configured for joined loading and
        # passive deletes.
        config_rules     = relationship('ConfigRuleModel', passive_deletes=True, back_populates='device', lazy='joined')
        endpoints        = relationship('EndPointModel', passive_deletes=True, back_populates='device', lazy='joined')

        def dump_id(self) -> Dict:
            """Return the device identifier as ``{'device_uuid': {'uuid': ...}}``."""
            uuid_entry = {'uuid': self.device_uuid}
            return {'device_uuid': uuid_entry}

        def dump(self) -> Dict:
            """Return the device and its related objects as a nested dictionary.

            Enum-valued columns are emitted through their ``.value``; config
            rules are emitted in ascending order of their ``position``
            attribute; endpoints are emitted in the order the relationship
            yields them.
            """
            # Values are gathered in the same order in which they appear in
            # the resulting dictionary.
            identifier = self.dump_id()
            name = self.device_name
            kind = self.device_type
            status_value = self.device_operational_status.value
            driver_values = [drv.value for drv in self.device_drivers]

            # Stable sort of a copy, keyed on each rule's ``position``.
            ordered_rules = list(self.config_rules)
            ordered_rules.sort(key=operator.attrgetter('position'))
            dumped_rules = [rule.dump() for rule in ordered_rules]

            dumped_endpoints = [ep.dump() for ep in self.endpoints]

            return {
                'device_id'                : identifier,
                'name'                     : name,
                'device_type'              : kind,
                'device_operational_status': status_value,
                'device_drivers'           : driver_values,
                'device_config'            : {'config_rules': dumped_rules},
                'device_endpoints'         : dumped_endpoints,
            }