diff --git a/src/context/service/ContextServiceServicerImpl.py b/src/context/service/ContextServiceServicerImpl.py index 6ac21a9739a7cab5d9bc33e5d022281be6793c65..7e72265705e23841413abac19488a0d31754531c 100644 --- a/src/context/service/ContextServiceServicerImpl.py +++ b/src/context/service/ContextServiceServicerImpl.py @@ -27,7 +27,7 @@ from common.proto.context_pb2 import ( from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule from common.proto.context_pb2_grpc import ContextServiceServicer from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer -from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from .database.Connection import ( connection_delete, connection_get, connection_list_ids, connection_list_objs, connection_set) from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set @@ -44,19 +44,7 @@ from .Constants import ( LOGGER = logging.getLogger(__name__) -SERVICE_NAME = 'Context' -METHOD_NAMES = [ - 'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents', - 'ListContextIds', 'ListContexts', 'GetContext', 'SetContext', 'RemoveContext', 'GetContextEvents', - 'ListTopologyIds', 'ListTopologies', 'GetTopology', 'SetTopology', 'RemoveTopology', 'GetTopologyEvents', - 'ListDeviceIds', 'ListDevices', 'GetDevice', 'SetDevice', 'RemoveDevice', 'GetDeviceEvents', - 'ListLinkIds', 'ListLinks', 'GetLink', 'SetLink', 'RemoveLink', 'GetLinkEvents', - 'ListServiceIds', 'ListServices', 'GetService', 'SetService', 'RemoveService', 'GetServiceEvents', - 'ListSliceIds', 'ListSlices', 'GetSlice', 'SetSlice', 'RemoveSlice', 'GetSliceEvents', - 'ListPolicyRuleIds', 'ListPolicyRules', 'GetPolicyRule', 'SetPolicyRule', 'RemovePolicyRule', - 'UnsetService', 'UnsetSlice', -] -METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) +METRICS_POOL = MetricsPool('Context', 'RPC') class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer): def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None: @@ -65,38 +53,38 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer self.messagebroker = messagebroker LOGGER.debug('Servicer Created') - def _get_metrics(self): return METRICS + def _get_metrics(self): return METRICS_POOL # ----- Context ---------------------------------------------------------------------------------------------------- - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList: return context_list_ids(self.db_engine) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList: return context_list_objs(self.db_engine) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context: return context_get(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId: context_id,updated = context_set(self.db_engine, request) # pylint: disable=unused-variable #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE #notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id}) return context_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty: deleted = context_delete(self.db_engine, request) # pylint: disable=unused-variable #if deleted: # notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request}) return Empty() - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT): yield ContextEvent(**json.loads(message.content)) @@ -104,33 +92,33 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Topology --------------------------------------------------------------------------------------------------- - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList: return topology_list_ids(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList: return topology_list_objs(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology: return topology_get(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId: topology_id,updated = topology_set(self.db_engine, request) # pylint: disable=unused-variable #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE #notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id}) return topology_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: deleted = topology_delete(self.db_engine, request) # pylint: disable=unused-variable #if deleted: # notify_event(self.messagebroker, TOPIC_TOPOLOGY, EventTypeEnum.EVENTTYPE_REMOVE, {'topology_id': request}) return Empty() - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT): yield TopologyEvent(**json.loads(message.content)) @@ -138,33 +126,33 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Device ----------------------------------------------------------------------------------------------------- - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList: return device_list_ids(self.db_engine) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList: return device_list_objs(self.db_engine) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetDevice(self, request : ContextId, context : grpc.ServicerContext) -> Device: return device_get(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: device_id,updated = device_set(self.db_engine, request) # pylint: disable=unused-variable #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE #notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id}) return device_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: deleted = device_delete(self.db_engine, request) # pylint: disable=unused-variable #if deleted: # notify_event(self.messagebroker, TOPIC_DEVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'device_id': request}) return Empty() - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]: for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT): yield DeviceEvent(**json.loads(message.content)) @@ -172,33 +160,33 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Link ------------------------------------------------------------------------------------------------------- - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList: return link_list_ids(self.db_engine) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList: return link_list_objs(self.db_engine) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link: return link_get(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: link_id,updated = link_set(self.db_engine, request) # pylint: disable=unused-variable #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE #notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id}) return link_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: deleted = link_delete(self.db_engine, request) # pylint: disable=unused-variable #if deleted: # notify_event(self.messagebroker, TOPIC_LINK, EventTypeEnum.EVENTTYPE_REMOVE, {'link_id': request}) return Empty() - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]: for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT): yield LinkEvent(**json.loads(message.content)) @@ -206,33 +194,33 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Service ---------------------------------------------------------------------------------------------------- - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList: return service_list_ids(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList: return service_list_objs(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service: return service_get(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId: service_id,updated = service_set(self.db_engine, request) # pylint: disable=unused-variable #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE #notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id}) return service_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty: deleted = service_delete(self.db_engine, request) # pylint: disable=unused-variable #if deleted: # notify_event(self.messagebroker, TOPIC_SERVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'service_id': request}) return Empty() - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]: for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT): yield ServiceEvent(**json.loads(message.content)) @@ -240,40 +228,40 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Slice ---------------------------------------------------------------------------------------------------- - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList: return slice_list_ids(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList: return slice_list_objs(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice: return slice_get(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: slice_id,updated = slice_set(self.db_engine, request) # pylint: disable=unused-variable #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE #notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id}) return slice_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: slice_id,updated = slice_unset(self.db_engine, request) # pylint: disable=unused-variable #if updated: # notify_event(self.messagebroker, TOPIC_SLICE, EventTypeEnum.EVENTTYPE_UPDATE, {'slice_id': slice_id}) return slice_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty: deleted = slice_delete(self.db_engine, request) # pylint: disable=unused-variable #if deleted: # notify_event(self.messagebroker, TOPIC_SLICE, EventTypeEnum.EVENTTYPE_REMOVE, {'slice_id': request}) return Empty() - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]: for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT): yield SliceEvent(**json.loads(message.content)) @@ -281,26 +269,26 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Connection ------------------------------------------------------------------------------------------------- - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList: return connection_list_ids(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListConnections(self, request : ContextId, context : grpc.ServicerContext) -> ConnectionList: return connection_list_objs(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection: return connection_get(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId: connection_id,updated = connection_set(self.db_engine, request) # pylint: disable=unused-variable #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE #notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id}) return connection_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty: deleted = connection_delete(self.db_engine, request) # pylint: disable=unused-variable #if deleted: @@ -308,7 +296,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': request}) return Empty() - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]: for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT): yield ConnectionEvent(**json.loads(message.content)) @@ -316,24 +304,24 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # ----- Policy ----------------------------------------------------------------------------------------------------- - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListPolicyRuleIds(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleIdList: return policyrule_list_ids(self.db_engine) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListPolicyRules(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleList: return policyrule_list_objs(self.db_engine) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetPolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule: return policyrule_get(self.db_engine, request) - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetPolicyRule(self, request : PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId: policyrule_id,updated = policyrule_set(self.db_engine, request) # pylint: disable=unused-variable return policyrule_id - @safe_and_metered_rpc_method(METRICS, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemovePolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> Empty: deleted = policyrule_delete(self.db_engine, request) # pylint: disable=unused-variable return Empty()