diff --git a/src/dlt/connector/client/DltEventsCollector.py b/src/dlt/connector/client/DltEventsCollector.py index 6fe2474cead37094c507a8a612181dc7f7243544..d022ac0f0144eecfcdb706665a8bde81fa54492f 100644 --- a/src/dlt/connector/client/DltEventsCollector.py +++ b/src/dlt/connector/client/DltEventsCollector.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging, queue, threading +import grpc, logging, queue, threading, time from common.proto.dlt_gateway_pb2 import DltRecordSubscription from common.tools.grpc.Tools import grpc_message_to_json_string from dlt.connector.client.DltGatewayClient import DltGatewayClient @@ -20,32 +20,36 @@ from dlt.connector.client.DltGatewayClient import DltGatewayClient LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) -class DltEventsCollector: +class DltEventsCollector(threading.Thread): def __init__( self, dltgateway_client : DltGatewayClient, log_events_received : bool = False, ) -> None: - self._events_queue = queue.Queue() + super().__init__(name='DltEventsCollector', daemon=True) + self._dltgateway_client = dltgateway_client self._log_events_received = log_events_received - subscription = DltRecordSubscription() # bu default subscribe to all - self._dltgateway_stream = dltgateway_client.SubscribeToDlt(subscription) - self._dltgateway_thread = self._create_collector_thread(self._dltgateway_stream) - - def _create_collector_thread(self, stream, as_daemon : bool = False): - return threading.Thread(target=self._collect, args=(stream,), daemon=as_daemon) - - def _collect(self, events_stream) -> None: - try: - for event in events_stream: - if self._log_events_received: - LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event))) - self._events_queue.put_nowait(event) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member - raise # pragma: no cover + self._events_queue = queue.Queue() + self._terminate = threading.Event() + self._dltgateway_stream = None - def start(self): - if self._dltgateway_thread is not None: self._dltgateway_thread.start() + def run(self) -> None: + self._terminate.clear() + while not self._terminate.is_set(): + try: + subscription = DltRecordSubscription() # bu default subscribe to all + self._dltgateway_stream = self._dltgateway_client.SubscribeToDlt(subscription) + for event in self._dltgateway_stream: + if self._log_events_received: + LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event))) + self._events_queue.put_nowait(event) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member + time.sleep(0.5) + continue + elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member + break + else: + raise # pragma: no cover def get_event(self, block : bool = True, timeout : float = 0.1): try: @@ -68,5 +72,5 @@ class DltEventsCollector: return sorted(events, key=lambda e: e.event.timestamp.timestamp) def stop(self): + self._terminate.set() if self._dltgateway_stream is not None: self._dltgateway_stream.cancel() - if self._dltgateway_thread is not None: self._dltgateway_thread.join() diff --git a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py index 01a8695904157269eaa9718dac60fd29ccac05c4..8973ae621c1291f8ed6e2673f0c64b59712143ee 100644 --- a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py +++ b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py @@ -199,8 +199,8 @@ class DltEventDispatcher(threading.Thread): local_slice.CopyFrom(slice_) # pylint: disable=no-member - local_slice.slice_service_ids.Clear() # they are from remote domains so will not be present locally - local_slice.slice_subslice_ids.Clear() # they are from remote domains so will not be present locally + del local_slice.slice_service_ids[:] # they are from remote domains so will not be present locally + del local_slice.slice_subslice_ids[:] # they are from remote domains so will not be present locally clients.context_client.SetSlice(local_slice) else: