import logging
import queue
import threading
from typing import Optional

import grpc

from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from context.proto.context_pb2 import Empty

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


class EventsCollector:
    """Gather events from six Context gRPC event streams into a single queue.

    On construction, one event stream is opened per entity kind (context,
    topology, device, link, service, connection) and one collector thread is
    prepared per stream. The threads are only launched by ``start()``; each
    one pushes every event it receives into a shared ``queue.Queue`` that is
    consumed through ``get_event()`` / ``get_events()``. ``stop()`` cancels
    the streams and waits for the threads.

    The collector threads are created with ``daemon=False``.
    """

    def __init__(
        self, context_client_grpc: ContextClient, log_events_received: bool = False
    ) -> None:
        """Open the event streams and prepare (but do not start) the threads.

        Args:
            context_client_grpc: client used to open the six event streams.
            log_events_received: when true, every received event is logged at
                INFO level before being queued.
        """
        self._events_queue = queue.Queue()
        self._log_events_received = log_events_received

        # All streams are opened before any thread object is created.
        self._context_stream = context_client_grpc.GetContextEvents(Empty())
        self._topology_stream = context_client_grpc.GetTopologyEvents(Empty())
        self._device_stream = context_client_grpc.GetDeviceEvents(Empty())
        self._link_stream = context_client_grpc.GetLinkEvents(Empty())
        self._service_stream = context_client_grpc.GetServiceEvents(Empty())
        self._connection_stream = context_client_grpc.GetConnectionEvents(Empty())

        self._context_thread = self._new_collector_thread(self._context_stream)
        self._topology_thread = self._new_collector_thread(self._topology_stream)
        self._device_thread = self._new_collector_thread(self._device_stream)
        self._link_thread = self._new_collector_thread(self._link_stream)
        self._service_thread = self._new_collector_thread(self._service_stream)
        self._connection_thread = self._new_collector_thread(self._connection_stream)

    def _new_collector_thread(self, events_stream) -> threading.Thread:
        """Build a non-daemon thread that runs ``_collect`` on *events_stream*."""
        return threading.Thread(
            target=self._collect, args=(events_stream,), daemon=False
        )

    def _all_streams(self) -> tuple:
        """Return the six event streams in their fixed processing order."""
        return (
            self._context_stream,
            self._topology_stream,
            self._device_stream,
            self._link_stream,
            self._service_stream,
            self._connection_stream,
        )

    def _all_threads(self) -> tuple:
        """Return the six collector threads in their fixed processing order."""
        return (
            self._context_thread,
            self._topology_thread,
            self._device_thread,
            self._link_thread,
            self._service_thread,
            self._connection_thread,
        )

    def _collect(self, events_stream) -> None:
        """Push every event read from *events_stream* into the shared queue.

        Runs as the target of a collector thread. A ``grpc.RpcError`` whose
        status code is ``CANCELLED`` ends the loop silently; any other
        ``grpc.RpcError`` is re-raised.
        """
        try:
            for event in events_stream:
                if self._log_events_received:
                    # Lazy %-style arguments: same message text, formatted by
                    # the logging machinery.
                    LOGGER.info(
                        '[_collect] event: %s', grpc_message_to_json_string(event)
                    )
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED:  # pylint: disable=no-member
                raise  # pragma: no cover

    def start(self) -> None:
        """Start the six collector threads."""
        for thread in self._all_threads():
            thread.start()

    def get_event(self, block: bool = True, timeout: float = 0.1):
        """Return the next queued event, or ``None`` when none is available.

        Args:
            block: forwarded to ``queue.Queue.get``.
            timeout: forwarded to ``queue.Queue.get``.

        Returns:
            The event taken from the queue, or ``None`` if the queue raised
            ``queue.Empty``.
        """
        try:
            return self._events_queue.get(block=block, timeout=timeout)
        except queue.Empty:  # pylint: disable=catching-non-exception
            return None

    def get_events(
        self, block: bool = True, timeout: float = 0.1, count: Optional[int] = None
    ):
        """Fetch several events and return them sorted by ``event.timestamp``.

        Args:
            block: forwarded to ``get_event`` for every fetch.
            timeout: forwarded to ``get_event`` for every fetch.
            count: when ``None``, events are fetched until one fetch yields
                nothing. Otherwise exactly ``count`` fetch attempts are made
                and attempts that yield nothing are skipped, so fewer than
                ``count`` events may be returned.

        Returns:
            A list of the fetched events ordered by ``e.event.timestamp``.
        """
        events = []
        if count is None:
            while True:
                event = self.get_event(block=block, timeout=timeout)
                if event is None:
                    break
                events.append(event)
        else:
            for _ in range(count):
                event = self.get_event(block=block, timeout=timeout)
                if event is not None:
                    events.append(event)
        return sorted(events, key=lambda e: e.event.timestamp)

    def stop(self) -> None:
        """Cancel every event stream, then wait for running collector threads.

        Safe to call even if ``start()`` was never called or only some threads
        were started: ``Thread.join()`` raises ``RuntimeError`` on a thread
        that has not been started, so only live threads are joined.
        """
        for stream in self._all_streams():
            stream.cancel()

        for thread in self._all_threads():
            if thread.is_alive():
                thread.join()