diff --git a/src/dlt/connector/client/DltGatewayClient.py b/src/dlt/connector/client/DltGatewayClient.py index 5e03283802f9df77c06922f2b3e6d5e82b1d63ec..8e2303b2c69dba1b5a313538ca28518ab1434c91 100644 --- a/src/dlt/connector/client/DltGatewayClient.py +++ b/src/dlt/connector/client/DltGatewayClient.py @@ -15,7 +15,10 @@ import grpc, logging from typing import Iterator -from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription +from common.proto.context_pb2 import Empty, TeraFlowController +from common.proto.dlt_gateway_pb2 import ( + DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription +) from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string @@ -47,9 +50,37 @@ class DltGatewayClient: self.channel = None self.stub = None + @RETRY_DECORATOR + def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: + LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordToDlt(request) + LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetFromDlt(self, request : DltRecordId) -> DltRecord: + LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetFromDlt(request) + LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response + @RETRY_DECORATOR def SubscribeToDlt(self, request: DltRecordSubscription) -> Iterator[DltRecordEvent]: LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SubscribeToDlt(request) LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response + + @RETRY_DECORATOR + def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: + LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetDltStatus(request) + LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetDltPeers(self, request : Empty) -> DltPeerStatusList: + LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetDltPeers(request) + LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/dlt/connector/client/DltGatewayClientAsync.py b/src/dlt/connector/client/DltGatewayClientAsync.py index 3f1cf5396eea0eea63930969be54eaeb4afc7e89..816241ec587295ffd6a8ef24cca5c66942a849d5 100644 --- a/src/dlt/connector/client/DltGatewayClientAsync.py +++ b/src/dlt/connector/client/DltGatewayClientAsync.py @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging, asyncio +import asyncio, grpc, logging from typing import Iterator, List from common.proto.context_pb2 import Empty, TeraFlowController from common.proto.dlt_gateway_pb2 import ( - DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription) + DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription +) from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string @@ -35,9 +36,7 @@ class DltGatewayClientAsync: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None - #self.connect() self.message_queue: List[DltRecord] = [] - #LOGGER.debug('Channel created') async def connect(self): self.channel = grpc.aio.insecure_channel(self.endpoint) @@ -56,7 +55,7 @@ class DltGatewayClientAsync: response = await self.stub.RecordToDlt(request) LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response - + @RETRY_DECORATOR async def GetFromDlt(self, request : DltRecordId) -> DltRecord: LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request)))