Loading proto/context.proto +26 −1 Original line number Diff line number Diff line Loading @@ -40,7 +40,7 @@ service ContextService { rpc SetDevice (Device ) returns ( DeviceId ) {} rpc RemoveDevice (DeviceId ) returns ( Empty ) {} rpc GetDeviceEvents (Empty ) returns (stream DeviceEvent ) {} rpc SelectDevice (DeviceFilter ) returns ( DeviceList ) {} rpc ListEndPointNames (EndPointIdList) returns ( EndPointNameList) {} rpc ListLinkIds (Empty ) returns ( LinkIdList ) {} Loading @@ -57,6 +57,7 @@ service ContextService { rpc UnsetService (Service ) returns ( ServiceId ) {} rpc RemoveService (ServiceId ) returns ( Empty ) {} rpc GetServiceEvents (Empty ) returns (stream ServiceEvent ) {} rpc SelectService (ServiceFilter ) returns ( ServiceList ) {} rpc ListSliceIds (ContextId ) returns ( SliceIdList ) {} rpc ListSlices (ContextId ) returns ( SliceList ) {} Loading @@ -65,6 +66,7 @@ service ContextService { rpc UnsetSlice (Slice ) returns ( SliceId ) {} rpc RemoveSlice (SliceId ) returns ( Empty ) {} rpc GetSliceEvents (Empty ) returns (stream SliceEvent ) {} rpc SelectSlice (SliceFilter ) returns ( SliceList ) {} rpc ListConnectionIds (ServiceId ) returns ( ConnectionIdList) {} rpc ListConnections (ServiceId ) returns ( ConnectionList ) {} Loading Loading @@ -208,6 +210,13 @@ message DeviceList { repeated Device devices = 1; } message DeviceFilter { DeviceIdList device_ids = 1; bool include_endpoints = 2; bool include_config_rules = 3; bool include_components = 4; } message DeviceEvent { Event event = 1; DeviceId device_id = 2; Loading Loading @@ -288,6 +297,13 @@ message ServiceList { repeated Service services = 1; } message ServiceFilter { ServiceIdList service_ids = 1; bool include_endpoint_ids = 2; bool include_constraints = 3; bool include_config_rules = 4; } message ServiceEvent { Event event = 1; ServiceId service_id = 2; Loading Loading @@ -342,6 +358,15 @@ message SliceList { repeated Slice slices = 1; } message SliceFilter { SliceIdList slice_ids = 1; bool include_endpoint_ids = 2; bool include_constraints = 3; bool include_service_ids = 4; bool include_subslice_ids = 5; bool include_config_rules = 6; } message SliceEvent { Event event = 1; SliceId slice_id = 2; Loading src/context/client/ContextClient.py +24 −3 Original line number Diff line number Diff line Loading @@ -21,11 +21,11 @@ from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Device, DeviceEvent, DeviceFilter, DeviceId, DeviceIdList, DeviceList, Empty, EndPointIdList, EndPointNameList, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceId, SliceIdList, SliceList, Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from common.proto.context_pb2_grpc import ContextServiceStub from common.proto.context_policy_pb2_grpc import ContextPolicyServiceStub Loading Loading @@ -185,6 +185,13 @@ class ContextClient: LOGGER.debug('RemoveDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SelectDevice(self, request: DeviceFilter) -> DeviceList: LOGGER.debug('SelectDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectDevice(request) LOGGER.debug('SelectDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetDeviceEvents(self, request: Empty) -> Iterator[DeviceEvent]: LOGGER.debug('GetDeviceEvents request: {:s}'.format(grpc_message_to_json_string(request))) Loading Loading @@ -283,6 +290,13 @@ class ContextClient: LOGGER.debug('RemoveService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SelectService(self, request: ServiceFilter) -> ServiceList: LOGGER.debug('SelectService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectService(request) LOGGER.debug('SelectService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetServiceEvents(self, request: Empty) -> Iterator[ServiceEvent]: LOGGER.debug('GetServiceEvents request: {:s}'.format(grpc_message_to_json_string(request))) Loading Loading @@ -332,6 +346,13 @@ class ContextClient: LOGGER.debug('RemoveSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SelectSlice(self, request: SliceFilter) -> SliceList: LOGGER.debug('SelectSlice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectSlice(request) LOGGER.debug('SelectSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetSliceEvents(self, request: Empty) -> Iterator[SliceEvent]: LOGGER.debug('GetSliceEvents request: {:s}'.format(grpc_message_to_json_string(request))) Loading src/context/service/ContextServiceServicerImpl.py +18 −6 Original line number Diff line number Diff line Loading @@ -18,11 +18,11 @@ from common.message_broker.MessageBroker import MessageBroker from common.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Device, DeviceEvent, DeviceFilter, DeviceId, DeviceIdList, DeviceList, Empty, EndPointIdList, EndPointNameList, EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceId, SliceIdList, SliceList, Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule from common.proto.context_pb2_grpc import ContextServiceServicer Loading @@ -31,13 +31,13 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m from .database.Connection import ( connection_delete, connection_get, connection_list_ids, connection_list_objs, connection_set) from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_set from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_select, device_set from .database.EndPoint import endpoint_list_names from .database.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set from .database.PolicyRule import ( policyrule_delete, policyrule_get, policyrule_list_ids, policyrule_list_objs, policyrule_set) from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_set from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_set, slice_unset from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_select, service_set from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_select, slice_set, slice_unset from .database.Topology import ( topology_delete, topology_get, topology_get_details, topology_list_ids, topology_list_objs, topology_set) from .Events import ( Loading Loading @@ -161,6 +161,10 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id}) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectDevices(self, request : DeviceFilter, context : grpc.ServicerContext) -> DeviceList: return DeviceList(devices=device_select(self.db_engine, request)) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): Loading Loading @@ -235,6 +239,10 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id}) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectService(self, request : ServiceFilter, context : grpc.ServicerContext) -> ServiceList: return ServiceList(services=service_select(self.db_engine, request)) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT): Loading Loading @@ -278,6 +286,10 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id}) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectSlice(self, request : SliceFilter, context : grpc.ServicerContext) -> SliceList: return SliceList(slices=slice_select(self.db_engine, request)) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]: for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT): Loading src/context/service/database/Connection.py +9 −2 Original line number Diff line number Diff line Loading @@ -16,7 +16,7 @@ import datetime, logging, re from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Tuple from common.proto.context_pb2 import Connection, ConnectionId, ServiceId Loading @@ -40,7 +40,11 @@ def connection_list_ids(db_engine : Engine, request : ServiceId) -> List[Dict]: def connection_list_objs(db_engine : Engine, request : ServiceId) -> List[Dict]: _,service_uuid = service_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[ConnectionModel] = session.query(ConnectionModel).filter_by(service_uuid=service_uuid).all() obj_list : List[ConnectionModel] = session.query(ConnectionModel)\ .options(selectinload(ConnectionModel.connection_service))\ .options(selectinload(ConnectionModel.connection_endpoints))\ .options(selectinload(ConnectionModel.connection_subservices))\ .filter_by(service_uuid=service_uuid).all() return [obj.dump() for obj in obj_list] return run_transaction(sessionmaker(bind=db_engine), callback) Loading @@ -48,6 +52,9 @@ def connection_get(db_engine : Engine, request : ConnectionId) -> Dict: connection_uuid = connection_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[ConnectionModel] = session.query(ConnectionModel)\ .options(selectinload(ConnectionModel.connection_service))\ .options(selectinload(ConnectionModel.connection_endpoints))\ .options(selectinload(ConnectionModel.connection_subservices))\ .filter_by(connection_uuid=connection_uuid).one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=db_engine), callback) Loading src/context/service/database/Context.py +11 −3 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ import datetime, logging from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Tuple from common.proto.context_pb2 import Context, ContextId Loading @@ -34,14 +34,22 @@ def context_list_ids(db_engine : Engine) -> List[Dict]: def context_list_objs(db_engine : Engine) -> List[Dict]: def callback(session : Session) -> List[Dict]: obj_list : List[ContextModel] = session.query(ContextModel).all() obj_list : List[ContextModel] = session.query(ContextModel)\ .options(selectinload(ContextModel.topologies))\ .options(selectinload(ContextModel.services))\ .options(selectinload(ContextModel.slices))\ .all() return [obj.dump() for obj in obj_list] return run_transaction(sessionmaker(bind=db_engine), callback) def context_get(db_engine : Engine, request : ContextId) -> Dict: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[ContextModel] = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none() obj : Optional[ContextModel] = session.query(ContextModel)\ .options(selectinload(ContextModel.topologies))\ .options(selectinload(ContextModel.services))\ .options(selectinload(ContextModel.slices))\ .filter_by(context_uuid=context_uuid).one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=db_engine), callback) if obj is None: Loading Loading
proto/context.proto +26 −1 Original line number Diff line number Diff line Loading @@ -40,7 +40,7 @@ service ContextService { rpc SetDevice (Device ) returns ( DeviceId ) {} rpc RemoveDevice (DeviceId ) returns ( Empty ) {} rpc GetDeviceEvents (Empty ) returns (stream DeviceEvent ) {} rpc SelectDevice (DeviceFilter ) returns ( DeviceList ) {} rpc ListEndPointNames (EndPointIdList) returns ( EndPointNameList) {} rpc ListLinkIds (Empty ) returns ( LinkIdList ) {} Loading @@ -57,6 +57,7 @@ service ContextService { rpc UnsetService (Service ) returns ( ServiceId ) {} rpc RemoveService (ServiceId ) returns ( Empty ) {} rpc GetServiceEvents (Empty ) returns (stream ServiceEvent ) {} rpc SelectService (ServiceFilter ) returns ( ServiceList ) {} rpc ListSliceIds (ContextId ) returns ( SliceIdList ) {} rpc ListSlices (ContextId ) returns ( SliceList ) {} Loading @@ -65,6 +66,7 @@ service ContextService { rpc UnsetSlice (Slice ) returns ( SliceId ) {} rpc RemoveSlice (SliceId ) returns ( Empty ) {} rpc GetSliceEvents (Empty ) returns (stream SliceEvent ) {} rpc SelectSlice (SliceFilter ) returns ( SliceList ) {} rpc ListConnectionIds (ServiceId ) returns ( ConnectionIdList) {} rpc ListConnections (ServiceId ) returns ( ConnectionList ) {} Loading Loading @@ -208,6 +210,13 @@ message DeviceList { repeated Device devices = 1; } message DeviceFilter { DeviceIdList device_ids = 1; bool include_endpoints = 2; bool include_config_rules = 3; bool include_components = 4; } message DeviceEvent { Event event = 1; DeviceId device_id = 2; Loading Loading @@ -288,6 +297,13 @@ message ServiceList { repeated Service services = 1; } message ServiceFilter { ServiceIdList service_ids = 1; bool include_endpoint_ids = 2; bool include_constraints = 3; bool include_config_rules = 4; } message ServiceEvent { Event event = 1; ServiceId service_id = 2; Loading Loading @@ -342,6 +358,15 @@ message SliceList { repeated Slice slices = 1; } message SliceFilter { SliceIdList slice_ids = 1; bool include_endpoint_ids = 2; bool include_constraints = 3; bool include_service_ids = 4; bool include_subslice_ids = 5; bool include_config_rules = 6; } message SliceEvent { Event event = 1; SliceId slice_id = 2; Loading
src/context/client/ContextClient.py +24 −3 Original line number Diff line number Diff line Loading @@ -21,11 +21,11 @@ from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Device, DeviceEvent, DeviceFilter, DeviceId, DeviceIdList, DeviceList, Empty, EndPointIdList, EndPointNameList, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceId, SliceIdList, SliceList, Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from common.proto.context_pb2_grpc import ContextServiceStub from common.proto.context_policy_pb2_grpc import ContextPolicyServiceStub Loading Loading @@ -185,6 +185,13 @@ class ContextClient: LOGGER.debug('RemoveDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SelectDevice(self, request: DeviceFilter) -> DeviceList: LOGGER.debug('SelectDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectDevice(request) LOGGER.debug('SelectDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetDeviceEvents(self, request: Empty) -> Iterator[DeviceEvent]: LOGGER.debug('GetDeviceEvents request: {:s}'.format(grpc_message_to_json_string(request))) Loading Loading @@ -283,6 +290,13 @@ class ContextClient: LOGGER.debug('RemoveService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SelectService(self, request: ServiceFilter) -> ServiceList: LOGGER.debug('SelectService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectService(request) LOGGER.debug('SelectService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetServiceEvents(self, request: Empty) -> Iterator[ServiceEvent]: LOGGER.debug('GetServiceEvents request: {:s}'.format(grpc_message_to_json_string(request))) Loading Loading @@ -332,6 +346,13 @@ class ContextClient: LOGGER.debug('RemoveSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SelectSlice(self, request: SliceFilter) -> SliceList: LOGGER.debug('SelectSlice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectSlice(request) LOGGER.debug('SelectSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetSliceEvents(self, request: Empty) -> Iterator[SliceEvent]: LOGGER.debug('GetSliceEvents request: {:s}'.format(grpc_message_to_json_string(request))) Loading
src/context/service/ContextServiceServicerImpl.py +18 −6 Original line number Diff line number Diff line Loading @@ -18,11 +18,11 @@ from common.message_broker.MessageBroker import MessageBroker from common.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Device, DeviceEvent, DeviceFilter, DeviceId, DeviceIdList, DeviceList, Empty, EndPointIdList, EndPointNameList, EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceId, SliceIdList, SliceList, Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule from common.proto.context_pb2_grpc import ContextServiceServicer Loading @@ -31,13 +31,13 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m from .database.Connection import ( connection_delete, connection_get, connection_list_ids, connection_list_objs, connection_set) from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_set from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_select, device_set from .database.EndPoint import endpoint_list_names from .database.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set from .database.PolicyRule import ( policyrule_delete, policyrule_get, policyrule_list_ids, policyrule_list_objs, policyrule_set) from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_set from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_set, slice_unset from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_select, service_set from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_select, slice_set, slice_unset from .database.Topology import ( topology_delete, topology_get, topology_get_details, topology_list_ids, topology_list_objs, topology_set) from .Events import ( Loading Loading @@ -161,6 +161,10 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id}) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectDevices(self, request : DeviceFilter, context : grpc.ServicerContext) -> DeviceList: return DeviceList(devices=device_select(self.db_engine, request)) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): Loading Loading @@ -235,6 +239,10 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id}) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectService(self, request : ServiceFilter, context : grpc.ServicerContext) -> ServiceList: return ServiceList(services=service_select(self.db_engine, request)) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT): Loading Loading @@ -278,6 +286,10 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id}) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectSlice(self, request : SliceFilter, context : grpc.ServicerContext) -> SliceList: return SliceList(slices=slice_select(self.db_engine, request)) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]: for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT): Loading
src/context/service/database/Connection.py +9 −2 Original line number Diff line number Diff line Loading @@ -16,7 +16,7 @@ import datetime, logging, re from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Tuple from common.proto.context_pb2 import Connection, ConnectionId, ServiceId Loading @@ -40,7 +40,11 @@ def connection_list_ids(db_engine : Engine, request : ServiceId) -> List[Dict]: def connection_list_objs(db_engine : Engine, request : ServiceId) -> List[Dict]: _,service_uuid = service_get_uuid(request, allow_random=False) def callback(session : Session) -> List[Dict]: obj_list : List[ConnectionModel] = session.query(ConnectionModel).filter_by(service_uuid=service_uuid).all() obj_list : List[ConnectionModel] = session.query(ConnectionModel)\ .options(selectinload(ConnectionModel.connection_service))\ .options(selectinload(ConnectionModel.connection_endpoints))\ .options(selectinload(ConnectionModel.connection_subservices))\ .filter_by(service_uuid=service_uuid).all() return [obj.dump() for obj in obj_list] return run_transaction(sessionmaker(bind=db_engine), callback) Loading @@ -48,6 +52,9 @@ def connection_get(db_engine : Engine, request : ConnectionId) -> Dict: connection_uuid = connection_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[ConnectionModel] = session.query(ConnectionModel)\ .options(selectinload(ConnectionModel.connection_service))\ .options(selectinload(ConnectionModel.connection_endpoints))\ .options(selectinload(ConnectionModel.connection_subservices))\ .filter_by(connection_uuid=connection_uuid).one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=db_engine), callback) Loading
src/context/service/database/Context.py +11 −3 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ import datetime, logging from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import Dict, List, Optional, Tuple from common.proto.context_pb2 import Context, ContextId Loading @@ -34,14 +34,22 @@ def context_list_ids(db_engine : Engine) -> List[Dict]: def context_list_objs(db_engine : Engine) -> List[Dict]: def callback(session : Session) -> List[Dict]: obj_list : List[ContextModel] = session.query(ContextModel).all() obj_list : List[ContextModel] = session.query(ContextModel)\ .options(selectinload(ContextModel.topologies))\ .options(selectinload(ContextModel.services))\ .options(selectinload(ContextModel.slices))\ .all() return [obj.dump() for obj in obj_list] return run_transaction(sessionmaker(bind=db_engine), callback) def context_get(db_engine : Engine, request : ContextId) -> Dict: context_uuid = context_get_uuid(request, allow_random=False) def callback(session : Session) -> Optional[Dict]: obj : Optional[ContextModel] = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none() obj : Optional[ContextModel] = session.query(ContextModel)\ .options(selectinload(ContextModel.topologies))\ .options(selectinload(ContextModel.services))\ .options(selectinload(ContextModel.slices))\ .filter_by(context_uuid=context_uuid).one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=db_engine), callback) if obj is None: Loading