diff --git a/src/context/service/database/Link.py b/src/context/service/database/Link.py index d4c83c2ffc56e3993f65f8a9160c560fbcd5a496..67ac9f518f610caedc631444187cac10aded56c7 100644 --- a/src/context/service/database/Link.py +++ b/src/context/service/database/Link.py @@ -24,10 +24,10 @@ from common.method_wrappers.ServiceExceptions import NotFoundException from common.tools.object_factory.Link import json_link_id from context.service.database.uuids.Topology import topology_get_uuid from .models.LinkModel import LinkModel, LinkEndPointModel -from .models.TopologyModel import TopologyLinkModel +from .models.TopologyModel import TopologyLinkModel, TopologyModel from .uuids.EndPoint import endpoint_get_uuid from .uuids.Link import link_get_uuid -from .Events import notify_event_link +from .Events import notify_event_context, notify_event_link, notify_event_topology LOGGER = logging.getLogger(__name__) @@ -106,7 +106,7 @@ def link_set(db_engine : Engine, messagebroker : MessageBroker, request : Link) 'updated_at': now, }] - def callback(session : Session) -> bool: + def callback(session : Session) -> Tuple[bool, List[Dict]]: stmt = insert(LinkModel).values(link_data) stmt = stmt.on_conflict_do_update( index_elements=[LinkModel.link_uuid], @@ -119,34 +119,88 @@ def link_set(db_engine : Engine, messagebroker : MessageBroker, request : Link) created_at,updated_at = session.execute(stmt).fetchone() updated = updated_at > created_at + updated_endpoints = False if len(link_endpoints_data) > 0: # TODO: manage add/remove of endpoints; manage changes in relations with topology stmt = insert(LinkEndPointModel).values(link_endpoints_data) stmt = stmt.on_conflict_do_nothing( index_elements=[LinkEndPointModel.link_uuid, LinkEndPointModel.endpoint_uuid] ) - session.execute(stmt) - - if len(related_topologies) > 0: - session.execute(insert(TopologyLinkModel).values(related_topologies).on_conflict_do_nothing( + link_endpoint_inserts = session.execute(stmt) + updated_endpoints = int(link_endpoint_inserts.rowcount) > 0 + + link_topology_ids = [] + if not updated or len(related_topologies) > 1: + # Only update topology-link relations when link is created (not updated) or when endpoint_ids are + # modified (len(related_topologies) > 1). + stmt = insert(TopologyLinkModel).values(related_topologies) + stmt = stmt.on_conflict_do_nothing( index_elements=[TopologyLinkModel.topology_uuid, TopologyLinkModel.link_uuid] - )) - - return updated - - updated = run_transaction(sessionmaker(bind=db_engine), callback) + ) + stmt = stmt.returning(TopologyLinkModel.topology_uuid) + topology_uuids = session.execute(stmt).fetchall() + + LOGGER.warning('RAW topology_uuids={:s}'.format(str(topology_uuids))) + if len(topology_uuids) > 0: + topology_uuids = [topology_uuid[0] for topology_uuid in topology_uuids] + LOGGER.warning('NEW topology_uuids={:s}'.format(str(topology_uuids))) + query = session.query(TopologyModel) + query = query.filter(TopologyModel.topology_uuid.in_(topology_uuids)) + link_topologies : List[TopologyModel] = query.all() + link_topology_ids = [obj.dump_id() for obj in link_topologies] + LOGGER.warning('link_topology_ids={:s}'.format(str(link_topology_ids))) + + return updated or updated_endpoints, link_topology_ids + + updated, link_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback) link_id = json_link_id(link_uuid) event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE notify_event_link(messagebroker, event_type, link_id) + + context_ids : Dict[str, Dict] = dict() + topology_ids : Dict[str, Dict] = dict() + for topology_id in link_topology_ids: + topology_uuid = topology_id['topology_uuid']['uuid'] + topology_ids[topology_uuid] = topology_id + context_id = topology_id['context_id'] + context_uuid = context_id['context_uuid']['uuid'] + context_ids[context_uuid] = context_id + + for topology_id in topology_ids.values(): + notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id) + + for context_id in context_ids.values(): + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + return LinkId(**link_id) -def link_delete(db_engine : Engine, messagebroker : MessageBroker, request : LinkId) -> Tuple[Dict, bool]: +def link_delete(db_engine : Engine, messagebroker : MessageBroker, request : LinkId) -> Empty: link_uuid = link_get_uuid(request, allow_random=False) def callback(session : Session) -> bool: + query = session.query(TopologyLinkModel) + query = query.filter_by(link_uuid=link_uuid) + topology_link_list : List[TopologyLinkModel] = query.all() + topology_ids = [obj.topology.dump_id() for obj in topology_link_list] num_deleted = session.query(LinkModel).filter_by(link_uuid=link_uuid).delete() - return num_deleted > 0 - deleted = run_transaction(sessionmaker(bind=db_engine), callback) + return num_deleted > 0, topology_ids + deleted, updated_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback) link_id = json_link_id(link_uuid) if deleted: notify_event_link(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, link_id) + + context_ids : Dict[str, Dict] = dict() + topology_ids : Dict[str, Dict] = dict() + for topology_id in updated_topology_ids: + topology_uuid = topology_id['topology_uuid']['uuid'] + topology_ids[topology_uuid] = topology_id + context_id = topology_id['context_id'] + context_uuid = context_id['context_uuid']['uuid'] + context_ids[context_uuid] = context_id + + for topology_id in topology_ids.values(): + notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id) + + for context_id in context_ids.values(): + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + return Empty() diff --git a/src/context/service/database/Service.py b/src/context/service/database/Service.py index 38ab7a3c5aaeb7c4b26b0677832d4618b1006f90..fc196ddded291aa82c8f9df932c15611d13121e4 100644 --- a/src/context/service/database/Service.py +++ b/src/context/service/database/Service.py @@ -33,7 +33,7 @@ from .models.ServiceModel import ServiceModel, ServiceEndPointModel from .uuids.Context import context_get_uuid from .uuids.EndPoint import endpoint_get_uuid from .uuids.Service import service_get_uuid -from .Events import notify_event_service +from .Events import notify_event_context, notify_event_service LOGGER = logging.getLogger(__name__) @@ -150,9 +150,11 @@ def service_set(db_engine : Engine, messagebroker : MessageBroker, request : Ser return updated or changed_constraints or changed_config_rules updated = run_transaction(sessionmaker(bind=db_engine), callback) - service_id = json_service_id(service_uuid, json_context_id(context_uuid)) + context_id = json_context_id(context_uuid) + service_id = json_service_id(service_uuid, context_id=context_id) event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE notify_event_service(messagebroker, event_type, service_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) return ServiceId(**service_id) def service_unset(db_engine : Engine, messagebroker : MessageBroker, request : Service) -> ServiceId: @@ -203,9 +205,11 @@ def service_delete(db_engine : Engine, messagebroker : MessageBroker, request : num_deleted = session.query(ServiceModel).filter_by(service_uuid=service_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) - service_id = json_service_id(service_uuid, json_context_id(context_uuid)) + context_id = json_context_id(context_uuid) + service_id = json_service_id(service_uuid, context_id=context_id) if deleted: notify_event_service(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, service_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) return Empty() def service_select(db_engine : Engine, request : ServiceFilter) -> ServiceList: diff --git a/src/context/service/database/Slice.py b/src/context/service/database/Slice.py index 5399e2f3f2b9814f0639553ee5f14471383e64d9..98a5ef7a8dd5d6f489c11bc2798ea16fc5b9c128 100644 --- a/src/context/service/database/Slice.py +++ b/src/context/service/database/Slice.py @@ -33,7 +33,7 @@ from .uuids.Context import context_get_uuid from .uuids.EndPoint import endpoint_get_uuid from .uuids.Service import service_get_uuid from .uuids.Slice import slice_get_uuid -from .Events import notify_event_slice +from .Events import notify_event_context, notify_event_slice LOGGER = logging.getLogger(__name__) @@ -187,9 +187,11 @@ def slice_set(db_engine : Engine, messagebroker : MessageBroker, request : Slice return updated or changed_constraints or changed_config_rules updated = run_transaction(sessionmaker(bind=db_engine), callback) - slice_id = json_slice_id(slice_uuid, json_context_id(context_uuid)) + context_id = json_context_id(context_uuid) + slice_id = json_slice_id(slice_uuid, context_id=context_id) event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE notify_event_slice(messagebroker, event_type, slice_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) return SliceId(**slice_id) def slice_unset(db_engine : Engine, messagebroker : MessageBroker, request : Slice) -> SliceId: @@ -262,9 +264,11 @@ def slice_delete(db_engine : Engine, messagebroker : MessageBroker, request : Sl num_deleted = session.query(SliceModel).filter_by(slice_uuid=slice_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) - slice_id = json_slice_id(slice_uuid, json_context_id(context_uuid)) + context_id = json_context_id(context_uuid) + slice_id = json_slice_id(slice_uuid, context_id=context_id) if deleted: notify_event_slice(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, slice_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) return Empty() def slice_select(db_engine : Engine, request : SliceFilter) -> SliceList: diff --git a/src/context/service/database/Topology.py b/src/context/service/database/Topology.py index d5ef80506a9f0d0ec54a5ea45f646a3d87e96ee0..1f0fb6c0b3c400d58ea83bc857e97bc50a1324a3 100644 --- a/src/context/service/database/Topology.py +++ b/src/context/service/database/Topology.py @@ -125,7 +125,7 @@ def topology_set(db_engine : Engine, messagebroker : MessageBroker, request : To stmt = stmt.returning(TopologyModel.created_at, TopologyModel.updated_at) created_at,updated_at = session.execute(stmt).fetchone() return updated_at > created_at - + updated = run_transaction(sessionmaker(bind=db_engine), callback) context_id = json_context_id(context_uuid) topology_id = json_topology_id(topology_uuid, context_id=context_id)