Commit 4c06c9f5 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Dlt Connector component:

- Converted DltEventsCollector into a thread to enable auto-reconnection to gateway
- Corrected removal of slice endpoints while updating remote slices on local domain
parent feeab8ca
Loading
Loading
Loading
Loading
+26 −22
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging, queue, threading
import grpc, logging, queue, threading, time
from common.proto.dlt_gateway_pb2 import DltRecordSubscription
from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.client.DltGatewayClient import DltGatewayClient
@@ -20,33 +20,37 @@ from dlt.connector.client.DltGatewayClient import DltGatewayClient
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

class DltEventsCollector:
class DltEventsCollector(threading.Thread):
    def __init__(
        self, dltgateway_client : DltGatewayClient,
        log_events_received     : bool = False,
    ) -> None:
        self._events_queue = queue.Queue()
        super().__init__(name='DltEventsCollector', daemon=True)
        self._dltgateway_client = dltgateway_client
        self._log_events_received = log_events_received
        subscription = DltRecordSubscription() # bu default subscribe to all
        self._dltgateway_stream = dltgateway_client.SubscribeToDlt(subscription)
        self._dltgateway_thread = self._create_collector_thread(self._dltgateway_stream)

    def _create_collector_thread(self, stream, as_daemon : bool = False):
        return threading.Thread(target=self._collect, args=(stream,), daemon=as_daemon)
        self._events_queue = queue.Queue()
        self._terminate = threading.Event()
        self._dltgateway_stream = None

    def _collect(self, events_stream) -> None:
    def run(self) -> None:
        self._terminate.clear()
        while not self._terminate.is_set():
            try:
            for event in events_stream:
                subscription = DltRecordSubscription() # bu default subscribe to all
                self._dltgateway_stream = self._dltgateway_client.SubscribeToDlt(subscription)
                for event in self._dltgateway_stream:
                    if self._log_events_received:
                        LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event)))
                    self._events_queue.put_nowait(event)
            except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member
                    time.sleep(0.5)
                    continue
                elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                    break
                else:
                    raise # pragma: no cover

    def start(self):
        if self._dltgateway_thread is not None: self._dltgateway_thread.start()

    def get_event(self, block : bool = True, timeout : float = 0.1):
        try:
            return self._events_queue.get(block=block, timeout=timeout)
@@ -68,5 +72,5 @@ class DltEventsCollector:
        return sorted(events, key=lambda e: e.event.timestamp.timestamp)

    def stop(self):
        self._terminate.set()
        if self._dltgateway_stream is not None: self._dltgateway_stream.cancel()
        if self._dltgateway_thread is not None: self._dltgateway_thread.join()
+2 −2
Original line number Diff line number Diff line
@@ -199,8 +199,8 @@ class DltEventDispatcher(threading.Thread):
            local_slice.CopyFrom(slice_)

            # pylint: disable=no-member
            local_slice.slice_service_ids.Clear()   # they are from remote domains so will not be present locally
            local_slice.slice_subslice_ids.Clear()  # they are from remote domains so will not be present locally
            del local_slice.slice_service_ids[:]    # they are from remote domains so will not be present locally
            del local_slice.slice_subslice_ids[:]   # they are from remote domains so will not be present locally

            clients.context_client.SetSlice(local_slice)
        else: