Commit 23b6f196 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Context component:

- Extended Link to notify changes on topology and context when Link is modified or deleted
- Extended Link to report changes when endpoints are added.
- Extended Service to notify changes on context when Service is modified or deleted
- Extended Slice to notify changes on context when Slice is modified or deleted
- Minor cosmetic changes
parent 75985f1d
Loading
Loading
Loading
Loading
+69 −15
Original line number Diff line number Diff line
@@ -24,10 +24,10 @@ from common.method_wrappers.ServiceExceptions import NotFoundException
from common.tools.object_factory.Link import json_link_id
from context.service.database.uuids.Topology import topology_get_uuid
from .models.LinkModel import LinkModel, LinkEndPointModel
from .models.TopologyModel import TopologyLinkModel
from .models.TopologyModel import TopologyLinkModel, TopologyModel
from .uuids.EndPoint import endpoint_get_uuid
from .uuids.Link import link_get_uuid
from .Events import notify_event_link
from .Events import notify_event_context, notify_event_link, notify_event_topology

LOGGER = logging.getLogger(__name__)

@@ -106,7 +106,7 @@ def link_set(db_engine : Engine, messagebroker : MessageBroker, request : Link)
        'updated_at': now,
    }]

    def callback(session : Session) -> bool:
    def callback(session : Session) -> Tuple[bool, List[Dict]]:
        stmt = insert(LinkModel).values(link_data)
        stmt = stmt.on_conflict_do_update(
            index_elements=[LinkModel.link_uuid],
@@ -119,34 +119,88 @@ def link_set(db_engine : Engine, messagebroker : MessageBroker, request : Link)
        created_at,updated_at = session.execute(stmt).fetchone()
        updated = updated_at > created_at

        updated_endpoints = False
        if len(link_endpoints_data) > 0:
            # TODO: manage add/remove of endpoints; manage changes in relations with topology
            stmt = insert(LinkEndPointModel).values(link_endpoints_data)
            stmt = stmt.on_conflict_do_nothing(
                index_elements=[LinkEndPointModel.link_uuid, LinkEndPointModel.endpoint_uuid]
            )
            session.execute(stmt)

        if len(related_topologies) > 0:
            session.execute(insert(TopologyLinkModel).values(related_topologies).on_conflict_do_nothing(
            link_endpoint_inserts = session.execute(stmt)
            updated_endpoints = int(link_endpoint_inserts.rowcount) > 0

        link_topology_ids = []
        if not updated or len(related_topologies) > 1:
            # Only update topology-link relations when link is created (not updated) or when endpoint_ids are
            # modified (len(related_topologies) > 1).
            stmt = insert(TopologyLinkModel).values(related_topologies)
            stmt = stmt.on_conflict_do_nothing(
                index_elements=[TopologyLinkModel.topology_uuid, TopologyLinkModel.link_uuid]
            ))

        return updated

    updated = run_transaction(sessionmaker(bind=db_engine), callback)
            )
            stmt = stmt.returning(TopologyLinkModel.topology_uuid)
            topology_uuids = session.execute(stmt).fetchall()

            LOGGER.warning('RAW topology_uuids={:s}'.format(str(topology_uuids)))
            if len(topology_uuids) > 0:
                topology_uuids = [topology_uuid[0] for topology_uuid in topology_uuids]
                LOGGER.warning('NEW topology_uuids={:s}'.format(str(topology_uuids)))
                query = session.query(TopologyModel)
                query = query.filter(TopologyModel.topology_uuid.in_(topology_uuids))
                link_topologies : List[TopologyModel] = query.all()
                link_topology_ids = [obj.dump_id() for obj in link_topologies]
                LOGGER.warning('link_topology_ids={:s}'.format(str(link_topology_ids)))

        return updated or updated_endpoints, link_topology_ids

    updated, link_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback)
    link_id = json_link_id(link_uuid)
    event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
    notify_event_link(messagebroker, event_type, link_id)

    context_ids  : Dict[str, Dict] = dict()
    topology_ids : Dict[str, Dict] = dict()
    for topology_id in link_topology_ids:
        topology_uuid = topology_id['topology_uuid']['uuid']
        topology_ids[topology_uuid] = topology_id
        context_id = topology_id['context_id']
        context_uuid = context_id['context_uuid']['uuid']
        context_ids[context_uuid] = context_id

    for topology_id in topology_ids.values():
        notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id)

    for context_id in context_ids.values():
        notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)

    return LinkId(**link_id)

