-
Lluis Gifre Renom authored
Common: - added ORM helper function get_related_instances - minor modifications in ORM framework Proto: - field/message/enum renamings in context.proto to normalize notation Context: - Expanded service/database/Models.py into separate models to facilitate coding - Completed implementation of Get/Set/List/Remove methods for Context, Topology, Device, Link, and EndPoint; also completed for relation models between topology and device/link, and ConfigModels - Implemented unit tests for Get/Set/List/Remove methods for Context, Topology, Device, Link, and EndPoint - Implemented skeleton for Get/Set/List/Remove methods for Service. - Added Tools.py with helper methods to simplify handling of enums between ORM and gRPC messages.
Lluis Gifre Renom authored Common: - added ORM helper function get_related_instances - minor modifications in ORM framework Proto: - field/message/enum renamings in context.proto to normalize notation Context: - Expanded service/database/Models.py into separate models to facilitate coding - Completed implementation of Get/Set/List/Remove methods for Context, Topology, Device, Link, and EndPoint; also completed for relation models between topology and device/link, and ConfigModels - Implemented unit tests for Get/Set/List/Remove methods for Context, Topology, Device, Link, and EndPoint - Implemented skeleton for Get/Set/List/Remove methods for Service. - Added Tools.py with helper methods to simplify handling of enums between ORM and gRPC messages.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
DeviceModel.py 3.23 KiB
import functools, logging
from enum import Enum
from typing import Dict, List
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from context.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum
from .ConfigModel import ConfigModel
from .Tools import grpc_to_enum
# Module-level logger, named after this module via __name__.
LOGGER = logging.getLogger(__name__)
class ORM_DeviceDriverEnum(Enum):
    """Enumeration of device drivers as stored through the ORM.

    Every member's value is the matching ``DEVICEDRIVER_*`` constant of the
    ``DeviceDriverEnum`` imported from ``context.proto.context_pb2``, so a
    member's ``.value`` can be handed back to the gRPC side unchanged.
    """
    UNDEFINED = DeviceDriverEnum.DEVICEDRIVER_UNDEFINED
    OPENCONFIG = DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG
    TRANSPORT_API = DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API
    P4 = DeviceDriverEnum.DEVICEDRIVER_P4
    IETF_NETWORK_TOPOLOGY = DeviceDriverEnum.DEVICEDRIVER_IETF_NETWORK_TOPOLOGY
    ONF_TR_352 = DeviceDriverEnum.DEVICEDRIVER_ONF_TR_352
# grpc_to_enum with its first two positional arguments pre-bound to the gRPC
# DeviceDriverEnum and to ORM_DeviceDriverEnum; callers pass only the
# remaining argument(s).
# NOTE(review): presumably maps a gRPC driver value to the matching ORM
# member -- confirm against .Tools.grpc_to_enum.
grpc_to_enum__device_driver = functools.partial(
    grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum)
class ORM_DeviceOperationalStatusEnum(Enum):
    """Enumeration of device operational statuses as stored through the ORM.

    Every member's value is the matching ``DEVICEOPERATIONALSTATUS_*``
    constant of the ``DeviceOperationalStatusEnum`` imported from
    ``context.proto.context_pb2``.
    """
    UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
    DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
    ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
# grpc_to_enum with its first two positional arguments pre-bound to the gRPC
# DeviceOperationalStatusEnum and to ORM_DeviceOperationalStatusEnum; callers
# pass only the remaining argument(s).
# NOTE(review): presumably maps a gRPC status value to the matching ORM
# member -- confirm against .Tools.grpc_to_enum.
grpc_to_enum__device_operational_status = functools.partial(
    grpc_to_enum, DeviceOperationalStatusEnum, ORM_DeviceOperationalStatusEnum)
class DeviceModel(Model):
    """ORM model of a device: UUID, type, configuration link and operational status.

    The ``dump*`` methods turn the stored record into plain Python
    structures (dicts and lists).
    """

    # Fields are declared in the same order as before.
    pk = PrimaryKeyField()
    device_uuid = StringField(required=True, allow_empty=False)
    device_type = StringField()
    device_config_fk = ForeignKeyField(ConfigModel)
    device_operational_status = EnumeratedField(ORM_DeviceOperationalStatusEnum, required=True)

    def dump_id(self) -> Dict:
        """Return the identifier as ``{'device_uuid': {'uuid': <device_uuid>}}``."""
        uuid_entry = {'uuid': self.device_uuid}
        return {'device_uuid': uuid_entry}

    def dump_config(self) -> Dict:
        """Return ``dump()`` of the ``ConfigModel`` addressed by ``device_config_fk``."""
        config_model = ConfigModel(self.database, self.device_config_fk)
        return config_model.dump()

    def dump_drivers(self) -> List[int]:
        """Return ``dump()`` of each ``DriverModel`` reported by ``self.references``.

        ``self.references`` yields pairs; only the first item (the primary
        key) is used to load each ``DriverModel``.
        """
        dumped_drivers = []
        for driver_pk, _ in self.references(DriverModel):
            driver_model = DriverModel(self.database, driver_pk)
            dumped_drivers.append(driver_model.dump())
        return dumped_drivers

    def dump_endpoints(self) -> List[Dict]:
        """Return ``dump()`` of each ``EndPointModel`` reported by ``self.references``."""
        # Imported at call time, exactly as in the original code.
        # NOTE(review): presumably avoids a circular import with EndPointModel -- confirm.
        from .EndPointModel import EndPointModel

        dumped_endpoints = []
        for endpoint_pk, _ in self.references(EndPointModel):
            endpoint_model = EndPointModel(self.database, endpoint_pk)
            dumped_endpoints.append(endpoint_model.dump())
        return dumped_endpoints

    def dump(  # pylint: disable=arguments-differ
            self, include_config_rules=True, include_drivers=True, include_endpoints=True
    ) -> Dict:
        """Return the device as a dict.

        The result always holds ``device_id``, ``device_type`` and
        ``device_operational_status`` (the enum member's ``.value``).

        Args:
            include_config_rules: when truthy, add
                ``device_config = {'config_rules': self.dump_config()}``.
            include_drivers: when truthy, add ``device_drivers``.
            include_endpoints: when truthy, add ``device_endpoints``.
        """
        device = {}
        device['device_id'] = self.dump_id()
        device['device_type'] = self.device_type
        device['device_operational_status'] = self.device_operational_status.value

        if include_config_rules:
            # 'device_config' cannot exist yet in this freshly built dict,
            # so it is created here with its single entry.
            device['device_config'] = {'config_rules': self.dump_config()}
        if include_drivers:
            device['device_drivers'] = self.dump_drivers()
        if include_endpoints:
            device['device_endpoints'] = self.dump_endpoints()
        return device
class DriverModel(Model):
    """ORM model that attaches one driver to a device.

    Holds a foreign key to ``DeviceModel`` and a required
    ``ORM_DeviceDriverEnum`` value.
    """
    pk = PrimaryKeyField()
    device_fk = ForeignKeyField(DeviceModel)
    driver = EnumeratedField(ORM_DeviceDriverEnum, required=True)

    def dump(self) -> int:
        """Return ``self.driver.value``.

        NOTE(review): the return annotation is ``int`` to agree with
        ``DeviceModel.dump_drivers`` (``List[int]``); presumably this is the
        ``DeviceDriverEnum`` number carried by the ORM enum member -- confirm.
        """
        return self.driver.value