From 4c06c9f591b1096250dc56ed4c3b967d50e77a6d Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Thu, 10 Nov 2022 20:31:36 +0000
Subject: [PATCH] Dlt Connector component:

- Converted DltEventsCollector into a thread to enable auto-reconnection to gateway
- Corrected removal of slice endpoints while updating remote slices on local domain
---
 .../connector/client/DltEventsCollector.py    | 48 ++++++++++---------
 .../event_dispatcher/DltEventDispatcher.py    |  4 +-
 2 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/src/dlt/connector/client/DltEventsCollector.py b/src/dlt/connector/client/DltEventsCollector.py
index 6fe2474ce..d022ac0f0 100644
--- a/src/dlt/connector/client/DltEventsCollector.py
+++ b/src/dlt/connector/client/DltEventsCollector.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import grpc, logging, queue, threading
+import grpc, logging, queue, threading, time
 from common.proto.dlt_gateway_pb2 import DltRecordSubscription
 from common.tools.grpc.Tools import grpc_message_to_json_string
 from dlt.connector.client.DltGatewayClient import DltGatewayClient
@@ -20,32 +20,36 @@ from dlt.connector.client.DltGatewayClient import DltGatewayClient
 LOGGER = logging.getLogger(__name__)
 LOGGER.setLevel(logging.DEBUG)
 
-class DltEventsCollector:
+class DltEventsCollector(threading.Thread):
     def __init__(
         self, dltgateway_client : DltGatewayClient,
         log_events_received     : bool = False,
     ) -> None:
-        self._events_queue = queue.Queue()
+        super().__init__(name='DltEventsCollector', daemon=True)
+        self._dltgateway_client = dltgateway_client
         self._log_events_received = log_events_received
-        subscription = DltRecordSubscription() # bu default subscribe to all
-        self._dltgateway_stream = dltgateway_client.SubscribeToDlt(subscription)
-        self._dltgateway_thread = self._create_collector_thread(self._dltgateway_stream)
-
-    def _create_collector_thread(self, stream, as_daemon : bool = False):
-        return threading.Thread(target=self._collect, args=(stream,), daemon=as_daemon)
-
-    def _collect(self, events_stream) -> None:
-        try:
-            for event in events_stream:
-                if self._log_events_received:
-                    LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event)))
-                self._events_queue.put_nowait(event)
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
-                raise # pragma: no cover
+        self._events_queue = queue.Queue()
+        self._terminate = threading.Event()
+        self._dltgateway_stream = None
 
-    def start(self):
-        if self._dltgateway_thread is not None: self._dltgateway_thread.start()
+    def run(self) -> None:
+        self._terminate.clear()
+        while not self._terminate.is_set():
+            try:
+                subscription = DltRecordSubscription() # bu default subscribe to all
+                self._dltgateway_stream = self._dltgateway_client.SubscribeToDlt(subscription)
+                for event in self._dltgateway_stream:
+                    if self._log_events_received:
+                        LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event)))
+                    self._events_queue.put_nowait(event)
+            except grpc.RpcError as e:
+                if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member
+                    time.sleep(0.5)
+                    continue
+                elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member
+                    break
+                else:
+                    raise # pragma: no cover
 
     def get_event(self, block : bool = True, timeout : float = 0.1):
         try:
@@ -68,5 +72,5 @@ class DltEventsCollector:
         return sorted(events, key=lambda e: e.event.timestamp.timestamp)
 
     def stop(self):
+        self._terminate.set()
         if self._dltgateway_stream is not None: self._dltgateway_stream.cancel()
-        if self._dltgateway_thread is not None: self._dltgateway_thread.join()
diff --git a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py
index 01a869590..8973ae621 100644
--- a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py
+++ b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py
@@ -199,8 +199,8 @@ class DltEventDispatcher(threading.Thread):
             local_slice.CopyFrom(slice_)
 
             # pylint: disable=no-member
-            local_slice.slice_service_ids.Clear()   # they are from remote domains so will not be present locally
-            local_slice.slice_subslice_ids.Clear()  # they are from remote domains so will not be present locally
+            del local_slice.slice_service_ids[:]    # they are from remote domains so will not be present locally
+            del local_slice.slice_subslice_ids[:]   # they are from remote domains so will not be present locally
 
             clients.context_client.SetSlice(local_slice)
         else:
-- 
GitLab