Skip to content
Snippets Groups Projects
Topology.py 5.4 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import datetime, logging
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict, List, Optional, Tuple
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import ContextId, Topology, TopologyId
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import NotFoundException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .models.TopologyModel import TopologyModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .uuids.Context import context_get_uuid
from .uuids.Topology import topology_get_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def topology_list_ids(db_engine : Engine, request : ContextId) -> List[Dict]:
    """List the identifiers of the topologies stored under a context.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: identifier of the context whose topologies are listed.

    Returns:
        A list with the ``dump_id()`` result of every ``TopologyModel`` row
        whose ``context_uuid`` matches the one resolved from ``request``.
    """
    # allow_random=False: the UUID is resolved from the request, never invented.
    context_uuid = context_get_uuid(request, allow_random=False)

    def callback(session : Session) -> List[Dict]:
        query = session.query(TopologyModel).filter_by(context_uuid=context_uuid)
        topologies : List[TopologyModel] = query.all()
        return [topology.dump_id() for topology in topologies]

    session_factory = sessionmaker(bind=db_engine)
    return run_transaction(session_factory, callback)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def topology_list_objs(db_engine : Engine, request : ContextId) -> List[Dict]:
    """List the full topology objects stored under a context.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: identifier of the context whose topologies are listed.

    Returns:
        A list with the ``dump()`` result of every ``TopologyModel`` row
        whose ``context_uuid`` matches the one resolved from ``request``.
    """
    # allow_random=False: the UUID is resolved from the request, never invented.
    context_uuid = context_get_uuid(request, allow_random=False)

    def callback(session : Session) -> List[Dict]:
        query = session.query(TopologyModel).filter_by(context_uuid=context_uuid)
        topologies : List[TopologyModel] = query.all()
        return [topology.dump() for topology in topologies]

    session_factory = sessionmaker(bind=db_engine)
    return run_transaction(session_factory, callback)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def topology_get(db_engine : Engine, request : TopologyId) -> Dict:
    """Fetch a single topology by its identifier.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: identifier of the topology to retrieve.

    Returns:
        The ``dump()`` result of the matching ``TopologyModel`` row.

    Raises:
        NotFoundException: no row has the topology UUID resolved from
            ``request``. The error carries the raw ``context/topology`` pair
            from the request plus the UUIDs that were generated from it.
    """
    _, topology_uuid = topology_get_uuid(request, allow_random=False)

    def callback(session : Session) -> Optional[Dict]:
        query = session.query(TopologyModel).filter_by(topology_uuid=topology_uuid)
        found : Optional[TopologyModel] = query.one_or_none()
        if found is None:
            return None
        return found.dump()

    topology = run_transaction(sessionmaker(bind=db_engine), callback)
    if topology is not None:
        return topology

    # Not found: report both what the caller sent and what it was resolved to,
    # so that UUID-generation mismatches are easy to diagnose.
    context_uuid = context_get_uuid(request.context_id, allow_random=False)
    raw_topology_uuid = '{:s}/{:s}'.format(request.context_id.context_uuid.uuid, request.topology_uuid.uuid)
    extra_details = [
        'context_uuid generated was: {:s}'.format(context_uuid),
        'topology_uuid generated was: {:s}'.format(topology_uuid),
    ]
    raise NotFoundException('Topology', raw_topology_uuid, extra_details=extra_details)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def topology_set(db_engine : Engine, request : Topology) -> Tuple[Dict, bool]:
    """Create a topology, or update its name if it already exists.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: topology to store. When ``request.name`` is empty, the raw
            topology UUID from ``request.topology_id`` is used as the name.
            ``request.device_ids`` and ``request.link_ids`` are ignored (a
            warning is logged if they are populated).

    Returns:
        A tuple ``(topology_id, updated)`` where ``topology_id`` is built with
        ``json_topology_id`` and ``updated`` is True when the stored row's
        ``updated_at`` is later than its ``created_at``.
    """
    topology_name = request.name
    if not topology_name:
        topology_name = request.topology_id.topology_uuid.uuid
    context_uuid, topology_uuid = topology_get_uuid(
        request.topology_id, topology_name=topology_name, allow_random=True)

    # Ignore request.device_ids and request.link_ids. They are used for retrieving devices and links added into the
    # topology. Explicit addition into the topology is done automatically when creating the devices and links, based
    # on the topologies specified in the endpoints associated with the devices and links.

    if len(request.device_ids) > 0:   # pragma: no cover
        LOGGER.warning('Items in field "device_ids" ignored. This field is used for retrieval purposes only.')

    if len(request.link_ids) > 0:    # pragma: no cover
        LOGGER.warning('Items in field "link_ids" ignored. This field is used for retrieval purposes only.')

    # NOTE(review): datetime.utcnow() yields a naive timestamp and is deprecated
    # since Python 3.12; switching to an aware datetime needs the column types
    # of TopologyModel to be checked first -- left unchanged here.
    now = datetime.datetime.utcnow()

    # The same instant is used for both columns so that a fresh insert yields
    # created_at == updated_at (reported as "not updated" below).
    topology_data = [{
        'context_uuid' : context_uuid,
        'topology_uuid': topology_uuid,
        'topology_name': topology_name,
        'created_at'   : now,
        'updated_at'   : now,
    }]

    def callback(session : Session) -> bool:
        stmt = insert(TopologyModel).values(topology_data)
        # On conflict only the name and updated_at are overwritten; created_at
        # keeps the value already stored in the row.
        stmt = stmt.on_conflict_do_update(
            index_elements=[TopologyModel.topology_uuid],
            set_=dict(
                topology_name = stmt.excluded.topology_name,
                updated_at    = stmt.excluded.updated_at,
            )
        )
        stmt = stmt.returning(TopologyModel.created_at, TopologyModel.updated_at)
        created_at, updated_at = session.execute(stmt).fetchone()
        return updated_at > created_at

    updated = run_transaction(sessionmaker(bind=db_engine), callback)
    topology_id = json_topology_id(topology_uuid, context_id=json_context_id(context_uuid))
    return topology_id, updated
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def topology_delete(db_engine : Engine, request : TopologyId) -> Tuple[Dict, bool]:
    """Delete the topology identified by ``request``.

    Args:
        db_engine: SQLAlchemy engine the transaction session is bound to.
        request: identifier of the topology to remove.

    Returns:
        A tuple ``(topology_id, deleted)`` where ``topology_id`` is built with
        ``json_topology_id`` and ``deleted`` is True when at least one row was
        removed.
    """
    context_uuid, topology_uuid = topology_get_uuid(request, allow_random=False)

    def callback(session : Session) -> bool:
        matching = session.query(TopologyModel).filter_by(topology_uuid=topology_uuid)
        removed_rows = matching.delete()
        return removed_rows > 0

    deleted = run_transaction(sessionmaker(bind=db_engine), callback)
    topology_id = json_topology_id(topology_uuid, context_id=json_context_id(context_uuid))
    return topology_id, deleted