MutexQueues.py 3.66 KB
Newer Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# MutexQueues:
# ------------
# This class enables to schedule and serialize operations concurrently issued
# over a number of resources. For instance, when multiple components want to
# configure devices through the Device component, configuration operations
# have to be serialized to prevent data corruptions, and race conditions, etc.
# Usage Example:
#   class Servicer():
#       def __init__(self):
#           # init other stuff
#           self.drivers = dict()
#           self.mutex_queues = MutexQueues()
#       
#       def configure_device(self, device_uuid, settings):
#           self.mutex_queues.wait_my_turn(device_uuid)
#           driver = self.drivers.get(device_uuid)
#           if driver is None:
#               driver = Driver(device_uuid)
#               self.drivers[device_uuid] = driver
#           driver.configure(settings)
#           self.mutex_queues.signal_done(device_uuid)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import threading
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from queue import Queue, Empty
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict

class MutexQueues:
    def __init__(self) -> None:
        # lock to protect dictionary updates
        self.lock = threading.Lock()

        # dictionaty of queues of mutexes: queue_name => queue[mutex]
        # first mutex is the running one
        self.mutex_queues : Dict[str, Queue[threading.Event]] = dict()

    def add_alias(self, queue_name_a : str, queue_name_b : str) -> None:
        with self.lock:
            if queue_name_a in self.mutex_queues and queue_name_b not in self.mutex_queues:
                self.mutex_queues[queue_name_b] = self.mutex_queues[queue_name_a]
            elif queue_name_b in self.mutex_queues and queue_name_a not in self.mutex_queues:
                self.mutex_queues[queue_name_a] = self.mutex_queues[queue_name_b]
            elif queue_name_b not in self.mutex_queues and queue_name_a not in self.mutex_queues:
                self.mutex_queues[queue_name_b] = self.mutex_queues.setdefault(queue_name_a, Queue())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def wait_my_turn(self, queue_name : str) -> None:
        # create my mutex and enqueue it
        mutex = threading.Event()
        with self.lock:
            queue : Queue = self.mutex_queues.setdefault(queue_name, Queue())
            first_in_queue = (queue.qsize() == 0)
            queue.put_nowait(mutex)

        # if I'm the first in the queue upon addition, means there are no running tasks
        # directly return without waiting
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if first_in_queue: return
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        # otherwise, wait for my turn in the queue
        mutex.wait()

    def signal_done(self, queue_name : str) -> None:
        # I'm done with my work
        with self.lock:
            queue : Queue = self.mutex_queues.setdefault(queue_name, Queue())
            
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # remove myself from the queue
            try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                queue.get(block=True, timeout=0.1)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            except Empty:
                pass
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            # if there are no other tasks queued, return
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if queue.qsize() == 0: return
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            # otherwise, signal the next task in the queue to start
            next_mutex : threading.Event = queue.queue[0]
            next_mutex.set()