Commit 882bf9f3 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common - Tools - gRPC:

- Fixed reconnection to event streams when connection fails before returning a stream
- Added unit test
parent 4de94f6e
Loading
Loading
Loading
Loading
+58 −0
Original line number Diff line number Diff line
import grpc
import common.tools.grpc.BaseEventCollector as base_event_collector_module
from common.proto.context_pb2 import DeviceEvent, EventTypeEnum


class _FakeRpcError(grpc.RpcError):
    def __init__(self, status_code):
        self._status_code = status_code

    def code(self):
        return self._status_code


class _FakeStream:
    def __init__(self, events=None):
        self._events = iter(events or [])

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._events)

    def cancel(self):
        pass


def _create_device_event():
    event = DeviceEvent()
    event.event.event_type = EventTypeEnum.EVENTTYPE_CREATE
    event.event.timestamp.timestamp = 1.0
    event.device_id.device_uuid.uuid = 'dev1'
    return event


def test_base_event_collector_retries_if_subscription_creation_fails(monkeypatch):
    monkeypatch.setattr(base_event_collector_module.time, 'sleep', lambda _seconds: None)

    state = {'calls': 0}

    def subscription_method(_request):
        state['calls'] += 1
        if state['calls'] == 1:
            raise _FakeRpcError(grpc.StatusCode.UNAVAILABLE)
        if state['calls'] == 2:
            return _FakeStream(events=[_create_device_event()])
        raise _FakeRpcError(grpc.StatusCode.CANCELLED)

    collector = base_event_collector_module.BaseEventCollector()
    collector.install_collector(subscription_method, object())

    collector.start()
    try:
        event = collector.get_event(block=True, timeout=1.0)
    finally:
        collector.stop()

    assert event.device_id.device_uuid.uuid == 'dev1'
+1 −1
Original line number Diff line number Diff line
@@ -41,8 +41,8 @@ class CollectorThread(threading.Thread):

    def run(self) -> None:
        while not self._terminate.is_set():
            self._stream = self._subscription_func()
            try:
                self._stream = self._subscription_func()
                for event in self._stream:
                    if self._log_events_received:
                        str_event = grpc_message_to_json_string(event)