# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import Context, ContextId, ContextIdList, ContextList
from common.method_wrappers.ServiceExceptions import NotFoundException
from common.tools.object_factory.Context import json_context_id
from .models.ContextModel import ContextModel
from .uuids.Context import context_get_uuid

# Module-level logger named after this module; context_set uses it to warn about ignored request fields.
LOGGER = logging.getLogger(__name__)

def context_list_ids(db_engine : Engine) -> ContextIdList:
    """List the identifiers of all contexts stored in the database.

    Args:
        db_engine: SQLAlchemy engine the transaction's session factory is bound to.

    Returns:
        A ContextIdList whose `context_ids` holds the `dump_id()` of every
        ContextModel row, in the order the query returned them.
    """
    def _fetch_ids(session : Session) -> List[Dict]:
        # Unfiltered query: every ContextModel row is loaded.
        rows : List[ContextModel] = session.query(ContextModel).all()
        return [row.dump_id() for row in rows]

    # The query runs through sqlalchemy_cockroachdb's run_transaction helper.
    session_factory = sessionmaker(bind=db_engine)
    dumped_ids = run_transaction(session_factory, _fetch_ids)
    return ContextIdList(context_ids=dumped_ids)

def context_list_objs(db_engine : Engine) -> ContextList:
    """List all contexts stored in the database as full objects.

    Args:
        db_engine: SQLAlchemy engine the transaction's session factory is bound to.

    Returns:
        A ContextList whose `contexts` holds the `dump()` of every ContextModel
        row, in the order the query returned them.
    """
    def _fetch_objs(session : Session) -> List[Dict]:
        # Unfiltered query: every ContextModel row is loaded.
        rows : List[ContextModel] = session.query(ContextModel).all()
        return [row.dump() for row in rows]

    # The query runs through sqlalchemy_cockroachdb's run_transaction helper.
    session_factory = sessionmaker(bind=db_engine)
    dumped_contexts = run_transaction(session_factory, _fetch_objs)
    return ContextList(contexts=dumped_contexts)

def context_get(db_engine : Engine, request : ContextId) -> Context:
    """Retrieve a single context by its identifier.

    Args:
        db_engine: SQLAlchemy engine the transaction's session factory is bound to.
        request: identifier of the context to retrieve.

    Returns:
        The Context message built from the `dump()` of the matching ContextModel row.

    Raises:
        NotFoundException: if no ContextModel row has the UUID resolved from `request`.
    """
    # NOTE(review): allow_random=False presumably forbids generating a fresh UUID
    # for a lookup -- confirm against context_get_uuid.
    context_uuid = context_get_uuid(request, allow_random=False)

    def callback(session : Session) -> Optional[Dict]:
        obj : Optional[ContextModel] = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none()
        # Dump inside the transaction so only plain data leaves the session.
        return None if obj is None else obj.dump()

    obj = run_transaction(sessionmaker(bind=db_engine), callback)
    if obj is None:
        # Report the identifier exactly as the caller sent it, plus the UUID that was searched for.
        raw_context_uuid = request.context_uuid.uuid
        raise NotFoundException('Context', raw_context_uuid, extra_details=[
            'context_uuid generated was: {:s}'.format(context_uuid)
        ])
    return Context(**obj)

def context_set(db_engine : Engine, request : Context) -> Tuple[ContextId, bool]:
    """Create the context described by `request`, or rename it if it already exists.

    The row is written with an INSERT ... ON CONFLICT DO UPDATE keyed on
    `context_uuid`; on conflict only `context_name` is overwritten.

    Args:
        db_engine: SQLAlchemy engine the transaction's session factory is bound to.
        request: context to store. Only its identifier and name are persisted;
            `topology_ids`, `service_ids` and `slice_ids` are ignored (a warning
            is logged if any of them is non-empty).

    Returns:
        A tuple `(context_id, updated)`. `context_id` identifies the stored
        context. `updated` is currently always False: created/updated detection
        is not implemented yet (see TODO below).
    """
    context_name = request.name
    # Unnamed contexts fall back to the raw UUID string supplied in the request.
    if len(context_name) == 0: context_name = request.context_id.context_uuid.uuid
    context_uuid = context_get_uuid(request.context_id, context_name=context_name, allow_random=True)

    # Ignore request.topology_ids, request.service_ids, and request.slice_ids. They are used
    # for retrieving topologies, services and slices added into the context. Explicit addition
    # into the context is done automatically when creating the topology, service or slice
    # specifying the associated context.

    if len(request.topology_ids) > 0:   # pragma: no cover
        LOGGER.warning('Items in field "topology_ids" ignored. This field is used for retrieval purposes only.')

    if len(request.service_ids) > 0:    # pragma: no cover
        LOGGER.warning('Items in field "service_ids" ignored. This field is used for retrieval purposes only.')

    if len(request.slice_ids) > 0:      # pragma: no cover
        LOGGER.warning('Items in field "slice_ids" ignored. This field is used for retrieval purposes only.')

    context_data = [{
        'context_uuid': context_uuid,
        'context_name': context_name,
    }]

    def callback(session : Session) -> None:
        stmt = insert(ContextModel).values(context_data)
        # Upsert: when a row with this context_uuid exists, take the name from the rejected insert.
        stmt = stmt.on_conflict_do_update(
            index_elements=[ContextModel.context_uuid],
            set_=dict(context_name = stmt.excluded.context_name)
        )
        session.execute(stmt)

    run_transaction(sessionmaker(bind=db_engine), callback)
    updated = False # TODO: improve and check if created/updated
    return ContextId(**json_context_id(context_uuid)),updated

def context_delete(db_engine : Engine, request : ContextId) -> bool:
    """Delete the context identified by `request`.

    Args:
        db_engine: SQLAlchemy engine the transaction's session factory is bound to.
        request: identifier of the context to delete.

    Returns:
        True if at least one ContextModel row was deleted, False if none matched.
    """
    # NOTE(review): allow_random=False presumably forbids generating a fresh UUID
    # for a delete -- confirm against context_get_uuid.
    context_uuid = context_get_uuid(request, allow_random=False)

    def callback(session : Session) -> bool:
        # Query.delete() returns the number of rows matched by the delete.
        num_deleted = session.query(ContextModel).filter_by(context_uuid=context_uuid).delete()
        return num_deleted > 0

    return run_transaction(sessionmaker(bind=db_engine), callback)