def link_delete(db_engine : Engine, messagebroker : MessageBroker, request : LinkId) -> Tuple[Dict, bool]:
def link_delete(db_engine : Engine, messagebroker : MessageBroker, request : LinkId) -> Empty:
    link_uuid = link_get_uuid(request, allow_random=False)
    def callback(session : Session) -> bool:
        query = session.query(TopologyLinkModel)
        query = query.filter_by(link_uuid=link_uuid)
        topology_link_list : List[TopologyLinkModel] = query.all()
        topology_ids = [obj.topology.dump_id() for obj in topology_link_list]
        num_deleted = session.query(LinkModel).filter_by(link_uuid=link_uuid).delete()
        return num_deleted > 0
    deleted = run_transaction(sessionmaker(bind=db_engine), callback)
        return num_deleted > 0, topology_ids
    deleted, updated_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback)
    link_id = json_link_id(link_uuid)
    if deleted:
        notify_event_link(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, link_id)

        context_ids  : Dict[str, Dict] = dict()
        topology_ids : Dict[str, Dict] = dict()
        for topology_id in updated_topology_ids:
            topology_uuid = topology_id['topology_uuid']['uuid']
            topology_ids[topology_uuid] = topology_id
            context_id = topology_id['context_id']
            context_uuid = context_id['context_uuid']['uuid']
            context_ids[context_uuid] = context_id

        for topology_id in topology_ids.values():
            notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id)

        for context_id in context_ids.values():
            notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)

    return Empty()
+7 −3
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ from .models.ServiceModel import ServiceModel, ServiceEndPointModel
from .uuids.Context import context_get_uuid
from .uuids.EndPoint import endpoint_get_uuid
from .uuids.Service import service_get_uuid
from .Events import notify_event_service
from .Events import notify_event_context, notify_event_service

LOGGER = logging.getLogger(__name__)

@@ -150,9 +150,11 @@ def service_set(db_engine : Engine, messagebroker : MessageBroker, request : Ser
        return updated or changed_constraints or changed_config_rules

    updated = run_transaction(sessionmaker(bind=db_engine), callback)
    service_id = json_service_id(service_uuid, json_context_id(context_uuid))
    context_id = json_context_id(context_uuid)
    service_id = json_service_id(service_uuid, context_id=context_id)
    event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
    notify_event_service(messagebroker, event_type, service_id)
    notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)
    return ServiceId(**service_id)

def service_unset(db_engine : Engine, messagebroker : MessageBroker, request : Service) -> ServiceId:
@@ -203,9 +205,11 @@ def service_delete(db_engine : Engine, messagebroker : MessageBroker, request :
        num_deleted = session.query(ServiceModel).filter_by(service_uuid=service_uuid).delete()
        return num_deleted > 0
    deleted = run_transaction(sessionmaker(bind=db_engine), callback)
    service_id = json_service_id(service_uuid, json_context_id(context_uuid))
    context_id = json_context_id(context_uuid)
    service_id = json_service_id(service_uuid, context_id=context_id)
    if deleted:
        notify_event_service(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, service_id)
        notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)
    return Empty()

def service_select(db_engine : Engine, request : ServiceFilter) -> ServiceList:
+7 −3
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ from .uuids.Context import context_get_uuid
from .uuids.EndPoint import endpoint_get_uuid
from .uuids.Service import service_get_uuid
from .uuids.Slice import slice_get_uuid
from .Events import notify_event_slice
from .Events import notify_event_context, notify_event_slice

LOGGER = logging.getLogger(__name__)

@@ -187,9 +187,11 @@ def slice_set(db_engine : Engine, messagebroker : MessageBroker, request : Slice
        return updated or changed_constraints or changed_config_rules

    updated = run_transaction(sessionmaker(bind=db_engine), callback)
    slice_id = json_slice_id(slice_uuid, json_context_id(context_uuid))
    context_id = json_context_id(context_uuid)
    slice_id = json_slice_id(slice_uuid, context_id=context_id)
    event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
    notify_event_slice(messagebroker, event_type, slice_id)
    notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)
    return SliceId(**slice_id)

def slice_unset(db_engine : Engine, messagebroker : MessageBroker, request : Slice) -> SliceId:
@@ -262,9 +264,11 @@ def slice_delete(db_engine : Engine, messagebroker : MessageBroker, request : Sl
        num_deleted = session.query(SliceModel).filter_by(slice_uuid=slice_uuid).delete()
        return num_deleted > 0
    deleted = run_transaction(sessionmaker(bind=db_engine), callback)
    slice_id = json_slice_id(slice_uuid, json_context_id(context_uuid))
    context_id = json_context_id(context_uuid)
    slice_id = json_slice_id(slice_uuid, context_id=context_id)
    if deleted:
        notify_event_slice(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, slice_id)
        notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)
    return Empty()

def slice_select(db_engine : Engine, request : SliceFilter) -> SliceList:
+1 −1

File changed.

Contains only whitespace changes.