Loading src/common/tools/mutex_queues/MutexQueues.py +34 −5 Original line number Diff line number Diff line Loading @@ -34,10 +34,12 @@ # driver.configure(settings) # self.mutex_queues.signal_done(device_uuid) import threading import logging, threading from queue import Queue, Empty from typing import Dict LOGGER = logging.getLogger(__name__) class MutexQueues: def __init__(self) -> None: # lock to protect dictionary updates Loading @@ -47,35 +49,62 @@ class MutexQueues: # first mutex is the running one self.mutex_queues : Dict[str, Queue[threading.Event]] = dict() def add_alias(self, queue_name_a : str, queue_name_b : str) -> None: with self.lock: if queue_name_a in self.mutex_queues and queue_name_b not in self.mutex_queues: self.mutex_queues[queue_name_b] = self.mutex_queues[queue_name_a] elif queue_name_b in self.mutex_queues and queue_name_a not in self.mutex_queues: self.mutex_queues[queue_name_a] = self.mutex_queues[queue_name_b] def wait_my_turn(self, queue_name : str) -> None: LOGGER.warning('[wait_my_turn] begin queue_name={:s}'.format(str(queue_name))) # create my mutex and enqueue it mutex = threading.Event() LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex={:s}'.format(str(queue_name), str(mutex))) with self.lock: LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex_queues={:s}'.format(str(queue_name), str(self.mutex_queues))) queue : Queue = self.mutex_queues.setdefault(queue_name, Queue()) first_in_queue = (queue.qsize() == 0) LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} first_in_queue={:s}'.format(str(queue_name), str(first_in_queue))) queue.put_nowait(mutex) # if I'm the first in the queue upon addition, means there are no running tasks # directly return without waiting if first_in_queue: return if first_in_queue: LOGGER.warning('[wait_my_turn] end first_in_queue queue_name={:s}'.format(str(queue_name))) return # otherwise, wait for my turn in the queue LOGGER.warning('[wait_my_turn] waiting queue_name={:s}'.format(str(queue_name))) mutex.wait() LOGGER.warning('[wait_my_turn] end wait queue_name={:s}'.format(str(queue_name))) def signal_done(self, queue_name : str) -> None: LOGGER.warning('[signal_done] begin queue_name={:s}'.format(str(queue_name))) # I'm done with my work with self.lock: LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex_queues={:s}'.format(str(queue_name), str(self.mutex_queues))) queue : Queue = self.mutex_queues.setdefault(queue_name, Queue()) LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} queue={:s}'.format(str(queue_name), str(queue))) # remove myself from the queue try: queue.get(block=True, timeout=0.1) LOGGER.warning('[wait_my_turn] [lock] before get queue_name={:s}'.format(str(queue_name))) mutex = queue.get(block=True, timeout=0.1) LOGGER.warning('[wait_my_turn] [lock] after get queue_name={:s} mutex={:s}'.format(str(queue_name), str(mutex))) except Empty: LOGGER.warning('[wait_my_turn] [lock] empty queue_name={:s}'.format(str(queue_name))) pass # if there are no other tasks queued, return if queue.qsize() == 0: return if queue.qsize() == 0: LOGGER.warning('[wait_my_turn] end queue.qsize==0 queue_name={:s}'.format(str(queue_name))) return # otherwise, signal the next task in the queue to start next_mutex : threading.Event = queue.queue[0] LOGGER.warning('[wait_my_turn] [lock] before set queue_name={:s} next_mutex={:s}'.format(str(queue_name), str(next_mutex))) next_mutex.set() LOGGER.warning('[wait_my_turn] [lock] after set queue_name={:s} next_mutex={:s}'.format(str(queue_name), str(next_mutex))) LOGGER.warning('[signal_done] end set queue_name={:s}'.format(str(queue_name))) Loading
src/common/tools/mutex_queues/MutexQueues.py +34 −5 Original line number Diff line number Diff line Loading @@ -34,10 +34,12 @@ # driver.configure(settings) # self.mutex_queues.signal_done(device_uuid) import threading import logging, threading from queue import Queue, Empty from typing import Dict LOGGER = logging.getLogger(__name__) class MutexQueues: def __init__(self) -> None: # lock to protect dictionary updates Loading @@ -47,35 +49,62 @@ class MutexQueues: # first mutex is the running one self.mutex_queues : Dict[str, Queue[threading.Event]] = dict() def add_alias(self, queue_name_a : str, queue_name_b : str) -> None: with self.lock: if queue_name_a in self.mutex_queues and queue_name_b not in self.mutex_queues: self.mutex_queues[queue_name_b] = self.mutex_queues[queue_name_a] elif queue_name_b in self.mutex_queues and queue_name_a not in self.mutex_queues: self.mutex_queues[queue_name_a] = self.mutex_queues[queue_name_b] def wait_my_turn(self, queue_name : str) -> None: LOGGER.warning('[wait_my_turn] begin queue_name={:s}'.format(str(queue_name))) # create my mutex and enqueue it mutex = threading.Event() LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex={:s}'.format(str(queue_name), str(mutex))) with self.lock: LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex_queues={:s}'.format(str(queue_name), str(self.mutex_queues))) queue : Queue = self.mutex_queues.setdefault(queue_name, Queue()) first_in_queue = (queue.qsize() == 0) LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} first_in_queue={:s}'.format(str(queue_name), str(first_in_queue))) queue.put_nowait(mutex) # if I'm the first in the queue upon addition, means there are no running tasks # directly return without waiting if first_in_queue: return if first_in_queue: LOGGER.warning('[wait_my_turn] end first_in_queue queue_name={:s}'.format(str(queue_name))) return # otherwise, wait for my turn in the queue LOGGER.warning('[wait_my_turn] waiting queue_name={:s}'.format(str(queue_name))) mutex.wait() LOGGER.warning('[wait_my_turn] end wait queue_name={:s}'.format(str(queue_name))) def signal_done(self, queue_name : str) -> None: LOGGER.warning('[signal_done] begin queue_name={:s}'.format(str(queue_name))) # I'm done with my work with self.lock: LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex_queues={:s}'.format(str(queue_name), str(self.mutex_queues))) queue : Queue = self.mutex_queues.setdefault(queue_name, Queue()) LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} queue={:s}'.format(str(queue_name), str(queue))) # remove myself from the queue try: queue.get(block=True, timeout=0.1) LOGGER.warning('[wait_my_turn] [lock] before get queue_name={:s}'.format(str(queue_name))) mutex = queue.get(block=True, timeout=0.1) LOGGER.warning('[wait_my_turn] [lock] after get queue_name={:s} mutex={:s}'.format(str(queue_name), str(mutex))) except Empty: LOGGER.warning('[wait_my_turn] [lock] empty queue_name={:s}'.format(str(queue_name))) pass # if there are no other tasks queued, return if queue.qsize() == 0: return if queue.qsize() == 0: LOGGER.warning('[wait_my_turn] end queue.qsize==0 queue_name={:s}'.format(str(queue_name))) return # otherwise, signal the next task in the queue to start next_mutex : threading.Event = queue.queue[0] LOGGER.warning('[wait_my_turn] [lock] before set queue_name={:s} next_mutex={:s}'.format(str(queue_name), str(next_mutex))) next_mutex.set() LOGGER.warning('[wait_my_turn] [lock] after set queue_name={:s} next_mutex={:s}'.format(str(queue_name), str(next_mutex))) LOGGER.warning('[signal_done] end set queue_name={:s}'.format(str(queue_name)))