diff --git a/src/common/message_broker/backend/nats/NatsBackendThread.py b/src/common/message_broker/backend/nats/NatsBackendThread.py index b8a4fcd7bb6c40b1c7ff837b6b1a19523c115500..3ac32b0cb8f7b4be2d693753e39919b82ab3948f 100644 --- a/src/common/message_broker/backend/nats/NatsBackendThread.py +++ b/src/common/message_broker/backend/nats/NatsBackendThread.py @@ -52,22 +52,26 @@ class NatsBackendThread(threading.Thread): self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event, ready_event : threading.Event ) -> None: - LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) - client = await nats.connect(servers=[self._nats_uri]) - LOGGER.info('[_run_subscriber] Connected!') - subscription = await client.subscribe(topic_name) - LOGGER.info('[_run_subscriber] Subscribed!') - ready_event.set() - while not self._terminate.is_set() and not unsubscribe.is_set(): - try: - message = await subscription.next_msg(timeout) - except nats.errors.TimeoutError: - continue - except asyncio.CancelledError: - break - out_queue.put(Message(message.subject, message.data.decode('UTF-8'))) - await subscription.unsubscribe() - await client.drain() + try: + LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) + client = await nats.connect(servers=[self._nats_uri]) + server_version = client.connected_server_version + LOGGER.info('[_run_subscriber] Connected! NATS Server version: {:s}'.format(str(repr(server_version)))) + subscription = await client.subscribe(topic_name) + LOGGER.info('[_run_subscriber] Subscribed!') + ready_event.set() + while not self._terminate.is_set() and not unsubscribe.is_set(): + try: + message = await subscription.next_msg(timeout) + except nats.errors.TimeoutError: + continue + except asyncio.CancelledError: + break + out_queue.put(Message(message.subject, message.data.decode('UTF-8'))) + await subscription.unsubscribe() + await client.drain() + except Exception: # pylint: disable=broad-exception-caught + LOGGER.exception('[_run_subscriber] Unhandled Exception') def subscribe( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event @@ -79,7 +83,7 @@ class NatsBackendThread(threading.Thread): self._tasks.append(task) LOGGER.info('[subscribe] Waiting for subscriber to be ready...') is_ready = ready_event.wait(timeout=120) - LOGGER.info('[subscribe] Subscriber Ready: {:s}'.format(str(is_ready))) + LOGGER.info('[subscribe] Subscriber is Ready? {:s}'.format(str(is_ready))) def run(self) -> None: asyncio.set_event_loop(self._event_loop) diff --git a/src/common/tests/MockMessageBroker.py b/src/common/tests/MockMessageBroker.py index 2eeea74cfc85fd360180c77a2d5b7387d2ef092f..27613a64d3f63a55276bb1c1f82fdb6c20a8e534 100644 --- a/src/common/tests/MockMessageBroker.py +++ b/src/common/tests/MockMessageBroker.py @@ -28,10 +28,6 @@ TOPIC_SERVICE = 'service' TOPIC_SLICE = 'slice' TOPIC_TOPOLOGY = 'topology' -TOPICS = { - TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY -} - CONSUME_TIMEOUT = 0.5 # seconds class Message(NamedTuple): diff --git a/src/common/tools/context_queries/Topology.py b/src/common/tools/context_queries/Topology.py index 15217b8d14fec0137d94aa704e3cd8cb096f4a17..caf03ed0eb5271aa6e00a2c107a06f9e496d37dc 100644 --- a/src/common/tools/context_queries/Topology.py +++ b/src/common/tools/context_queries/Topology.py @@ -15,7 +15,7 @@ import grpc, logging from typing import List, Optional from common.Constants import DEFAULT_CONTEXT_NAME -from common.proto.context_pb2 import ContextId, Topology, TopologyId +from common.proto.context_pb2 import ContextId, Topology, TopologyDetails, TopologyId from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Topology import json_topology from context.client.ContextClient import ContextClient @@ -23,13 +23,13 @@ from context.client.ContextClient import ContextClient LOGGER = logging.getLogger(__name__) def create_topology( - context_client : ContextClient, context_uuid : str, topology_uuid : str + context_client : ContextClient, context_uuid : str, topology_uuid : str, name : Optional[str] = None ) -> None: context_id = ContextId(**json_context_id(context_uuid)) existing_topology_ids = context_client.ListTopologyIds(context_id) existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids} if topology_uuid in existing_topology_uuids: return - context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id))) + context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id, name=name))) def create_missing_topologies( context_client : ContextClient, context_id : ContextId, topology_uuids : List[str] @@ -61,3 +61,21 @@ def get_topology( except grpc.RpcError: #LOGGER.exception('Unable to get topology({:s} / {:s})'.format(str(context_uuid), str(topology_uuid))) return None + +def get_topology_details( + context_client : ContextClient, topology_uuid : str, context_uuid : str = DEFAULT_CONTEXT_NAME, + rw_copy : bool = False + ) -> Optional[Topology]: + try: + # pylint: disable=no-member + topology_id = TopologyId() + topology_id.context_id.context_uuid.uuid = context_uuid + topology_id.topology_uuid.uuid = topology_uuid + ro_topology_details = context_client.GetTopologyDetails(topology_id) + if not rw_copy: return ro_topology_details + rw_topology_details = TopologyDetails() + rw_topology_details.CopyFrom(ro_topology_details) + return rw_topology_details + except grpc.RpcError: + #LOGGER.exception('Unable to get topology({:s} / {:s})'.format(str(context_uuid), str(topology_uuid))) + return None diff --git a/src/context/service/ContextServiceServicerImpl.py b/src/context/service/ContextServiceServicerImpl.py index 6d540b4945df8516697c957316294a452186ddb1..93f078e75545c93a2cd312cf48e8f64cdeea87ac 100644 --- a/src/context/service/ContextServiceServicerImpl.py +++ b/src/context/service/ContextServiceServicerImpl.py @@ -12,14 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, json, logging, sqlalchemy +import grpc, logging, sqlalchemy from typing import Iterator from common.message_broker.MessageBroker import MessageBroker from common.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceFilter, DeviceId, DeviceIdList, DeviceList, - Empty, EndPointIdList, EndPointNameList, EventTypeEnum, + Empty, EndPointIdList, EndPointNameList, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, @@ -33,16 +33,16 @@ from .database.Connection import ( from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_select, device_set from .database.EndPoint import endpoint_list_names +from .database.Events import EventTopicEnum, consume_events from .database.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set from .database.PolicyRule import ( policyrule_delete, policyrule_get, policyrule_list_ids, policyrule_list_objs, policyrule_set) -from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_select, service_set, service_unset -from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_select, slice_set, slice_unset +from .database.Service import ( + service_delete, service_get, service_list_ids, service_list_objs, service_select, service_set, service_unset) +from .database.Slice import ( + slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_select, slice_set, slice_unset) from .database.Topology import ( topology_delete, topology_get, topology_get_details, topology_list_ids, topology_list_objs, topology_set) -from .Events import ( - CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE, - TOPIC_SLICE, TOPIC_TOPOLOGY, notify_event) LOGGER = logging.getLogger(__name__) @@ -62,308 +62,237 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList: - return ContextIdList(context_ids=context_list_ids(self.db_engine)) + return context_list_ids(self.db_engine) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList: - return ContextList(contexts=context_list_objs(self.db_engine)) + return context_list_objs(self.db_engine) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context: - return Context(**context_get(self.db_engine, request)) + return context_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId: - context_id,updated = context_set(self.db_engine, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE - notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id}) - return ContextId(**context_id) + return context_set(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty: - context_id,deleted = context_delete(self.db_engine, request) - if deleted: - event_type = EventTypeEnum.EVENTTYPE_REMOVE - notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id}) - return Empty() + return context_delete(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: - for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT): - yield ContextEvent(**json.loads(message.content)) + for message in consume_events(self.messagebroker, {EventTopicEnum.CONTEXT}): yield message # ----- Topology --------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList: - return TopologyIdList(topology_ids=topology_list_ids(self.db_engine, request)) + return topology_list_ids(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList: - return TopologyList(topologies=topology_list_objs(self.db_engine, request)) + return topology_list_objs(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology: - return Topology(**topology_get(self.db_engine, request)) + return topology_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTopologyDetails(self, request : TopologyId, context : grpc.ServicerContext) -> TopologyDetails: - return TopologyDetails(**topology_get_details(self.db_engine, request)) + return topology_get_details(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId: - topology_id,updated = topology_set(self.db_engine, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE - notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id}) - return TopologyId(**topology_id) + return topology_set(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: - topology_id,deleted = topology_delete(self.db_engine, request) - if deleted: - event_type = EventTypeEnum.EVENTTYPE_REMOVE - notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id}) - return Empty() + return topology_delete(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: - for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT): - yield TopologyEvent(**json.loads(message.content)) + for message in consume_events(self.messagebroker, {EventTopicEnum.TOPOLOGY}): yield message # ----- Device ----------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList: - return DeviceIdList(device_ids=device_list_ids(self.db_engine)) + return device_list_ids(self.db_engine) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList: - return DeviceList(devices=device_list_objs(self.db_engine)) + return device_list_objs(self.db_engine) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetDevice(self, request : ContextId, context : grpc.ServicerContext) -> Device: - return Device(**device_get(self.db_engine, request)) + return device_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: - device_id,updated = device_set(self.db_engine, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE - notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id}) - return DeviceId(**device_id) + return device_set(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: - device_id,deleted = device_delete(self.db_engine, request) - if deleted: - event_type = EventTypeEnum.EVENTTYPE_REMOVE - notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id}) - return Empty() + return device_delete(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectDevice(self, request : DeviceFilter, context : grpc.ServicerContext) -> DeviceList: - return DeviceList(devices=device_select(self.db_engine, request)) + return device_select(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: - for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): - yield DeviceEvent(**json.loads(message.content)) + for message in consume_events(self.messagebroker, {EventTopicEnum.DEVICE}): yield message @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListEndPointNames(self, request : EndPointIdList, context : grpc.ServicerContext) -> EndPointNameList: - return EndPointNameList(endpoint_names=endpoint_list_names(self.db_engine, request)) + return endpoint_list_names(self.db_engine, request) # ----- Link ------------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList: - return LinkIdList(link_ids=link_list_ids(self.db_engine)) + return link_list_ids(self.db_engine) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList: - return LinkList(links=link_list_objs(self.db_engine)) + return link_list_objs(self.db_engine) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link: - return Link(**link_get(self.db_engine, request)) + return link_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: - link_id,updated = link_set(self.db_engine, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE - notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id}) - return LinkId(**link_id) + return link_set(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: - link_id,deleted = link_delete(self.db_engine, request) - if deleted: - event_type = EventTypeEnum.EVENTTYPE_REMOVE - notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id}) - return Empty() + return link_delete(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: - for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT): - yield LinkEvent(**json.loads(message.content)) + for message in consume_events(self.messagebroker, {EventTopicEnum.LINK}): yield message # ----- Service ---------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList: - return ServiceIdList(service_ids=service_list_ids(self.db_engine, request)) + return service_list_ids(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList: - return ServiceList(services=service_list_objs(self.db_engine, request)) + return service_list_objs(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service: - return Service(**service_get(self.db_engine, request)) + return service_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId: - service_id,updated = service_set(self.db_engine, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE - notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id}) - return ServiceId(**service_id) + return service_set(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def UnsetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId: - service_id,updated = service_unset(self.db_engine, request) - if updated: - event_type = EventTypeEnum.EVENTTYPE_UPDATE - notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id}) - return ServiceId(**service_id) + return service_unset(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty: - service_id,deleted = service_delete(self.db_engine, request) - if deleted: - event_type = EventTypeEnum.EVENTTYPE_REMOVE - notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id}) - return Empty() + return service_delete(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectService(self, request : ServiceFilter, context : grpc.ServicerContext) -> ServiceList: - return ServiceList(services=service_select(self.db_engine, request)) + return service_select(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: - for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT): - yield ServiceEvent(**json.loads(message.content)) + for message in consume_events(self.messagebroker, {EventTopicEnum.SERVICE}): yield message # ----- Slice ---------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList: - return SliceIdList(slice_ids=slice_list_ids(self.db_engine, request)) + return slice_list_ids(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList: - return SliceList(slices=slice_list_objs(self.db_engine, request)) + return slice_list_objs(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice: - return Slice(**slice_get(self.db_engine, request)) + return slice_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: - slice_id,updated = slice_set(self.db_engine, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE - notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id}) - return SliceId(**slice_id) + return slice_set(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: - slice_id,updated = slice_unset(self.db_engine, request) - if updated: - event_type = EventTypeEnum.EVENTTYPE_UPDATE - notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id}) - return SliceId(**slice_id) + return slice_unset(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty: - slice_id,deleted = slice_delete(self.db_engine, request) - if deleted: - event_type = EventTypeEnum.EVENTTYPE_REMOVE - notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id}) - return Empty() + return slice_delete(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectSlice(self, request : SliceFilter, context : grpc.ServicerContext) -> SliceList: - return SliceList(slices=slice_select(self.db_engine, request)) + return slice_select(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]: - for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT): - yield SliceEvent(**json.loads(message.content)) + for message in consume_events(self.messagebroker, {EventTopicEnum.SLICE}): yield message # ----- Connection ------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList: - return ConnectionIdList(connection_ids=connection_list_ids(self.db_engine, request)) + return connection_list_ids(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListConnections(self, request : ContextId, context : grpc.ServicerContext) -> ConnectionList: - return ConnectionList(connections=connection_list_objs(self.db_engine, request)) + return connection_list_objs(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection: - return Connection(**connection_get(self.db_engine, request)) + return connection_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId: - connection_id,updated = connection_set(self.db_engine, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE - notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id}) - return ConnectionId(**connection_id) + return connection_set(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty: - connection_id,deleted = connection_delete(self.db_engine, request) - if deleted: - event_type = EventTypeEnum.EVENTTYPE_REMOVE - notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id}) - return Empty() + return connection_delete(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: - for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT): - yield ConnectionEvent(**json.loads(message.content)) + for message in consume_events(self.messagebroker, {EventTopicEnum.CONNECTION}): yield message - # ----- Policy ----------------------------------------------------------------------------------------------------- + # ----- Policy Rule ------------------------------------------------------------------------------------------------ @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListPolicyRuleIds(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleIdList: - return PolicyRuleIdList(policyRuleIdList=policyrule_list_ids(self.db_engine)) + return policyrule_list_ids(self.db_engine) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListPolicyRules(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleList: - return PolicyRuleList(policyRules=policyrule_list_objs(self.db_engine)) + return policyrule_list_objs(self.db_engine) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetPolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule: - return PolicyRule(**policyrule_get(self.db_engine, request)) + return policyrule_get(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetPolicyRule(self, request : PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId: - policyrule_id,updated = policyrule_set(self.db_engine, request) - event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE - notify_event(self.messagebroker, TOPIC_POLICY, event_type, {'policyrule_id': policyrule_id}) - return PolicyRuleId(**policyrule_id) + return policyrule_set(self.db_engine, self.messagebroker, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemovePolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> Empty: - policyrule_id,deleted = policyrule_delete(self.db_engine, request) - if deleted: - event_type = EventTypeEnum.EVENTTYPE_REMOVE - notify_event(self.messagebroker, TOPIC_POLICY, event_type, {'policyrule_id': policyrule_id}) - return Empty() + return policyrule_delete(self.db_engine, self.messagebroker, request) diff --git a/src/context/service/Events.py b/src/context/service/Events.py deleted file mode 100644 index 5d20f144c93385c769dbd8526cb10b8088eee728..0000000000000000000000000000000000000000 --- a/src/context/service/Events.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json, time -from typing import Dict -from common.message_broker.Message import Message -from common.message_broker.MessageBroker import MessageBroker -from common.proto.context_pb2 import EventTypeEnum - -TOPIC_CONNECTION = 'connection' -TOPIC_CONTEXT = 'context' -TOPIC_DEVICE = 'device' -TOPIC_LINK = 'link' -TOPIC_POLICY = 'policy' -TOPIC_SERVICE = 'service' -TOPIC_SLICE = 'slice' -TOPIC_TOPOLOGY = 'topology' - -TOPICS = { - TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY -} - -CONSUME_TIMEOUT = 0.5 # seconds - -def notify_event( - messagebroker : MessageBroker, topic_name : str, event_type : EventTypeEnum, fields : Dict[str, str] -) -> None: - event = {'event': {'timestamp': {'timestamp': time.time()}, 'event_type': event_type}} - for field_name, field_value in fields.items(): - event[field_name] = field_value - messagebroker.publish(Message(topic_name, json.dumps(event))) diff --git a/src/context/service/database/Connection.py b/src/context/service/database/Connection.py index 80d3b3a6d437986741ee5308205d8a902e897c40..0a659f662c5ca4116211b7939afc3f5fe39b835c 100644 --- a/src/context/service/database/Connection.py +++ b/src/context/service/database/Connection.py @@ -19,7 +19,9 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Tuple -from common.proto.context_pb2 import Connection, ConnectionId, ServiceId +from common.proto.context_pb2 import ( + Connection, ConnectionId, ConnectionIdList, ConnectionList, Empty, EventTypeEnum, ServiceId) +from common.message_broker.MessageBroker import MessageBroker from common.method_wrappers.ServiceExceptions import NotFoundException from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Connection import json_connection_id @@ -27,17 +29,19 @@ from .models.ConnectionModel import ConnectionEndPointModel, ConnectionModel, Co from .uuids.Connection import connection_get_uuid from .uuids.EndPoint import endpoint_get_uuid from .uuids.Service import service_get_uuid +from .Events import notify_event_connection LOGGER = logging.getLogger(__name__) -def connection_list_ids(db_engine : Engine, request : ServiceId) -> List[Dict]: +def connection_list_ids(db_engine : Engine, request : ServiceId) -> ConnectionIdList: _,service_uuid = service_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[ConnectionModel] = session.query(ConnectionModel).filter_by(service_uuid=service_uuid).all() return [obj.dump_id() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + connection_ids = run_transaction(sessionmaker(bind=db_engine), callback) + return ConnectionIdList(connection_ids=connection_ids) -def connection_list_objs(db_engine : Engine, request : ServiceId) -> List[Dict]: +def connection_list_objs(db_engine : Engine, request : ServiceId) -> ConnectionList: _,service_uuid = service_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[ConnectionModel] = session.query(ConnectionModel)\ @@ -46,9 +50,10 @@ def connection_list_objs(db_engine : Engine, request : ServiceId) -> List[Dict]: .options(selectinload(ConnectionModel.connection_subservices))\ .filter_by(service_uuid=service_uuid).all() return [obj.dump() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + connections = run_transaction(sessionmaker(bind=db_engine), callback) + return ConnectionList(connections=connections) -def connection_get(db_engine : Engine, request : ConnectionId) -> Dict: +def connection_get(db_engine : Engine, request : ConnectionId) -> Connection: connection_uuid = connection_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[ConnectionModel] = session.query(ConnectionModel)\ @@ -62,9 +67,9 @@ def connection_get(db_engine : Engine, request : ConnectionId) -> Dict: raise NotFoundException('Connection', request.connection_uuid.uuid, extra_details=[ 'connection_uuid generated was: {:s}'.format(connection_uuid), ]) - return obj + return Connection(**obj) -def connection_set(db_engine : Engine, request : Connection) -> Tuple[Dict, bool]: +def connection_set(db_engine : Engine, messagebroker : MessageBroker, request : Connection) -> ConnectionId: connection_uuid = connection_get_uuid(request.connection_id, allow_random=True) _,service_uuid = service_get_uuid(request.service_id, allow_random=False) settings = grpc_message_to_json_string(request.settings), @@ -143,12 +148,18 @@ def connection_set(db_engine : Engine, request : Connection) -> Tuple[Dict, bool return updated updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_connection_id(connection_uuid),updated + connection_id = json_connection_id(connection_uuid) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + notify_event_connection(messagebroker, event_type, connection_id) + return ConnectionId(**connection_id) -def connection_delete(db_engine : Engine, request : ConnectionId) -> Tuple[Dict, bool]: +def connection_delete(db_engine : Engine, messagebroker : MessageBroker, request : ConnectionId) -> Tuple[Dict, bool]: connection_uuid = connection_get_uuid(request, allow_random=False) def callback(session : Session) -> bool: num_deleted = session.query(ConnectionModel).filter_by(connection_uuid=connection_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) - return json_connection_id(connection_uuid),deleted + connection_id = json_connection_id(connection_uuid) + if deleted: + notify_event_connection(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, connection_id) + return Empty() diff --git a/src/context/service/database/Context.py b/src/context/service/database/Context.py index 4654095034749e1de985705b242ba9fa05a82f6a..403dcd2320d0ce925a95e92331a634785cfa2289 100644 --- a/src/context/service/database/Context.py +++ b/src/context/service/database/Context.py @@ -17,22 +17,25 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction -from typing import Dict, List, Optional, Tuple -from common.proto.context_pb2 import Context, ContextId +from typing import Dict, List, Optional +from common.proto.context_pb2 import Context, ContextId, ContextIdList, ContextList, Empty, EventTypeEnum +from common.message_broker.MessageBroker import MessageBroker from common.method_wrappers.ServiceExceptions import NotFoundException from common.tools.object_factory.Context import json_context_id from .models.ContextModel import ContextModel from .uuids.Context import context_get_uuid +from .Events import notify_event_context LOGGER = logging.getLogger(__name__) -def context_list_ids(db_engine : Engine) -> List[Dict]: +def context_list_ids(db_engine : Engine) -> ContextIdList: def callback(session : Session) -> List[Dict]: obj_list : List[ContextModel] = session.query(ContextModel).all() return [obj.dump_id() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + context_ids = run_transaction(sessionmaker(bind=db_engine), callback) + return ContextIdList(context_ids=context_ids) -def context_list_objs(db_engine : Engine) -> List[Dict]: +def context_list_objs(db_engine : Engine) -> ContextList: def callback(session : Session) -> List[Dict]: obj_list : List[ContextModel] = session.query(ContextModel)\ .options(selectinload(ContextModel.topologies))\ @@ -40,9 +43,10 @@ def context_list_objs(db_engine : Engine) -> List[Dict]: .options(selectinload(ContextModel.slices))\ .all() return [obj.dump() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + contexts = run_transaction(sessionmaker(bind=db_engine), callback) + return ContextList(contexts=contexts) -def context_get(db_engine : Engine, request : ContextId) -> Dict: +def context_get(db_engine : Engine, request : ContextId) -> Context: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[ContextModel] = session.query(ContextModel)\ @@ -57,9 +61,9 @@ def context_get(db_engine : Engine, request : ContextId) -> Dict: raise NotFoundException('Context', raw_context_uuid, extra_details=[ 'context_uuid generated was: {:s}'.format(context_uuid) ]) - return obj + return Context(**obj) -def context_set(db_engine : Engine, request : Context) -> Tuple[Dict, bool]: +def context_set(db_engine : Engine, messagebroker : MessageBroker, request : Context) -> ContextId: context_name = request.name if len(context_name) == 0: context_name = request.context_id.context_uuid.uuid context_uuid = context_get_uuid(request.context_id, context_name=context_name, allow_random=True) @@ -100,12 +104,18 @@ def context_set(db_engine : Engine, request : Context) -> Tuple[Dict, bool]: return updated_at > created_at updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_context_id(context_uuid),updated + event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + context_id = json_context_id(context_uuid) + notify_event_context(messagebroker, event_type, context_id) + return ContextId(**context_id) -def context_delete(db_engine : Engine, request : ContextId) -> Tuple[Dict, bool]: +def context_delete(db_engine : Engine, messagebroker : MessageBroker, request : ContextId) -> Empty: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> bool: num_deleted = session.query(ContextModel).filter_by(context_uuid=context_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) - return json_context_id(context_uuid),deleted + context_id = json_context_id(context_uuid) + if deleted: + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, context_id) + return Empty() diff --git a/src/context/service/database/Device.py b/src/context/service/database/Device.py index 8560399cc705729685cbaa7c10399a0ec7589015..df57791adf7faf04cb24c372d82ce547f35b9c72 100644 --- a/src/context/service/database/Device.py +++ b/src/context/service/database/Device.py @@ -19,29 +19,33 @@ from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Set, Tuple from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException -from common.proto.context_pb2 import Device, DeviceFilter, DeviceId, TopologyId +from common.message_broker.MessageBroker import MessageBroker +from common.proto.context_pb2 import ( + Device, DeviceFilter, DeviceId, DeviceIdList, DeviceList, Empty, EventTypeEnum, TopologyId) from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Device import json_device_id from context.service.database.uuids.Topology import topology_get_uuid from .models.DeviceModel import DeviceModel from .models.EndPointModel import EndPointModel -from .models.TopologyModel import TopologyDeviceModel +from .models.TopologyModel import TopologyDeviceModel, TopologyModel from .models.enums.DeviceDriver import grpc_to_enum__device_driver from .models.enums.DeviceOperationalStatus import grpc_to_enum__device_operational_status from .models.enums.KpiSampleType import grpc_to_enum__kpi_sample_type from .uuids.Device import device_get_uuid from .uuids.EndPoint import endpoint_get_uuid from .ConfigRule import compose_config_rules_data, upsert_config_rules +from .Events import notify_event_context, notify_event_device, notify_event_topology LOGGER = logging.getLogger(__name__) -def device_list_ids(db_engine : Engine) -> List[Dict]: +def device_list_ids(db_engine : Engine) -> DeviceIdList: def callback(session : Session) -> List[Dict]: obj_list : List[DeviceModel] = session.query(DeviceModel).all() return [obj.dump_id() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + device_ids = run_transaction(sessionmaker(bind=db_engine), callback) + return DeviceIdList(device_ids=device_ids) -def device_list_objs(db_engine : Engine) -> List[Dict]: +def device_list_objs(db_engine : Engine) -> DeviceList: def callback(session : Session) -> List[Dict]: obj_list : List[DeviceModel] = session.query(DeviceModel)\ .options(selectinload(DeviceModel.endpoints))\ @@ -49,9 +53,10 @@ def device_list_objs(db_engine : Engine) -> List[Dict]: .all() #.options(selectinload(DeviceModel.components))\ return [obj.dump() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + devices = run_transaction(sessionmaker(bind=db_engine), callback) + return DeviceList(devices=devices) -def device_get(db_engine : Engine, request : DeviceId) -> Dict: +def device_get(db_engine : Engine, request : DeviceId) -> Device: device_uuid = device_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[DeviceModel] = session.query(DeviceModel)\ @@ -66,9 +71,9 @@ def device_get(db_engine : Engine, request : DeviceId) -> Dict: raise NotFoundException('Device', raw_device_uuid, extra_details=[ 'device_uuid generated was: {:s}'.format(device_uuid) ]) - return obj + return Device(**obj) -def device_set(db_engine : Engine, request : Device) -> Tuple[Dict, bool]: +def device_set(db_engine : Engine, messagebroker : MessageBroker, request : Device) -> DeviceId: raw_device_uuid = request.device_id.device_uuid.uuid raw_device_name = request.name device_name = raw_device_uuid if len(raw_device_name) == 0 else raw_device_name @@ -148,7 +153,7 @@ def device_set(db_engine : Engine, request : Device) -> Tuple[Dict, bool]: if controller_uuid is not None: device_data[0]['controller_uuid'] = controller_uuid - def callback(session : Session) -> bool: + def callback(session : Session) -> Tuple[bool, List[Dict]]: stmt = insert(DeviceModel).values(device_data) stmt = stmt.on_conflict_do_update( index_elements=[DeviceModel.device_uuid], @@ -180,29 +185,85 @@ def device_set(db_engine : Engine, request : Device) -> Tuple[Dict, bool]: endpoint_updates = session.execute(stmt).fetchall() updated_endpoints = any([(updated_at > created_at) for created_at,updated_at in endpoint_updates]) + device_topology_ids = [] if not updated or len(related_topologies) > 1: # Only update topology-device relations when device is created (not updated) or when endpoints are # modified (len(related_topologies) > 1). - session.execute(insert(TopologyDeviceModel).values(related_topologies).on_conflict_do_nothing( + stmt = insert(TopologyDeviceModel).values(related_topologies) + stmt = stmt.on_conflict_do_nothing( index_elements=[TopologyDeviceModel.topology_uuid, TopologyDeviceModel.device_uuid] - )) + ) + stmt = stmt.returning(TopologyDeviceModel.topology_uuid) + topology_uuids = session.execute(stmt).fetchall() + + LOGGER.warning('RAW topology_uuids={:s}'.format(str(topology_uuids))) + if len(topology_uuids) > 0: + topology_uuids = [topology_uuid[0] for topology_uuid in topology_uuids] + LOGGER.warning('NEW topology_uuids={:s}'.format(str(topology_uuids))) + query = session.query(TopologyModel) + query = query.filter(TopologyModel.topology_uuid.in_(topology_uuids)) + device_topologies : List[TopologyModel] = query.all() + device_topology_ids = [obj.dump_id() for obj in device_topologies] + LOGGER.warning('device_topology_ids={:s}'.format(str(device_topology_ids))) changed_config_rules = upsert_config_rules(session, config_rules, device_uuid=device_uuid) - return updated or updated_endpoints or changed_config_rules + return updated or updated_endpoints or changed_config_rules, device_topology_ids + + updated, device_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback) + device_id = json_device_id(device_uuid) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + notify_event_device(messagebroker, event_type, device_id) + + context_ids : Dict[str, Dict] = dict() + topology_ids : Dict[str, Dict] = dict() + for topology_id in device_topology_ids: + topology_uuid = topology_id['topology_uuid']['uuid'] + topology_ids[topology_uuid] = topology_id + context_id = topology_id['context_id'] + context_uuid = context_id['context_uuid']['uuid'] + context_ids[context_uuid] = context_id + + for topology_id in topology_ids.values(): + notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id) - updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_device_id(device_uuid),updated + for context_id in context_ids.values(): + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) -def device_delete(db_engine : Engine, request : DeviceId) -> Tuple[Dict, bool]: + return DeviceId(**device_id) + +def device_delete(db_engine : Engine, messagebroker : MessageBroker, request : DeviceId) -> Empty: device_uuid = device_get_uuid(request, allow_random=False) - def callback(session : Session) -> bool: + def callback(session : Session) -> Tuple[bool, List[Dict]]: + query = session.query(TopologyDeviceModel) + query = query.filter_by(device_uuid=device_uuid) + topology_device_list : List[TopologyDeviceModel] = query.all() + topology_ids = [obj.topology.dump_id() for obj in topology_device_list] num_deleted = session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete() - return num_deleted > 0 - deleted = run_transaction(sessionmaker(bind=db_engine), callback) - return json_device_id(device_uuid),deleted + return num_deleted > 0, topology_ids + deleted, updated_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback) + device_id = json_device_id(device_uuid) + if deleted: + notify_event_device(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, device_id) + + context_ids : Dict[str, Dict] = dict() + topology_ids : Dict[str, Dict] = dict() + for topology_id in updated_topology_ids: + topology_uuid = topology_id['topology_uuid']['uuid'] + topology_ids[topology_uuid] = topology_id + context_id = topology_id['context_id'] + context_uuid = context_id['context_uuid']['uuid'] + context_ids[context_uuid] = context_id + + for topology_id in topology_ids.values(): + notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id) + + for context_id in context_ids.values(): + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + + return Empty() -def device_select(db_engine : Engine, request : DeviceFilter) -> List[Dict]: +def device_select(db_engine : Engine, request : DeviceFilter) -> DeviceList: device_uuids = [ device_get_uuid(device_id, allow_random=False) for device_id in request.device_ids.device_ids @@ -219,4 +280,5 @@ def device_select(db_engine : Engine, request : DeviceFilter) -> List[Dict]: #if request.include_components : query = query.options(selectinload(DeviceModel.components)) obj_list : List[DeviceModel] = query.filter(DeviceModel.device_uuid.in_(device_uuids)).all() return [obj.dump(**dump_params) for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + devices = run_transaction(sessionmaker(bind=db_engine), callback) + return DeviceList(devices=devices) \ No newline at end of file diff --git a/src/context/service/database/EndPoint.py b/src/context/service/database/EndPoint.py index b0df3bb8101a7b64a148e916178b1c9a77d511af..d7445b951dbd2d846900c21799e2fc03164ae6c5 100644 --- a/src/context/service/database/EndPoint.py +++ b/src/context/service/database/EndPoint.py @@ -17,13 +17,13 @@ from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List -from common.proto.context_pb2 import EndPointIdList +from common.proto.context_pb2 import EndPointIdList, EndPointNameList from .models.EndPointModel import EndPointModel from .uuids.EndPoint import endpoint_get_uuid LOGGER = logging.getLogger(__name__) -def endpoint_list_names(db_engine : Engine, request : EndPointIdList) -> List[Dict]: +def endpoint_list_names(db_engine : Engine, request : EndPointIdList) -> EndPointNameList: endpoint_uuids = { endpoint_get_uuid(endpoint_id, allow_random=False)[-1] for endpoint_id in request.endpoint_ids @@ -33,4 +33,5 @@ def endpoint_list_names(db_engine : Engine, request : EndPointIdList) -> List[Di .options(selectinload(EndPointModel.device))\ .filter(EndPointModel.endpoint_uuid.in_(endpoint_uuids)).all() return [obj.dump_name() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + endpoint_names = run_transaction(sessionmaker(bind=db_engine), callback) + return EndPointNameList(endpoint_names=endpoint_names) diff --git a/src/context/service/database/Events.py b/src/context/service/database/Events.py new file mode 100644 index 0000000000000000000000000000000000000000..36774a5170ba20914555b0adc47a5c2faa592799 --- /dev/null +++ b/src/context/service/database/Events.py @@ -0,0 +1,89 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum, json, logging, time +from typing import Dict, Iterator, Set +from common.message_broker.Message import Message +from common.message_broker.MessageBroker import MessageBroker +from common.proto.context_pb2 import ( + ConnectionEvent, ContextEvent, DeviceEvent, EventTypeEnum, LinkEvent, ServiceEvent, SliceEvent, TopologyEvent) + +class EventTopicEnum(enum.Enum): + CONNECTION = 'connection' + CONTEXT = 'context' + DEVICE = 'device' + LINK = 'link' + POLICY_RULE = 'policy-rule' + SERVICE = 'service' + SLICE = 'slice' + TOPOLOGY = 'topology' + +TOPIC_TO_EVENTCLASS = { + EventTopicEnum.CONNECTION.value : ConnectionEvent, + EventTopicEnum.CONTEXT.value : ContextEvent, + EventTopicEnum.DEVICE.value : DeviceEvent, + EventTopicEnum.LINK.value : LinkEvent, + #EventTopicEnum.POLICY_RULE.value : PolicyRuleEvent, # Not defined in proto files + EventTopicEnum.SERVICE.value : ServiceEvent, + EventTopicEnum.SLICE.value : SliceEvent, + EventTopicEnum.TOPOLOGY.value : TopologyEvent, +} + +CONSUME_TIMEOUT = 0.5 # seconds + +LOGGER = logging.getLogger(__name__) + +def notify_event( + messagebroker : MessageBroker, topic_enum : EventTopicEnum, event_type : EventTypeEnum, fields : Dict[str, str] +) -> None: + event = {'event': {'timestamp': {'timestamp': time.time()}, 'event_type': event_type}} + for field_name, field_value in fields.items(): + event[field_name] = field_value + messagebroker.publish(Message(topic_enum.value, json.dumps(event))) + +def notify_event_context(messagebroker : MessageBroker, event_type : EventTypeEnum, context_id : Dict) -> None: + notify_event(messagebroker, EventTopicEnum.CONTEXT, event_type, {'context_id': context_id}) + +def notify_event_topology(messagebroker : MessageBroker, event_type : EventTypeEnum, topology_id : Dict) -> None: + notify_event(messagebroker, EventTopicEnum.TOPOLOGY, event_type, {'topology_id': topology_id}) + +def notify_event_device(messagebroker : MessageBroker, event_type : EventTypeEnum, device_id : Dict) -> None: + notify_event(messagebroker, EventTopicEnum.DEVICE, event_type, {'device_id': device_id}) + +def notify_event_link(messagebroker : MessageBroker, event_type : EventTypeEnum, link_id : Dict) -> None: + notify_event(messagebroker, EventTopicEnum.LINK, event_type, {'link_id': link_id}) + +def notify_event_service(messagebroker : MessageBroker, event_type : EventTypeEnum, service_id : Dict) -> None: + notify_event(messagebroker, EventTopicEnum.SERVICE, event_type, {'service_id': service_id}) + +def notify_event_slice(messagebroker : MessageBroker, event_type : EventTypeEnum, slice_id : Dict) -> None: + notify_event(messagebroker, EventTopicEnum.SLICE, event_type, {'slice_id': slice_id}) + +def notify_event_connection(messagebroker : MessageBroker, event_type : EventTypeEnum, connection_id : Dict) -> None: + notify_event(messagebroker, EventTopicEnum.CONNECTION, event_type, {'connection_id': connection_id}) + +def notify_event_policy_rule(messagebroker : MessageBroker, event_type : EventTypeEnum, policyrule_id : Dict) -> None: + notify_event(messagebroker, EventTopicEnum.POLICY_RULE, event_type, {'policyrule_id': policyrule_id}) + +def consume_events( + messagebroker : MessageBroker, topic_enums : Set[EventTopicEnum], consume_timeout : float = CONSUME_TIMEOUT +) -> Iterator: + topic_names = [topic_enum.value for topic_enum in topic_enums] + for message in messagebroker.consume(topic_names, consume_timeout=consume_timeout): + event_class = TOPIC_TO_EVENTCLASS.get(message.topic) + if event_class is None: + MSG = 'No EventClass defined for Topic({:s}). Ignoring...' + LOGGER.warning(MSG.format(str(message.topic))) + continue + yield event_class(**json.loads(message.content)) diff --git a/src/context/service/database/Link.py b/src/context/service/database/Link.py index 76db07a9e30b4f62c4b51574ad95c222a1490f79..67ac9f518f610caedc631444187cac10aded56c7 100644 --- a/src/context/service/database/Link.py +++ b/src/context/service/database/Link.py @@ -18,32 +18,36 @@ from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Set, Tuple -from common.proto.context_pb2 import Link, LinkId, TopologyId +from common.proto.context_pb2 import Empty, EventTypeEnum, Link, LinkId, LinkIdList, LinkList, TopologyId +from common.message_broker.MessageBroker import MessageBroker from common.method_wrappers.ServiceExceptions import NotFoundException from common.tools.object_factory.Link import json_link_id from context.service.database.uuids.Topology import topology_get_uuid from .models.LinkModel import LinkModel, LinkEndPointModel -from .models.TopologyModel import TopologyLinkModel +from .models.TopologyModel import TopologyLinkModel, TopologyModel from .uuids.EndPoint import endpoint_get_uuid from .uuids.Link import link_get_uuid +from .Events import notify_event_context, notify_event_link, notify_event_topology LOGGER = logging.getLogger(__name__) -def link_list_ids(db_engine : Engine) -> List[Dict]: +def link_list_ids(db_engine : Engine) -> LinkIdList: def callback(session : Session) -> List[Dict]: obj_list : List[LinkModel] = session.query(LinkModel).all() return [obj.dump_id() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + link_ids = run_transaction(sessionmaker(bind=db_engine), callback) + return LinkIdList(link_ids=link_ids) -def link_list_objs(db_engine : Engine) -> List[Dict]: +def link_list_objs(db_engine : Engine) -> LinkList: def callback(session : Session) -> List[Dict]: obj_list : List[LinkModel] = session.query(LinkModel)\ .options(selectinload(LinkModel.link_endpoints))\ .all() return [obj.dump() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + links = run_transaction(sessionmaker(bind=db_engine), callback) + return LinkList(links=links) -def link_get(db_engine : Engine, request : LinkId) -> Dict: +def link_get(db_engine : Engine, request : LinkId) -> Link: link_uuid = link_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[LinkModel] = session.query(LinkModel)\ @@ -56,9 +60,9 @@ def link_get(db_engine : Engine, request : LinkId) -> Dict: raise NotFoundException('Link', raw_link_uuid, extra_details=[ 'link_uuid generated was: {:s}'.format(link_uuid) ]) - return obj + return Link(**obj) -def link_set(db_engine : Engine, request : Link) -> Tuple[Dict, bool]: +def link_set(db_engine : Engine, messagebroker : MessageBroker, request : Link) -> LinkId: raw_link_uuid = request.link_id.link_uuid.uuid raw_link_name = request.name link_name = raw_link_uuid if len(raw_link_name) == 0 else raw_link_name @@ -102,7 +106,7 @@ def link_set(db_engine : Engine, request : Link) -> Tuple[Dict, bool]: 'updated_at': now, }] - def callback(session : Session) -> bool: + def callback(session : Session) -> Tuple[bool, List[Dict]]: stmt = insert(LinkModel).values(link_data) stmt = stmt.on_conflict_do_update( index_elements=[LinkModel.link_uuid], @@ -115,28 +119,88 @@ def link_set(db_engine : Engine, request : Link) -> Tuple[Dict, bool]: created_at,updated_at = session.execute(stmt).fetchone() updated = updated_at > created_at + updated_endpoints = False if len(link_endpoints_data) > 0: # TODO: manage add/remove of endpoints; manage changes in relations with topology stmt = insert(LinkEndPointModel).values(link_endpoints_data) stmt = stmt.on_conflict_do_nothing( index_elements=[LinkEndPointModel.link_uuid, LinkEndPointModel.endpoint_uuid] ) - session.execute(stmt) - - if len(related_topologies) > 0: - session.execute(insert(TopologyLinkModel).values(related_topologies).on_conflict_do_nothing( + link_endpoint_inserts = session.execute(stmt) + updated_endpoints = int(link_endpoint_inserts.rowcount) > 0 + + link_topology_ids = [] + if not updated or len(related_topologies) > 1: + # Only update topology-link relations when link is created (not updated) or when endpoint_ids are + # modified (len(related_topologies) > 1). + stmt = insert(TopologyLinkModel).values(related_topologies) + stmt = stmt.on_conflict_do_nothing( index_elements=[TopologyLinkModel.topology_uuid, TopologyLinkModel.link_uuid] - )) - - return updated - - updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_link_id(link_uuid),updated - -def link_delete(db_engine : Engine, request : LinkId) -> Tuple[Dict, bool]: + ) + stmt = stmt.returning(TopologyLinkModel.topology_uuid) + topology_uuids = session.execute(stmt).fetchall() + + LOGGER.warning('RAW topology_uuids={:s}'.format(str(topology_uuids))) + if len(topology_uuids) > 0: + topology_uuids = [topology_uuid[0] for topology_uuid in topology_uuids] + LOGGER.warning('NEW topology_uuids={:s}'.format(str(topology_uuids))) + query = session.query(TopologyModel) + query = query.filter(TopologyModel.topology_uuid.in_(topology_uuids)) + link_topologies : List[TopologyModel] = query.all() + link_topology_ids = [obj.dump_id() for obj in link_topologies] + LOGGER.warning('link_topology_ids={:s}'.format(str(link_topology_ids))) + + return updated or updated_endpoints, link_topology_ids + + updated, link_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback) + link_id = json_link_id(link_uuid) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + notify_event_link(messagebroker, event_type, link_id) + + context_ids : Dict[str, Dict] = dict() + topology_ids : Dict[str, Dict] = dict() + for topology_id in link_topology_ids: + topology_uuid = topology_id['topology_uuid']['uuid'] + topology_ids[topology_uuid] = topology_id + context_id = topology_id['context_id'] + context_uuid = context_id['context_uuid']['uuid'] + context_ids[context_uuid] = context_id + + for topology_id in topology_ids.values(): + notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id) + + for context_id in context_ids.values(): + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + + return LinkId(**link_id) + +def link_delete(db_engine : Engine, messagebroker : MessageBroker, request : LinkId) -> Empty: link_uuid = link_get_uuid(request, allow_random=False) def callback(session : Session) -> bool: + query = session.query(TopologyLinkModel) + query = query.filter_by(link_uuid=link_uuid) + topology_link_list : List[TopologyLinkModel] = query.all() + topology_ids = [obj.topology.dump_id() for obj in topology_link_list] num_deleted = session.query(LinkModel).filter_by(link_uuid=link_uuid).delete() - return num_deleted > 0 - deleted = run_transaction(sessionmaker(bind=db_engine), callback) - return json_link_id(link_uuid),deleted + return num_deleted > 0, topology_ids + deleted, updated_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback) + link_id = json_link_id(link_uuid) + if deleted: + notify_event_link(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, link_id) + + context_ids : Dict[str, Dict] = dict() + topology_ids : Dict[str, Dict] = dict() + for topology_id in updated_topology_ids: + topology_uuid = topology_id['topology_uuid']['uuid'] + topology_ids[topology_uuid] = topology_id + context_id = topology_id['context_id'] + context_uuid = context_id['context_uuid']['uuid'] + context_ids[context_uuid] = context_id + + for topology_id in topology_ids.values(): + notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id) + + for context_id in context_ids.values(): + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + + return Empty() diff --git a/src/context/service/database/PolicyRule.py b/src/context/service/database/PolicyRule.py index 13f0a2698c17874e1e15f4d6a1d527d366141f56..3db0696a49cd851608d34797ce138ea0e63a1c51 100644 --- a/src/context/service/database/PolicyRule.py +++ b/src/context/service/database/PolicyRule.py @@ -12,13 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime, json +import datetime, json, logging from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction -from typing import Dict, List, Optional, Set, Tuple +from typing import Dict, List, Optional, Set +from common.proto.context_pb2 import Empty, EventTypeEnum from common.proto.policy_pb2 import PolicyRule, PolicyRuleId, PolicyRuleIdList, PolicyRuleList +from common.message_broker.MessageBroker import MessageBroker from common.method_wrappers.ServiceExceptions import NotFoundException from common.tools.grpc.Tools import grpc_message_to_json from common.tools.object_factory.PolicyRule import json_policyrule_id @@ -27,21 +29,26 @@ from .models.enums.PolicyRuleState import grpc_to_enum__policyrule_state from .models.PolicyRuleModel import PolicyRuleDeviceModel, PolicyRuleKindEnum, PolicyRuleModel from .uuids.PolicuRule import policyrule_get_uuid from .uuids.Service import service_get_uuid +from .Events import notify_event_policy_rule -def policyrule_list_ids(db_engine : Engine) -> List[Dict]: +LOGGER = logging.getLogger(__name__) + +def policyrule_list_ids(db_engine : Engine) -> PolicyRuleIdList: def callback(session : Session) -> List[Dict]: obj_list : List[PolicyRuleModel] = session.query(PolicyRuleModel).all() return [obj.dump_id() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + policy_rule_ids = run_transaction(sessionmaker(bind=db_engine), callback) + return PolicyRuleIdList(policyRuleIdList=policy_rule_ids) -def policyrule_list_objs(db_engine : Engine) -> List[Dict]: +def policyrule_list_objs(db_engine : Engine) -> PolicyRuleList: def callback(session : Session) -> List[Dict]: obj_list : List[PolicyRuleModel] = session.query(PolicyRuleModel)\ .options(selectinload(PolicyRuleModel.policyrule_service))\ .options(selectinload(PolicyRuleModel.policyrule_devices))\ .all() return [obj.dump() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + policy_rules = run_transaction(sessionmaker(bind=db_engine), callback) + return PolicyRuleList(policyRules=policy_rules) def policyrule_get(db_engine : Engine, request : PolicyRuleId) -> PolicyRule: policyrule_uuid = policyrule_get_uuid(request, allow_random=False) @@ -57,9 +64,9 @@ def policyrule_get(db_engine : Engine, request : PolicyRuleId) -> PolicyRule: raise NotFoundException('PolicyRule', raw_policyrule_uuid, extra_details=[ 'policyrule_uuid generated was: {:s}'.format(policyrule_uuid) ]) - return obj + return PolicyRule(**obj) -def policyrule_set(db_engine : Engine, request : PolicyRule) -> Tuple[PolicyRuleId, bool]: +def policyrule_set(db_engine : Engine, messagebroker : MessageBroker, request : PolicyRule) -> PolicyRuleId: policyrule_kind = request.WhichOneof('policy_rule') policyrule_spec = getattr(request, policyrule_kind) policyrule_basic = policyrule_spec.policyRuleBasic @@ -130,12 +137,18 @@ def policyrule_set(db_engine : Engine, request : PolicyRule) -> Tuple[PolicyRule return updated updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_policyrule_id(policyrule_uuid),updated + policyrule_id = json_policyrule_id(policyrule_uuid) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + notify_event_policy_rule(messagebroker, event_type, policyrule_id) + return PolicyRuleId(**policyrule_id) -def policyrule_delete(db_engine : Engine, request : PolicyRuleId) -> Tuple[Dict, bool]: +def policyrule_delete(db_engine : Engine, messagebroker : MessageBroker, request : PolicyRuleId) -> Empty: policyrule_uuid = policyrule_get_uuid(request, allow_random=False) def callback(session : Session) -> bool: num_deleted = session.query(PolicyRuleModel).filter_by(policyrule_uuid=policyrule_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) - return json_policyrule_id(policyrule_uuid),deleted + policyrule_id = json_policyrule_id(policyrule_uuid) + if deleted: + notify_event_policy_rule(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, policyrule_id) + return Empty() diff --git a/src/context/service/database/Service.py b/src/context/service/database/Service.py index b6916dc3a19fef4bde3aff93300e63f360b362c0..fc196ddded291aa82c8f9df932c15611d13121e4 100644 --- a/src/context/service/database/Service.py +++ b/src/context/service/database/Service.py @@ -18,8 +18,10 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction -from typing import Dict, List, Optional, Set, Tuple -from common.proto.context_pb2 import ContextId, Service, ServiceFilter, ServiceId +from typing import Dict, List, Optional, Set +from common.proto.context_pb2 import ( + ContextId, Empty, EventTypeEnum, Service, ServiceFilter, ServiceId, ServiceIdList, ServiceList) +from common.message_broker.MessageBroker import MessageBroker from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Service import json_service_id @@ -31,17 +33,19 @@ from .models.ServiceModel import ServiceModel, ServiceEndPointModel from .uuids.Context import context_get_uuid from .uuids.EndPoint import endpoint_get_uuid from .uuids.Service import service_get_uuid +from .Events import notify_event_context, notify_event_service LOGGER = logging.getLogger(__name__) -def service_list_ids(db_engine : Engine, request : ContextId) -> List[Dict]: +def service_list_ids(db_engine : Engine, request : ContextId) -> ServiceIdList: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[ServiceModel] = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all() return [obj.dump_id() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + service_ids = run_transaction(sessionmaker(bind=db_engine), callback) + return ServiceIdList(service_ids=service_ids) -def service_list_objs(db_engine : Engine, request : ContextId) -> List[Dict]: +def service_list_objs(db_engine : Engine, request : ContextId) -> ServiceList: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[ServiceModel] = session.query(ServiceModel)\ @@ -50,9 +54,10 @@ def service_list_objs(db_engine : Engine, request : ContextId) -> List[Dict]: .options(selectinload(ServiceModel.config_rules))\ .filter_by(context_uuid=context_uuid).all() return [obj.dump() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + services = run_transaction(sessionmaker(bind=db_engine), callback) + return ServiceList(services=services) -def service_get(db_engine : Engine, request : ServiceId) -> Dict: +def service_get(db_engine : Engine, request : ServiceId) -> Service: _,service_uuid = service_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[ServiceModel] = session.query(ServiceModel)\ @@ -69,9 +74,9 @@ def service_get(db_engine : Engine, request : ServiceId) -> Dict: 'context_uuid generated was: {:s}'.format(context_uuid), 'service_uuid generated was: {:s}'.format(service_uuid), ]) - return obj + return Service(**obj) -def service_set(db_engine : Engine, request : Service) -> Tuple[Dict, bool]: +def service_set(db_engine : Engine, messagebroker : MessageBroker, request : Service) -> ServiceId: raw_context_uuid = request.service_id.context_id.context_uuid.uuid raw_service_uuid = request.service_id.service_uuid.uuid raw_service_name = request.name @@ -145,9 +150,14 @@ def service_set(db_engine : Engine, request : Service) -> Tuple[Dict, bool]: return updated or changed_constraints or changed_config_rules updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_service_id(service_uuid, json_context_id(context_uuid)),updated - -def service_unset(db_engine : Engine, request : Service) -> Tuple[Dict, bool]: + context_id = json_context_id(context_uuid) + service_id = json_service_id(service_uuid, context_id=context_id) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + notify_event_service(messagebroker, event_type, service_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + return ServiceId(**service_id) + +def service_unset(db_engine : Engine, messagebroker : MessageBroker, request : Service) -> ServiceId: raw_context_uuid = request.service_id.context_id.context_uuid.uuid raw_service_uuid = request.service_id.service_uuid.uuid raw_service_name = request.name @@ -184,17 +194,25 @@ def service_unset(db_engine : Engine, request : Service) -> Tuple[Dict, bool]: return num_deletes > 0 or changed_constraints or changed_config_rules updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_service_id(service_uuid, json_context_id(context_uuid)),updated + service_id = json_service_id(service_uuid, json_context_id(context_uuid)) + if updated: + notify_event_service(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, service_id) + return ServiceId(**service_id) -def service_delete(db_engine : Engine, request : ServiceId) -> Tuple[Dict, bool]: +def service_delete(db_engine : Engine, messagebroker : MessageBroker, request : ServiceId) -> Empty: context_uuid,service_uuid = service_get_uuid(request, allow_random=False) def callback(session : Session) -> bool: num_deleted = session.query(ServiceModel).filter_by(service_uuid=service_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) - return json_service_id(service_uuid, json_context_id(context_uuid)),deleted - -def service_select(db_engine : Engine, request : ServiceFilter) -> List[Dict]: + context_id = json_context_id(context_uuid) + service_id = json_service_id(service_uuid, context_id=context_id) + if deleted: + notify_event_service(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, service_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + return Empty() + +def service_select(db_engine : Engine, request : ServiceFilter) -> ServiceList: service_uuids = [ service_get_uuid(service_id, allow_random=False)[1] for service_id in request.service_ids.service_ids @@ -211,4 +229,5 @@ def service_select(db_engine : Engine, request : ServiceFilter) -> List[Dict]: if request.include_config_rules: query = query.options(selectinload(ServiceModel.config_rules)) obj_list : List[ServiceModel] = query.filter(ServiceModel.service_uuid.in_(service_uuids)).all() return [obj.dump(**dump_params) for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + services = run_transaction(sessionmaker(bind=db_engine), callback) + return ServiceList(services=services) diff --git a/src/context/service/database/Slice.py b/src/context/service/database/Slice.py index abd140024f2a13289c7af6a3bafe363a8247e053..98a5ef7a8dd5d6f489c11bc2798ea16fc5b9c128 100644 --- a/src/context/service/database/Slice.py +++ b/src/context/service/database/Slice.py @@ -18,8 +18,10 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction -from typing import Dict, List, Optional, Set, Tuple -from common.proto.context_pb2 import ContextId, Slice, SliceFilter, SliceId +from typing import Dict, List, Optional, Set +from common.proto.context_pb2 import ( + ContextId, Empty, EventTypeEnum, Slice, SliceFilter, SliceId, SliceIdList, SliceList) +from common.message_broker.MessageBroker import MessageBroker from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Slice import json_slice_id @@ -31,17 +33,19 @@ from .uuids.Context import context_get_uuid from .uuids.EndPoint import endpoint_get_uuid from .uuids.Service import service_get_uuid from .uuids.Slice import slice_get_uuid +from .Events import notify_event_context, notify_event_slice LOGGER = logging.getLogger(__name__) -def slice_list_ids(db_engine : Engine, request : ContextId) -> List[Dict]: +def slice_list_ids(db_engine : Engine, request : ContextId) -> SliceIdList: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[SliceModel] = session.query(SliceModel).filter_by(context_uuid=context_uuid).all() return [obj.dump_id() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + slice_ids = run_transaction(sessionmaker(bind=db_engine), callback) + return SliceIdList(slice_ids=slice_ids) -def slice_list_objs(db_engine : Engine, request : ContextId) -> List[Dict]: +def slice_list_objs(db_engine : Engine, request : ContextId) -> SliceList: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[SliceModel] = session.query(SliceModel)\ @@ -52,9 +56,10 @@ def slice_list_objs(db_engine : Engine, request : ContextId) -> List[Dict]: .options(selectinload(SliceModel.config_rules))\ .filter_by(context_uuid=context_uuid).all() return [obj.dump() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + slices = run_transaction(sessionmaker(bind=db_engine), callback) + return SliceList(slices=slices) -def slice_get(db_engine : Engine, request : SliceId) -> Dict: +def slice_get(db_engine : Engine, request : SliceId) -> Slice: _,slice_uuid = slice_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[SliceModel] = session.query(SliceModel)\ @@ -73,9 +78,9 @@ def slice_get(db_engine : Engine, request : SliceId) -> Dict: 'context_uuid generated was: {:s}'.format(context_uuid), 'slice_uuid generated was: {:s}'.format(slice_uuid), ]) - return obj + return Slice(**obj) -def slice_set(db_engine : Engine, request : Slice) -> Tuple[Dict, bool]: +def slice_set(db_engine : Engine, messagebroker : MessageBroker, request : Slice) -> SliceId: raw_context_uuid = request.slice_id.context_id.context_uuid.uuid raw_slice_uuid = request.slice_id.slice_uuid.uuid raw_slice_name = request.name @@ -182,9 +187,14 @@ def slice_set(db_engine : Engine, request : Slice) -> Tuple[Dict, bool]: return updated or changed_constraints or changed_config_rules updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_slice_id(slice_uuid, json_context_id(context_uuid)),updated - -def slice_unset(db_engine : Engine, request : Slice) -> Tuple[Dict, bool]: + context_id = json_context_id(context_uuid) + slice_id = json_slice_id(slice_uuid, context_id=context_id) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + notify_event_slice(messagebroker, event_type, slice_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + return SliceId(**slice_id) + +def slice_unset(db_engine : Engine, messagebroker : MessageBroker, request : Slice) -> SliceId: raw_context_uuid = request.slice_id.context_id.context_uuid.uuid raw_slice_uuid = request.slice_id.slice_uuid.uuid raw_slice_name = request.name @@ -243,17 +253,25 @@ def slice_unset(db_engine : Engine, request : Slice) -> Tuple[Dict, bool]: return num_deletes > 0 or changed_constraints or changed_config_rules updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_slice_id(slice_uuid, json_context_id(context_uuid)),updated + slice_id = json_slice_id(slice_uuid, json_context_id(context_uuid)) + if updated: + notify_event_slice(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, slice_id) + return SliceId(**slice_id) -def slice_delete(db_engine : Engine, request : SliceId) -> Tuple[Dict, bool]: +def slice_delete(db_engine : Engine, messagebroker : MessageBroker, request : SliceId) -> Empty: context_uuid,slice_uuid = slice_get_uuid(request, allow_random=False) def callback(session : Session) -> bool: num_deleted = session.query(SliceModel).filter_by(slice_uuid=slice_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) - return json_slice_id(slice_uuid, json_context_id(context_uuid)),deleted - -def slice_select(db_engine : Engine, request : SliceFilter) -> List[Dict]: + context_id = json_context_id(context_uuid) + slice_id = json_slice_id(slice_uuid, context_id=context_id) + if deleted: + notify_event_slice(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, slice_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + return Empty() + +def slice_select(db_engine : Engine, request : SliceFilter) -> SliceList: slice_uuids = [ slice_get_uuid(slice_id, allow_random=False)[1] for slice_id in request.slice_ids.slice_ids @@ -274,4 +292,5 @@ def slice_select(db_engine : Engine, request : SliceFilter) -> List[Dict]: if request.include_config_rules: query = query.options(selectinload(SliceModel.config_rules)) obj_list : List[SliceModel] = query.filter(SliceModel.slice_uuid.in_(slice_uuids)).all() return [obj.dump(**dump_params) for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + slices = run_transaction(sessionmaker(bind=db_engine), callback) + return SliceList(slices=slices) diff --git a/src/context/service/database/Topology.py b/src/context/service/database/Topology.py index 4440299b63f68613854e79998270872389d385cb..1f0fb6c0b3c400d58ea83bc857e97bc50a1324a3 100644 --- a/src/context/service/database/Topology.py +++ b/src/context/service/database/Topology.py @@ -17,8 +17,10 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction -from typing import Dict, List, Optional, Tuple -from common.proto.context_pb2 import ContextId, Topology, TopologyId +from typing import Dict, List, Optional +from common.proto.context_pb2 import ( + ContextId, Empty, EventTypeEnum, Topology, TopologyDetails, TopologyId, TopologyIdList, TopologyList) +from common.message_broker.MessageBroker import MessageBroker from common.method_wrappers.ServiceExceptions import NotFoundException from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Topology import json_topology_id @@ -27,17 +29,19 @@ from .models.LinkModel import LinkModel from .models.TopologyModel import TopologyDeviceModel, TopologyLinkModel, TopologyModel from .uuids.Context import context_get_uuid from .uuids.Topology import topology_get_uuid +from .Events import notify_event_context, notify_event_topology LOGGER = logging.getLogger(__name__) -def topology_list_ids(db_engine : Engine, request : ContextId) -> List[Dict]: +def topology_list_ids(db_engine : Engine, request : ContextId) -> TopologyIdList: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[TopologyModel] = session.query(TopologyModel).filter_by(context_uuid=context_uuid).all() return [obj.dump_id() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + topology_ids = run_transaction(sessionmaker(bind=db_engine), callback) + return TopologyIdList(topology_ids=topology_ids) -def topology_list_objs(db_engine : Engine, request : ContextId) -> List[Dict]: +def topology_list_objs(db_engine : Engine, request : ContextId) -> TopologyList: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[TopologyModel] = session.query(TopologyModel)\ @@ -45,9 +49,10 @@ def topology_list_objs(db_engine : Engine, request : ContextId) -> List[Dict]: .options(selectinload(TopologyModel.topology_links))\ .filter_by(context_uuid=context_uuid).all() return [obj.dump() for obj in obj_list] - return run_transaction(sessionmaker(bind=db_engine), callback) + topologies = run_transaction(sessionmaker(bind=db_engine), callback) + return TopologyList(topologies=topologies) -def topology_get(db_engine : Engine, request : TopologyId) -> Dict: +def topology_get(db_engine : Engine, request : TopologyId) -> Topology: _,topology_uuid = topology_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[TopologyModel] = session.query(TopologyModel)\ @@ -63,9 +68,9 @@ def topology_get(db_engine : Engine, request : TopologyId) -> Dict: 'context_uuid generated was: {:s}'.format(context_uuid), 'topology_uuid generated was: {:s}'.format(topology_uuid), ]) - return obj + return Topology(**obj) -def topology_get_details(db_engine : Engine, request : TopologyId) -> Dict: +def topology_get_details(db_engine : Engine, request : TopologyId) -> TopologyDetails: _,topology_uuid = topology_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[TopologyModel] = session.query(TopologyModel)\ @@ -82,9 +87,9 @@ def topology_get_details(db_engine : Engine, request : TopologyId) -> Dict: 'context_uuid generated was: {:s}'.format(context_uuid), 'topology_uuid generated was: {:s}'.format(topology_uuid), ]) - return obj + return TopologyDetails(**obj) -def topology_set(db_engine : Engine, request : Topology) -> Tuple[Dict, bool]: +def topology_set(db_engine : Engine, messagebroker : MessageBroker, request : Topology) -> TopologyId: topology_name = request.name if len(topology_name) == 0: topology_name = request.topology_id.topology_uuid.uuid context_uuid,topology_uuid = topology_get_uuid(request.topology_id, topology_name=topology_name, allow_random=True) @@ -120,14 +125,24 @@ def topology_set(db_engine : Engine, request : Topology) -> Tuple[Dict, bool]: stmt = stmt.returning(TopologyModel.created_at, TopologyModel.updated_at) created_at,updated_at = session.execute(stmt).fetchone() return updated_at > created_at - + updated = run_transaction(sessionmaker(bind=db_engine), callback) - return json_topology_id(topology_uuid, context_id=json_context_id(context_uuid)),updated + context_id = json_context_id(context_uuid) + topology_id = json_topology_id(topology_uuid, context_id=context_id) + event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE + notify_event_topology(messagebroker, event_type, topology_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + return TopologyId(**topology_id) -def topology_delete(db_engine : Engine, request : TopologyId) -> Tuple[Dict, bool]: +def topology_delete(db_engine : Engine, messagebroker : MessageBroker, request : TopologyId) -> Empty: context_uuid,topology_uuid = topology_get_uuid(request, allow_random=False) def callback(session : Session) -> bool: num_deleted = session.query(TopologyModel).filter_by(topology_uuid=topology_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) - return json_topology_id(topology_uuid, context_id=json_context_id(context_uuid)),deleted + context_id = json_context_id(context_uuid) + topology_id = json_topology_id(topology_uuid, context_id=context_id) + if deleted: + notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, topology_id) + notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id) + return Empty() diff --git a/src/context/service/database/models/TopologyModel.py b/src/context/service/database/models/TopologyModel.py index 0ed4a038bcf4426f4cf112bd03c5cb36cb42c822..68d97edf36ee42c04948cc6782b86bce028cb76a 100644 --- a/src/context/service/database/models/TopologyModel.py +++ b/src/context/service/database/models/TopologyModel.py @@ -67,7 +67,7 @@ class TopologyDeviceModel(_Base): topology_uuid = Column(ForeignKey('topology.topology_uuid', ondelete='RESTRICT'), primary_key=True, index=True) device_uuid = Column(ForeignKey('device.device_uuid', ondelete='CASCADE' ), primary_key=True, index=True) - #topology = relationship('TopologyModel', lazy='selectin') # back_populates='topology_devices' + topology = relationship('TopologyModel', lazy='selectin', viewonly=True) # back_populates='topology_devices' device = relationship('DeviceModel', lazy='selectin') # back_populates='topology_devices' class TopologyLinkModel(_Base): @@ -76,5 +76,5 @@ class TopologyLinkModel(_Base): topology_uuid = Column(ForeignKey('topology.topology_uuid', ondelete='RESTRICT'), primary_key=True, index=True) link_uuid = Column(ForeignKey('link.link_uuid', ondelete='CASCADE' ), primary_key=True, index=True) - #topology = relationship('TopologyModel', lazy='selectin') # back_populates='topology_links' + topology = relationship('TopologyModel', lazy='selectin', viewonly=True) # back_populates='topology_links' link = relationship('LinkModel', lazy='selectin') # back_populates='topology_links'