Skip to content
Snippets Groups Projects
ServiceModel.py 3.35 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import functools, logging, operator
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from enum import Enum
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from typing import Dict, List
    from common.orm.fields.EnumeratedField import EnumeratedField
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.fields.ForeignKeyField import ForeignKeyField
    from common.orm.fields.PrimaryKeyField import PrimaryKeyField
    from common.orm.fields.StringField import StringField
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.model.Model import Model
    from common.orm.HighLevel import get_related_objects
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .ConfigModel import ConfigModel
    from .ConstraintModel import ConstraintsModel
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .ContextModel import ContextModel
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .Tools import grpc_to_enum
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    LOGGER = logging.getLogger(__name__)
    
    class ORM_ServiceTypeEnum(Enum):
        UNKNOWN                   = ServiceTypeEnum.SERVICETYPE_UNKNOWN
        L3NM                      = ServiceTypeEnum.SERVICETYPE_L3NM
        L2NM                      = ServiceTypeEnum.SERVICETYPE_L2NM
        TAPI_CONNECTIVITY_SERVICE = ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE
    
    grpc_to_enum__service_type = functools.partial(
        grpc_to_enum, ServiceTypeEnum, ORM_ServiceTypeEnum)
    
    class ORM_ServiceStatusEnum(Enum):
        UNDEFINED       = ServiceStatusEnum.SERVICESTATUS_UNDEFINED
        PLANNED         = ServiceStatusEnum.SERVICESTATUS_PLANNED
        ACTIVE          = ServiceStatusEnum.SERVICESTATUS_ACTIVE
        PENDING_REMOVAL = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
    
    grpc_to_enum__service_status = functools.partial(
        grpc_to_enum, ServiceStatusEnum, ORM_ServiceStatusEnum)
    
    class ServiceModel(Model):
        pk = PrimaryKeyField()
        context_fk = ForeignKeyField(ContextModel)
        service_uuid = StringField(required=True, allow_empty=False)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        service_type = EnumeratedField(ORM_ServiceTypeEnum, required=True)
        service_constraints_fk = ForeignKeyField(ConstraintsModel)
        service_status = EnumeratedField(ORM_ServiceStatusEnum, required=True)
        service_config_fk = ForeignKeyField(ConfigModel)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        def dump_id(self) -> Dict:
            context_id = ContextModel(self.database, self.context_fk).dump_id()
            return {
                'context_id': context_id,
                'service_uuid': {'uuid': self.service_uuid},
            }
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def dump_endpoint_ids(self) -> List[Dict]:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            from .RelationModels import ServiceEndPointModel # pylint: disable=import-outside-toplevel
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            db_endpoints = get_related_objects(self, ServiceEndPointModel, 'endpoint_fk')
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            return [db_endpoint.dump_id() for db_endpoint in sorted(db_endpoints, key=operator.attrgetter('pk'))]
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def dump_constraints(self) -> List[Dict]:
            return ConstraintsModel(self.database, self.service_constraints_fk).dump()
    
        def dump_config(self) -> Dict:
            return ConfigModel(self.database, self.service_config_fk).dump()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        def dump(   # pylint: disable=arguments-differ
                self, include_endpoint_ids=True, include_constraints=True, include_config_rules=True
            ) -> Dict:
            result = {
                'service_id': self.dump_id(),
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
                'service_type': self.service_type.value,
                'service_status': {'service_status': self.service_status.value},
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            }
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            if include_endpoint_ids: result['service_endpoint_ids'] = self.dump_endpoint_ids()
            if include_constraints: result['service_constraints'] = self.dump_constraints()
            if include_config_rules: result.setdefault('service_config', {})['config_rules'] = self.dump_config()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            return result