Skip to content
ConnectionModel.py 8.09 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, operator
from typing import Dict, List, Optional, Set, Tuple, Union
from common.orm.Database import Database
from common.orm.backend.Tools import key_to_str
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.model.Model import Model
from common.orm.HighLevel import get_object, get_or_create_object, get_related_objects, update_or_create_object
from common.proto.context_pb2 import EndPointId
from .EndPointModel import EndPointModel
from .ServiceModel import ServiceModel

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def remove_dict_key(dictionary : Dict, key : str):
    dictionary.pop(key, None)
    return dictionary
Carlos Manso's avatar
Carlos Manso committed

from sqlalchemy import Column, Enum, ForeignKey, Integer, CheckConstraint
from typing import Dict, List
from common.orm.HighLevel import get_related_objects
from common.proto.context_pb2 import ServiceStatusEnum, ServiceTypeEnum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .ConfigRuleModel import ConfigModel
Carlos Manso's avatar
Carlos Manso committed
from .ConstraintModel import ConstraintsModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .models.ContextModel import ContextModel
Carlos Manso's avatar
Carlos Manso committed
from .Tools import grpc_to_enum
from sqlalchemy.dialects.postgresql import UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.models._Base import Base
Carlos Manso's avatar
Carlos Manso committed
import enum
LOGGER = logging.getLogger(__name__)

LOGGER = logging.getLogger(__name__)

class PathModel(Model): # pylint: disable=abstract-method
Carlos Manso's avatar
Carlos Manso committed
    path_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def delete(self) -> None:
        for db_path_hop_pk,_ in self.references(PathHopModel):
            PathHopModel(self.database, db_path_hop_pk).delete()
        super().delete()

    def dump(self) -> List[Dict]:
        db_path_hop_pks = self.references(PathHopModel)
        path_hops = [PathHopModel(self.database, pk).dump(include_position=True) for pk,_ in db_path_hop_pks]
        path_hops = sorted(path_hops, key=operator.itemgetter('position'))
        return [remove_dict_key(path_hop, 'position') for path_hop in path_hops]

class PathHopModel(Model): # pylint: disable=abstract-method
Carlos Manso's avatar
Carlos Manso committed
    path_hop_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
    path_uuid = Column(UUID(as_uuid=False), ForeignKey("Path.path_uuid"))
    position = Column(Integer, CheckConstraint('position >= 0'), nullable=False)
    endpoint_uuid = Column(UUID(as_uuid=False), ForeignKey("EndPoint.endpoint_uuid"))

    def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
        db_endpoint : EndPointModel = EndPointModel(self.database, self.endpoint_fk)
        result = db_endpoint.dump_id()
        if include_position: result['position'] = self.position
        return result

class ConnectionModel(Model):
    pk = PrimaryKeyField()
Carlos Manso's avatar
Carlos Manso committed
    # connection_uuid = StringField(required=True, allow_empty=False)
    connection_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
    # service_fk = ForeignKeyField(ServiceModel, required=False)
    service_uuid = Column(UUID(as_uuid=False), ForeignKey("Service.service_uuid"))
    path_fk = ForeignKeyField(PathModel, required=True)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def delete(self) -> None:
        # pylint: disable=import-outside-toplevel
        from .RelationModels import ConnectionSubServiceModel

        # Do not remove sub-services automatically. They are supported by real services, so Service component should
        # deal with the correct removal workflow to deconfigure the devices.
        for db_connection_sub_service_pk,_ in self.references(ConnectionSubServiceModel):
            ConnectionSubServiceModel(self.database, db_connection_sub_service_pk).delete()

        super().delete()
        PathModel(self.database, self.path_fk).delete()

    def dump_id(self) -> Dict:
        return {
            'connection_uuid': {'uuid': self.connection_uuid},
        }

    def dump_path_hops_endpoint_ids(self) -> List[Dict]:
        return PathModel(self.database, self.path_fk).dump()

    def dump_sub_service_ids(self) -> List[Dict]:
        from .RelationModels import ConnectionSubServiceModel # pylint: disable=import-outside-toplevel
        db_sub_services = get_related_objects(self, ConnectionSubServiceModel, 'sub_service_fk')
        return [db_sub_service.dump_id() for db_sub_service in sorted(db_sub_services, key=operator.attrgetter('pk'))]

    def dump(self, include_path=True, include_sub_service_ids=True) -> Dict: # pylint: disable=arguments-differ
        result = {'connection_id': self.dump_id()}
        if self.service_fk is not None:
            result['service_id'] = ServiceModel(self.database, self.service_fk).dump_id()
        if include_path: result['path_hops_endpoint_ids'] = self.dump_path_hops_endpoint_ids()
        if include_sub_service_ids: result['sub_service_ids'] = self.dump_sub_service_ids()
        return result

def set_path_hop(
        database : Database, db_path : PathModel, position : int, db_endpoint : EndPointModel
    ) -> Tuple[PathHopModel, bool]:

    str_path_hop_key = key_to_str([db_path.pk, db_endpoint.pk], separator=':')
    result : Tuple[PathHopModel, bool] = update_or_create_object(database, PathHopModel, str_path_hop_key, {
        'path_fk': db_path, 'position': position, 'endpoint_fk': db_endpoint})
    db_path_hop, updated = result
    return db_path_hop, updated

def delete_path_hop(
        database : Database, db_path : PathModel, db_path_hop_pk : str
    ) -> None:

    db_path_hop : Optional[PathHopModel] = get_object(database, PathHopModel, db_path_hop_pk, raise_if_not_found=False)
    if db_path_hop is None: return
    db_path_hop.delete()

def delete_all_path_hops(
        database : Database, db_path : PathHopModel
    ) -> None:

    db_path_hop_pks = db_path.references(PathHopModel)
    for pk,_ in db_path_hop_pks: PathHopModel(database, pk).delete()

def set_path(
        database : Database, connection_uuid : str, raw_endpoint_ids : List[EndPointId], path_name : str = ''
    ) -> List[Union[PathModel, PathHopModel]]:

    str_path_key = connection_uuid if len(path_name) == 0 else key_to_str([connection_uuid, path_name], separator=':')
    result : Tuple[PathModel, bool] = get_or_create_object(database, PathModel, str_path_key)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    db_path, created = result # pylint: disable=unused-variable

    db_path_hop_pks : Set[str] = set(map(operator.itemgetter(0), db_path.references(PathHopModel)))
    db_objects : List[Tuple[Union[PathModel, PathHopModel], bool]] = [db_path]

    for position,endpoint_id in enumerate(raw_endpoint_ids):
        endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
        endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
        endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
        endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid

        str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
        if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
            str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
            str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')

        db_endpoint : EndPointModel = get_object(database, EndPointModel, str_endpoint_key)

        result : Tuple[PathHopModel, bool] = set_path_hop(database, db_path, position, db_endpoint)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        db_path_hop, updated = result # pylint: disable=unused-variable
        db_objects.append(db_path_hop)
        db_path_hop_pks.discard(db_path_hop.instance_key)

    for db_path_hop_pk in db_path_hop_pks: delete_path_hop(database, db_path, db_path_hop_pk)

    return db_objects