diff --git a/src/common/tools/mutex_queues/MutexQueues.py b/src/common/tools/mutex_queues/MutexQueues.py index 96e22a86f012cb8326c380a0ebbf0c1b40cae21c..b80dd1a9c3845136d7f2b81888ae1b492ea3adb3 100644 --- a/src/common/tools/mutex_queues/MutexQueues.py +++ b/src/common/tools/mutex_queues/MutexQueues.py @@ -34,10 +34,12 @@ # driver.configure(settings) # self.mutex_queues.signal_done(device_uuid) -import threading +import logging, threading from queue import Queue, Empty from typing import Dict +LOGGER = logging.getLogger(__name__) + class MutexQueues: def __init__(self) -> None: # lock to protect dictionary updates @@ -46,36 +48,63 @@ class MutexQueues: # dictionaty of queues of mutexes: queue_name => queue[mutex] # first mutex is the running one self.mutex_queues : Dict[str, Queue[threading.Event]] = dict() - + + def add_alias(self, queue_name_a : str, queue_name_b : str) -> None: + with self.lock: + if queue_name_a in self.mutex_queues and queue_name_b not in self.mutex_queues: + self.mutex_queues[queue_name_b] = self.mutex_queues[queue_name_a] + elif queue_name_b in self.mutex_queues and queue_name_a not in self.mutex_queues: + self.mutex_queues[queue_name_a] = self.mutex_queues[queue_name_b] + def wait_my_turn(self, queue_name : str) -> None: + LOGGER.warning('[wait_my_turn] begin queue_name={:s}'.format(str(queue_name))) # create my mutex and enqueue it mutex = threading.Event() + LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex={:s}'.format(str(queue_name), str(mutex))) with self.lock: + LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex_queues={:s}'.format(str(queue_name), str(self.mutex_queues))) queue : Queue = self.mutex_queues.setdefault(queue_name, Queue()) first_in_queue = (queue.qsize() == 0) + LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} first_in_queue={:s}'.format(str(queue_name), str(first_in_queue))) queue.put_nowait(mutex) # if I'm the first in the queue upon addition, means there are no running tasks # directly return without waiting - if first_in_queue: return + if first_in_queue: + LOGGER.warning('[wait_my_turn] end first_in_queue queue_name={:s}'.format(str(queue_name))) + return # otherwise, wait for my turn in the queue + LOGGER.warning('[wait_my_turn] waiting queue_name={:s}'.format(str(queue_name))) mutex.wait() + LOGGER.warning('[wait_my_turn] end wait queue_name={:s}'.format(str(queue_name))) def signal_done(self, queue_name : str) -> None: + LOGGER.warning('[signal_done] begin queue_name={:s}'.format(str(queue_name))) # I'm done with my work with self.lock: + LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex_queues={:s}'.format(str(queue_name), str(self.mutex_queues))) queue : Queue = self.mutex_queues.setdefault(queue_name, Queue()) + LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} queue={:s}'.format(str(queue_name), str(queue))) # remove myself from the queue try: - queue.get(block=True, timeout=0.1) + LOGGER.warning('[wait_my_turn] [lock] before get queue_name={:s}'.format(str(queue_name))) + mutex = queue.get(block=True, timeout=0.1) + LOGGER.warning('[wait_my_turn] [lock] after get queue_name={:s} mutex={:s}'.format(str(queue_name), str(mutex))) except Empty: + LOGGER.warning('[wait_my_turn] [lock] empty queue_name={:s}'.format(str(queue_name))) pass # if there are no other tasks queued, return - if queue.qsize() == 0: return + if queue.qsize() == 0: + LOGGER.warning('[wait_my_turn] end queue.qsize==0 queue_name={:s}'.format(str(queue_name))) + return # otherwise, signal the next task in the queue to start next_mutex : threading.Event = queue.queue[0] + LOGGER.warning('[wait_my_turn] [lock] before set queue_name={:s} next_mutex={:s}'.format(str(queue_name), str(next_mutex))) next_mutex.set() + LOGGER.warning('[wait_my_turn] [lock] after set queue_name={:s} next_mutex={:s}'.format(str(queue_name), str(next_mutex))) + + LOGGER.warning('[signal_done] end set queue_name={:s}'.format(str(queue_name)))