Skip to content
Snippets Groups Projects
ServiceModel.py 3.96 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import functools, logging, operator
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from enum import Enum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict, List
from common.orm.fields.EnumeratedField import EnumeratedField
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.model.Model import Model
from common.orm.HighLevel import get_related_objects
from common.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .ConfigModel import ConfigModel
from .ConstraintModel import ConstraintsModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .ContextModel import ContextModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Tools import grpc_to_enum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)

class ORM_ServiceTypeEnum(Enum):
    UNKNOWN                   = ServiceTypeEnum.SERVICETYPE_UNKNOWN
    L3NM                      = ServiceTypeEnum.SERVICETYPE_L3NM
    L2NM                      = ServiceTypeEnum.SERVICETYPE_L2NM
    TAPI_CONNECTIVITY_SERVICE = ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE

grpc_to_enum__service_type = functools.partial(
    grpc_to_enum, ServiceTypeEnum, ORM_ServiceTypeEnum)

class ORM_ServiceStatusEnum(Enum):
    UNDEFINED       = ServiceStatusEnum.SERVICESTATUS_UNDEFINED
    PLANNED         = ServiceStatusEnum.SERVICESTATUS_PLANNED
    ACTIVE          = ServiceStatusEnum.SERVICESTATUS_ACTIVE
    PENDING_REMOVAL = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL

grpc_to_enum__service_status = functools.partial(
    grpc_to_enum, ServiceStatusEnum, ORM_ServiceStatusEnum)

class ServiceModel(Model):
    pk = PrimaryKeyField()
    context_fk = ForeignKeyField(ContextModel)
    service_uuid = StringField(required=True, allow_empty=False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    service_type = EnumeratedField(ORM_ServiceTypeEnum, required=True)
    service_constraints_fk = ForeignKeyField(ConstraintsModel)
    service_status = EnumeratedField(ORM_ServiceStatusEnum, required=True)
    service_config_fk = ForeignKeyField(ConfigModel)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    def dump_id(self) -> Dict:
        context_id = ContextModel(self.database, self.context_fk).dump_id()
        return {
            'context_id': context_id,
            'service_uuid': {'uuid': self.service_uuid},
        }

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def dump_endpoint_ids(self) -> List[Dict]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        from .RelationModels import ServiceEndPointModel # pylint: disable=import-outside-toplevel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        db_endpoints = get_related_objects(self, ServiceEndPointModel, 'endpoint_fk')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return [db_endpoint.dump_id() for db_endpoint in sorted(db_endpoints, key=operator.attrgetter('pk'))]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def dump_constraints(self) -> List[Dict]:
        return ConstraintsModel(self.database, self.service_constraints_fk).dump()

    def dump_config(self) -> Dict:
        return ConfigModel(self.database, self.service_config_fk).dump()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    def dump(   # pylint: disable=arguments-differ
            self, include_endpoint_ids=True, include_constraints=True, include_config_rules=True
        ) -> Dict:
        result = {
            'service_id': self.dump_id(),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            'service_type': self.service_type.value,
            'service_status': {'service_status': self.service_status.value},
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        }
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if include_endpoint_ids: result['service_endpoint_ids'] = self.dump_endpoint_ids()
        if include_constraints: result['service_constraints'] = self.dump_constraints()
        if include_config_rules: result.setdefault('service_config', {})['config_rules'] = self.dump_config()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return result