Commit 90862915 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Context component:

- Improved EventsCollector to use priority queue and sort batches of events by timestamp
parent 7271b2be
Loading
Loading
Loading
Loading
+7 −5
Original line number Diff line number Diff line
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable
import grpc, logging, queue, threading, time
from typing import Callable
from common.proto.context_pb2 import Empty
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
@@ -23,7 +23,7 @@ LOGGER.setLevel(logging.DEBUG)

class _Collector(threading.Thread):
    def __init__(
        self, subscription_func : Callable, events_queue = queue.Queue,
        self, subscription_func : Callable, events_queue = queue.PriorityQueue,
        terminate = threading.Event, log_events_received: bool = False
    ) -> None:
        super().__init__(daemon=False)
@@ -45,7 +45,8 @@ class _Collector(threading.Thread):
                    if self._log_events_received:
                        str_event = grpc_message_to_json_string(event)
                        LOGGER.info('[_collect] event: {:s}'.format(str_event))
                    self._events_queue.put_nowait(event)
                    timestamp = event.event.timestamp.timestamp
                    self._events_queue.put_nowait((timestamp, event))
            except grpc.RpcError as e:
                if e.code() == grpc.StatusCode.UNAVAILABLE:
                    LOGGER.info('[_collect] UNAVAILABLE... retrying...')
@@ -68,7 +69,7 @@ class EventsCollector:
        activate_slice_collector      : bool = True,
        activate_connection_collector : bool = True,
    ) -> None:
        self._events_queue = queue.Queue()
        self._events_queue = queue.PriorityQueue()
        self._terminate = threading.Event()
        self._log_events_received = log_events_received

@@ -120,7 +121,8 @@ class EventsCollector:

    def get_event(self, block : bool = True, timeout : float = 0.1):
        try:
            return self._events_queue.get(block=block, timeout=timeout)
            _,event = self._events_queue.get(block=block, timeout=timeout)
            return event
        except queue.Empty: # pylint: disable=catching-non-exception
            return None