Commit 614c625f authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Pre-merge code cleanup

parent c43486a2
Loading
Loading
Loading
Loading
+32 −1
Original line number Diff line number Diff line
@@ -15,7 +15,10 @@

import grpc, logging
from typing import Iterator
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription
from common.proto.context_pb2 import Empty, TeraFlowController
from common.proto.dlt_gateway_pb2 import (
    DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription
)
from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
@@ -47,9 +50,37 @@ class DltGatewayClient:
        self.channel = None
        self.stub = None

    @RETRY_DECORATOR
    def RecordToDlt(self, request : DltRecord) -> DltRecordStatus:
        LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.RecordToDlt(request)
        LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetFromDlt(self, request : DltRecordId) -> DltRecord:
        LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetFromDlt(request)
        LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def SubscribeToDlt(self, request: DltRecordSubscription) -> Iterator[DltRecordEvent]:
        LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.SubscribeToDlt(request)
        LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus:
        LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetDltStatus(request)
        LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetDltPeers(self, request : Empty) -> DltPeerStatusList:
        LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetDltPeers(request)
        LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response)))
        return response
+4 −5
Original line number Diff line number Diff line
@@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging, asyncio
import asyncio, grpc, logging
from typing import Iterator, List
from common.proto.context_pb2 import Empty, TeraFlowController
from common.proto.dlt_gateway_pb2 import (
    DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription)
    DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription
)
from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
@@ -35,9 +36,7 @@ class DltGatewayClientAsync:
        LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
        self.channel = None
        self.stub = None
        #self.connect()
        self.message_queue: List[DltRecord] = []
        #LOGGER.debug('Channel created')

    async def connect(self):
        self.channel = grpc.aio.insecure_channel(self.endpoint)