Loading src/common/method_wrappers/Decorator.py +0 −14 Original line number Diff line number Diff line Loading @@ -12,20 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import grpc, json, logging, threading from enum import Enum from prettytable import PrettyTable Loading src/dlt/connector/client/DltConnectorClient.py +26 −44 Original line number Diff line number Diff line Loading @@ -12,12 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # DltConnectorClient.py import grpc import logging import asyncio import grpc, logging from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc from common.proto.context_pb2 import Empty, TopologyId Loading @@ -40,90 +35,77 @@ class DltConnectorClient: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None #self.connect() #LOGGER.debug('Channel created') self.connect() LOGGER.debug('Channel created') async def connect(self): self.channel = grpc.aio.insecure_channel(self.endpoint) def connect(self): self.channel = grpc.insecure_channel(self.endpoint) self.stub = DltConnectorServiceStub(self.channel) LOGGER.debug('Channel created') async def close(self): if self.channel is not None: await self.channel.close() def close(self): if self.channel is not None: self.channel.close() self.channel = None self.stub = None @RETRY_DECORATOR async def RecordAll(self, request: TopologyId) -> Empty: def RecordAll(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAll(request) response = self.stub.RecordAll(request) LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordAllDevices(self, request: TopologyId) -> Empty: def RecordAllDevices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAllDevices(request) response = self.stub.RecordAllDevices(request) LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR # async def RecordDevice(self, request: DltDeviceId) -> Empty: # LOGGER.debug('RECORD_DEVICE request received: {:s}'.format(grpc_message_to_json_string(request))) # Simulate some asynchronous processing delay # await asyncio.sleep(2) # Simulates processing time # Create a dummy response (Empty message) # response = Empty() # LOGGER.debug('RECORD_DEVICE processing complete for request: {:s}'.format(grpc_message_to_json_string(request))) # return response async def RecordDevice(self, request: DltDeviceId) -> Empty: LOGGER.debug('RECORD_DEVICE request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordDevice(request) def RecordDevice(self, request : DltDeviceId) -> Empty: LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordDevice(request) LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordAllLinks(self, request: TopologyId) -> Empty: def RecordAllLinks(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAllLinks(request) response = self.stub.RecordAllLinks(request) LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordLink(self, request: DltLinkId) -> Empty: def RecordLink(self, request : DltLinkId) -> Empty: LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordLink(request) response = self.stub.RecordLink(request) LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordAllServices(self, request: TopologyId) -> Empty: def RecordAllServices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAllServices(request) response = self.stub.RecordAllServices(request) LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordService(self, request: DltServiceId) -> Empty: def RecordService(self, request : DltServiceId) -> Empty: LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordService(request) response = self.stub.RecordService(request) LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordAllSlices(self, request: TopologyId) -> Empty: def RecordAllSlices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAllSlices(request) response = self.stub.RecordAllSlices(request) LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordSlice(self, request: DltSliceId) -> Empty: def RecordSlice(self, request : DltSliceId) -> Empty: LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordSlice(request) response = self.stub.RecordSlice(request) LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response src/dlt/connector/client/DltConnectorClientSync.py→src/dlt/connector/client/DltConnectorClientAsync.py +30 −27 Original line number Diff line number Diff line Loading @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import grpc, logging import grpc, logging, asyncio from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc from common.proto.context_pb2 import Empty, TopologyId Loading @@ -27,7 +28,7 @@ MAX_RETRIES = 15 DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class DltConnectorClientSync: class DltConnectorClientAsync: def __init__(self, host=None, port=None): if not host: host = get_service_host(ServiceNameEnum.DLT) if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) Loading @@ -35,77 +36,79 @@ class DltConnectorClientSync: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None self.connect() LOGGER.debug('Channel created') #self.connect() #LOGGER.debug('Channel created') def connect(self): self.channel = grpc.insecure_channel(self.endpoint) async def connect(self): self.channel = grpc.aio.insecure_channel(self.endpoint) self.stub = DltConnectorServiceStub(self.channel) LOGGER.debug('Channel created') def close(self): if self.channel is not None: self.channel.close() async def close(self): if self.channel is not None: await self.channel.close() self.channel = None self.stub = None @RETRY_DECORATOR def RecordAll(self, request : TopologyId) -> Empty: async def RecordAll(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAll(request) response = await self.stub.RecordAll(request) LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordAllDevices(self, request : TopologyId) -> Empty: async def RecordAllDevices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAllDevices(request) response = await self.stub.RecordAllDevices(request) LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordDevice(self, request : DltDeviceId) -> Empty: LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordDevice(request) async def RecordDevice(self, request: DltDeviceId) -> Empty: LOGGER.debug('RECORD_DEVICE request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordDevice(request) LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordAllLinks(self, request : TopologyId) -> Empty: async def RecordAllLinks(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAllLinks(request) response = await self.stub.RecordAllLinks(request) LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordLink(self, request : DltLinkId) -> Empty: async def RecordLink(self, request: DltLinkId) -> Empty: LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordLink(request) response = await self.stub.RecordLink(request) LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordAllServices(self, request : TopologyId) -> Empty: async def RecordAllServices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAllServices(request) response = await self.stub.RecordAllServices(request) LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordService(self, request : DltServiceId) -> Empty: async def RecordService(self, request: DltServiceId) -> Empty: LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordService(request) response = await self.stub.RecordService(request) LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordAllSlices(self, request : TopologyId) -> Empty: async def RecordAllSlices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAllSlices(request) response = await self.stub.RecordAllSlices(request) LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordSlice(self, request : DltSliceId) -> Empty: async def RecordSlice(self, request: DltSliceId) -> Empty: LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordSlice(request) response = await self.stub.RecordSlice(request) LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response src/dlt/connector/client/DltEventsCollector.py +2 −2 Original line number Diff line number Diff line Loading @@ -16,7 +16,7 @@ from typing import Callable, Optional import grpc, logging, queue, threading, time from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription from common.tools.grpc.Tools import grpc_message_to_json_string from dlt.connector.client.DltGatewayClientEvent import DltGatewayClientEvent from src.dlt.connector.client.DltGatewayClient import DltGatewayClient LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) Loading @@ -31,7 +31,7 @@ LOGGER.setLevel(logging.DEBUG) class DltEventsCollector(threading.Thread): def __init__( self, dltgateway_client : DltGatewayClientEvent, self, dltgateway_client : DltGatewayClient, log_events_received : bool = False, event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, ) -> None: Loading src/dlt/connector/client/DltGatewayClient.py +10 −63 Original line number Diff line number Diff line Loading @@ -13,14 +13,9 @@ # limitations under the License. import grpc import logging import asyncio from typing import Iterator, List from common.proto.context_pb2 import Empty, TeraFlowController from common.proto.dlt_gateway_pb2 import ( DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription) import grpc, logging from typing import Iterator from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string Loading @@ -40,70 +35,22 @@ class DltGatewayClient: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None #self.connect() self.message_queue: List[DltRecord] = [] #LOGGER.debug('Channel created') self.connect() LOGGER.debug('Channel created') async def connect(self): self.channel = grpc.aio.insecure_channel(self.endpoint) def connect(self): self.channel = grpc.insecure_channel(self.endpoint) self.stub = DltGatewayServiceStub(self.channel) LOGGER.debug('Channel created') async def close(self): def close(self): if self.channel is not None: await self.channel.close() self.channel.close() self.channel = None self.stub = None # async def dummy_process(self, request: DltRecord): # # Simulate processing delay # await asyncio.sleep(2) # return DltRecordStatus(status="DLTRECORDSTATUS_SUCCEEDED", record_id=request.record_id) @RETRY_DECORATOR async def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordToDlt(request) LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response # @RETRY_DECORATOR # async def RecordToDlt(self, request: DltRecord) -> DltRecordStatus: # self.message_queue.append(request) # LOGGER.debug(f'RecordToDlt request: {grpc_message_to_json_string(request)}') # LOGGER.debug(f'Queue length before processing: {len(self.message_queue)}') # response = await self.dummy_process(request) # LOGGER.debug(f'RecordToDlt result: {grpc_message_to_json_string(response)}') # self.message_queue.remove(request) # LOGGER.debug(f'Queue length after processing: {len(self.message_queue)}') # return response @RETRY_DECORATOR async def GetFromDlt(self, request : DltRecordId) -> DltRecord: LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.GetFromDlt(request) LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SubscribeToDlt(self, request: DltRecordSubscription) -> Iterator[DltRecordEvent]: LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SubscribeToDlt(request) LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.GetDltStatus(request) LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def GetDltPeers(self, request : Empty) -> DltPeerStatusList: LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.GetDltPeers(request) LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) return response Loading
src/common/method_wrappers/Decorator.py +0 −14 Original line number Diff line number Diff line Loading @@ -12,20 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import grpc, json, logging, threading from enum import Enum from prettytable import PrettyTable Loading
src/dlt/connector/client/DltConnectorClient.py +26 −44 Original line number Diff line number Diff line Loading @@ -12,12 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # DltConnectorClient.py import grpc import logging import asyncio import grpc, logging from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc from common.proto.context_pb2 import Empty, TopologyId Loading @@ -40,90 +35,77 @@ class DltConnectorClient: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None #self.connect() #LOGGER.debug('Channel created') self.connect() LOGGER.debug('Channel created') async def connect(self): self.channel = grpc.aio.insecure_channel(self.endpoint) def connect(self): self.channel = grpc.insecure_channel(self.endpoint) self.stub = DltConnectorServiceStub(self.channel) LOGGER.debug('Channel created') async def close(self): if self.channel is not None: await self.channel.close() def close(self): if self.channel is not None: self.channel.close() self.channel = None self.stub = None @RETRY_DECORATOR async def RecordAll(self, request: TopologyId) -> Empty: def RecordAll(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAll(request) response = self.stub.RecordAll(request) LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordAllDevices(self, request: TopologyId) -> Empty: def RecordAllDevices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAllDevices(request) response = self.stub.RecordAllDevices(request) LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR # async def RecordDevice(self, request: DltDeviceId) -> Empty: # LOGGER.debug('RECORD_DEVICE request received: {:s}'.format(grpc_message_to_json_string(request))) # Simulate some asynchronous processing delay # await asyncio.sleep(2) # Simulates processing time # Create a dummy response (Empty message) # response = Empty() # LOGGER.debug('RECORD_DEVICE processing complete for request: {:s}'.format(grpc_message_to_json_string(request))) # return response async def RecordDevice(self, request: DltDeviceId) -> Empty: LOGGER.debug('RECORD_DEVICE request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordDevice(request) def RecordDevice(self, request : DltDeviceId) -> Empty: LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordDevice(request) LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordAllLinks(self, request: TopologyId) -> Empty: def RecordAllLinks(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAllLinks(request) response = self.stub.RecordAllLinks(request) LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordLink(self, request: DltLinkId) -> Empty: def RecordLink(self, request : DltLinkId) -> Empty: LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordLink(request) response = self.stub.RecordLink(request) LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordAllServices(self, request: TopologyId) -> Empty: def RecordAllServices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAllServices(request) response = self.stub.RecordAllServices(request) LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordService(self, request: DltServiceId) -> Empty: def RecordService(self, request : DltServiceId) -> Empty: LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordService(request) response = self.stub.RecordService(request) LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordAllSlices(self, request: TopologyId) -> Empty: def RecordAllSlices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordAllSlices(request) response = self.stub.RecordAllSlices(request) LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def RecordSlice(self, request: DltSliceId) -> Empty: def RecordSlice(self, request : DltSliceId) -> Empty: LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordSlice(request) response = self.stub.RecordSlice(request) LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response
src/dlt/connector/client/DltConnectorClientSync.py→src/dlt/connector/client/DltConnectorClientAsync.py +30 −27 Original line number Diff line number Diff line Loading @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import grpc, logging import grpc, logging, asyncio from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc from common.proto.context_pb2 import Empty, TopologyId Loading @@ -27,7 +28,7 @@ MAX_RETRIES = 15 DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class DltConnectorClientSync: class DltConnectorClientAsync: def __init__(self, host=None, port=None): if not host: host = get_service_host(ServiceNameEnum.DLT) if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) Loading @@ -35,77 +36,79 @@ class DltConnectorClientSync: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None self.connect() LOGGER.debug('Channel created') #self.connect() #LOGGER.debug('Channel created') def connect(self): self.channel = grpc.insecure_channel(self.endpoint) async def connect(self): self.channel = grpc.aio.insecure_channel(self.endpoint) self.stub = DltConnectorServiceStub(self.channel) LOGGER.debug('Channel created') def close(self): if self.channel is not None: self.channel.close() async def close(self): if self.channel is not None: await self.channel.close() self.channel = None self.stub = None @RETRY_DECORATOR def RecordAll(self, request : TopologyId) -> Empty: async def RecordAll(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAll(request) response = await self.stub.RecordAll(request) LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordAllDevices(self, request : TopologyId) -> Empty: async def RecordAllDevices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAllDevices(request) response = await self.stub.RecordAllDevices(request) LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordDevice(self, request : DltDeviceId) -> Empty: LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordDevice(request) async def RecordDevice(self, request: DltDeviceId) -> Empty: LOGGER.debug('RECORD_DEVICE request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordDevice(request) LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordAllLinks(self, request : TopologyId) -> Empty: async def RecordAllLinks(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAllLinks(request) response = await self.stub.RecordAllLinks(request) LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordLink(self, request : DltLinkId) -> Empty: async def RecordLink(self, request: DltLinkId) -> Empty: LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordLink(request) response = await self.stub.RecordLink(request) LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordAllServices(self, request : TopologyId) -> Empty: async def RecordAllServices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAllServices(request) response = await self.stub.RecordAllServices(request) LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordService(self, request : DltServiceId) -> Empty: async def RecordService(self, request: DltServiceId) -> Empty: LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordService(request) response = await self.stub.RecordService(request) LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordAllSlices(self, request : TopologyId) -> Empty: async def RecordAllSlices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordAllSlices(request) response = await self.stub.RecordAllSlices(request) LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RecordSlice(self, request : DltSliceId) -> Empty: async def RecordSlice(self, request: DltSliceId) -> Empty: LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RecordSlice(request) response = await self.stub.RecordSlice(request) LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response
src/dlt/connector/client/DltEventsCollector.py +2 −2 Original line number Diff line number Diff line Loading @@ -16,7 +16,7 @@ from typing import Callable, Optional import grpc, logging, queue, threading, time from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription from common.tools.grpc.Tools import grpc_message_to_json_string from dlt.connector.client.DltGatewayClientEvent import DltGatewayClientEvent from src.dlt.connector.client.DltGatewayClient import DltGatewayClient LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) Loading @@ -31,7 +31,7 @@ LOGGER.setLevel(logging.DEBUG) class DltEventsCollector(threading.Thread): def __init__( self, dltgateway_client : DltGatewayClientEvent, self, dltgateway_client : DltGatewayClient, log_events_received : bool = False, event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, ) -> None: Loading
src/dlt/connector/client/DltGatewayClient.py +10 −63 Original line number Diff line number Diff line Loading @@ -13,14 +13,9 @@ # limitations under the License. import grpc import logging import asyncio from typing import Iterator, List from common.proto.context_pb2 import Empty, TeraFlowController from common.proto.dlt_gateway_pb2 import ( DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription) import grpc, logging from typing import Iterator from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string Loading @@ -40,70 +35,22 @@ class DltGatewayClient: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None #self.connect() self.message_queue: List[DltRecord] = [] #LOGGER.debug('Channel created') self.connect() LOGGER.debug('Channel created') async def connect(self): self.channel = grpc.aio.insecure_channel(self.endpoint) def connect(self): self.channel = grpc.insecure_channel(self.endpoint) self.stub = DltGatewayServiceStub(self.channel) LOGGER.debug('Channel created') async def close(self): def close(self): if self.channel is not None: await self.channel.close() self.channel.close() self.channel = None self.stub = None # async def dummy_process(self, request: DltRecord): # # Simulate processing delay # await asyncio.sleep(2) # return DltRecordStatus(status="DLTRECORDSTATUS_SUCCEEDED", record_id=request.record_id) @RETRY_DECORATOR async def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.RecordToDlt(request) LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response # @RETRY_DECORATOR # async def RecordToDlt(self, request: DltRecord) -> DltRecordStatus: # self.message_queue.append(request) # LOGGER.debug(f'RecordToDlt request: {grpc_message_to_json_string(request)}') # LOGGER.debug(f'Queue length before processing: {len(self.message_queue)}') # response = await self.dummy_process(request) # LOGGER.debug(f'RecordToDlt result: {grpc_message_to_json_string(response)}') # self.message_queue.remove(request) # LOGGER.debug(f'Queue length after processing: {len(self.message_queue)}') # return response @RETRY_DECORATOR async def GetFromDlt(self, request : DltRecordId) -> DltRecord: LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.GetFromDlt(request) LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SubscribeToDlt(self, request: DltRecordSubscription) -> Iterator[DltRecordEvent]: LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SubscribeToDlt(request) LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.GetDltStatus(request) LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR async def GetDltPeers(self, request : Empty) -> DltPeerStatusList: LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) response = await self.stub.GetDltPeers(request) LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) return response