From 10a63cc42b3efddb00c4d2201288ec7b8948b2d7 Mon Sep 17 00:00:00 2001
From: Lluis Gifre <lluis.gifre@cttc.es>
Date: Tue, 4 Oct 2022 09:23:27 +0000
Subject: [PATCH] Common and Device:

- Implemented generic MutexQueues class
- Implemented sequentialization of operations in Device component to prevent data corruption and race conditions
---
 src/common/tools/mutex_queues/MutexQueues.py  |  78 +++
 src/common/tools/mutex_queues/__init__.py     |  14 +
 src/device/service/DeviceService.py           |   7 +-
 .../service/DeviceServiceServicerImpl.py      | 610 +++++++++---------
 4 files changed, 414 insertions(+), 295 deletions(-)
 create mode 100644 src/common/tools/mutex_queues/MutexQueues.py
 create mode 100644 src/common/tools/mutex_queues/__init__.py

diff --git a/src/common/tools/mutex_queues/MutexQueues.py b/src/common/tools/mutex_queues/MutexQueues.py
new file mode 100644
index 000000000..c3ab760f2
--- /dev/null
+++ b/src/common/tools/mutex_queues/MutexQueues.py
@@ -0,0 +1,78 @@
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# MutexQueues:
+# ------------
+# This class makes it possible to schedule and serialize operations issued
+# concurrently over a number of resources. For instance, when multiple components
+# want to configure devices through the Device component, configuration operations
+# have to be serialized to prevent data corruption, race conditions, etc.
+# Usage Example (in real code, call signal_done() in a finally block so a failure cannot stall the queue):
+#   class Servicer():
+#       def __init__(self):
+#           # init other stuff
+#           self.drivers = dict()
+#           self.mutex_queues = MutexQueues()
+#       
+#       def configure_device(self, device_uuid, settings):
+#           self.mutex_queues.wait_my_turn(device_uuid)
+#           driver = self.drivers.get(device_uuid)
+#           if driver is None:
+#               driver = Driver(device_uuid)
+#               self.drivers[device_uuid] = driver
+#           driver.configure(settings)
+#           self.mutex_queues.signal_done(device_uuid)
+
+import threading
+from queue import Queue
+from typing import Dict
+
+class MutexQueues:
+    def __init__(self) -> None:
+        # lock to protect dictionary updates
+        self.lock = threading.Lock()
+
+        # dictionary of queues of mutexes: queue_name => queue[mutex]
+        # first mutex is the running one
+        self.mutex_queues : Dict[str, Queue[threading.Event]] = dict()
+    
+    def wait_my_turn(self, queue_name : str) -> None:
+        # create my mutex and enqueue it
+        mutex = threading.Event()
+        with self.lock:
+            queue : Queue = self.mutex_queues.setdefault(queue_name, Queue())
+            first_in_queue = (queue.qsize() == 0)
+            queue.put_nowait(mutex)
+
+        # if I'm the first in the queue upon addition, there are no running tasks;
+        # return directly without waiting
+        if first_in_queue: return
+
+        # otherwise, wait for my turn in the queue
+        mutex.wait()
+
+    def signal_done(self, queue_name : str) -> None:
+        # I'm done with my work
+        with self.lock:
+            queue : Queue = self.mutex_queues.setdefault(queue_name, Queue())
+            
+            # remove myself from the queue
+            queue.get_nowait()
+
+            # if there are no other tasks queued, return
+            if queue.qsize() == 0: return
+
+            # otherwise, signal the next task in the queue to start
+            next_mutex : threading.Event = queue.queue[0]
+            next_mutex.set()
diff --git a/src/common/tools/mutex_queues/__init__.py b/src/common/tools/mutex_queues/__init__.py
new file mode 100644
index 000000000..70a332512
--- /dev/null
+++ b/src/common/tools/mutex_queues/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
diff --git a/src/device/service/DeviceService.py b/src/device/service/DeviceService.py
index 4dc2b0100..59134f26d 100644
--- a/src/device/service/DeviceService.py
+++ b/src/device/service/DeviceService.py
@@ -23,10 +23,15 @@ from .driver_api.DriverInstanceCache import DriverInstanceCache
 from .DeviceServiceServicerImpl import DeviceServiceServicerImpl
 from .MonitoringLoops import MonitoringLoops
 
+# Custom gRPC settings
+# Multiple clients might keep connections alive waiting for RPC methods to be executed.
+# Requests need to be serialized to ensure correct device configurations
+GRPC_MAX_WORKERS = 200
+
 class DeviceService(GenericGrpcService):
     def __init__(self, driver_instance_cache : DriverInstanceCache, cls_name: str = __name__) -> None:
         port = get_service_port_grpc(ServiceNameEnum.DEVICE)
-        super().__init__(port, cls_name=cls_name)
+        super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name)
         database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
         self.monitoring_loops = MonitoringLoops(database)
         self.device_servicer = DeviceServiceServicerImpl(database, driver_instance_cache, self.monitoring_loops)
diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py
index 9ffd028a6..d5d44f34f 100644
--- a/src/device/service/DeviceServiceServicerImpl.py
+++ b/src/device/service/DeviceServiceServicerImpl.py
@@ -24,6 +24,7 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType
 from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
 from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
 from common.tools.grpc.Tools import grpc_message_to_json
+from common.tools.mutex_queues.MutexQueues import MutexQueues
 from context.client.ContextClient import ContextClient
 from .database.ConfigModel import (
     ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
@@ -56,6 +57,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
         self.database = database
         self.driver_instance_cache = driver_instance_cache
         self.monitoring_loops = monitoring_loops
+        self.mutex_queues = MutexQueues()
         LOGGER.debug('Servicer Created')
 
     @safe_and_metered_rpc_method(METRICS, LOGGER)
@@ -101,348 +103,368 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
         json_request['device_config'] = {}
         request = Device(**json_request)
 
-        sync_device_from_context(device_uuid, self.context_client, self.database)
-        db_device,_ = update_device_in_local_database(self.database, request)
-
-        driver_filter_fields = get_device_driver_filter_fields(db_device)
-
-        #LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules)))
-        address  = connection_config_rules.pop('address', None)
-        port     = connection_config_rules.pop('port', None)
-        settings = connection_config_rules.pop('settings', '{}')
+        self.mutex_queues.wait_my_turn(device_uuid)
         try:
-            settings = json.loads(settings)
-        except ValueError as e:
-            raise InvalidArgumentException(
-                'device.device_config.config_rules[settings]', settings,
-                extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e
-        driver : _Driver = self.driver_instance_cache.get(
-            device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
-        driver.Connect()
-
-        endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
-        try:
-            for resource_key, resource_value in endpoints:
+            sync_device_from_context(device_uuid, self.context_client, self.database)
+            db_device,_ = update_device_in_local_database(self.database, request)
+
+            driver_filter_fields = get_device_driver_filter_fields(db_device)
+
+            #LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules)))
+            address  = connection_config_rules.pop('address', None)
+            port     = connection_config_rules.pop('port', None)
+            settings = connection_config_rules.pop('settings', '{}')
+            try:
+                settings = json.loads(settings)
+            except ValueError as e:
+                raise InvalidArgumentException(
+                    'device.device_config.config_rules[settings]', settings,
+                    extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e
+            driver : _Driver = self.driver_instance_cache.get(
+                device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
+            driver.Connect()
+
+            endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
+            try:
+                for resource_key, resource_value in endpoints:
+                    if isinstance(resource_value, Exception):
+                        LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value)))
+                        continue
+                    endpoint_uuid = resource_value.get('uuid')
+                    endpoint_type = resource_value.get('type')
+                    str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
+                    db_endpoint, _ = update_or_create_object(
+                        self.database, EndPointModel, str_endpoint_key, {
+                        'device_fk'    : db_device,
+                        'endpoint_uuid': endpoint_uuid,
+                        'endpoint_type': endpoint_type,
+                        'resource_key' : resource_key,
+                    })
+                    sample_types : Dict[int, str] = resource_value.get('sample_types', {})
+                    for sample_type, monitor_resource_key in sample_types.items():
+                        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
+                        update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
+                            'endpoint_fk'    : db_endpoint,
+                            'resource_key'   : monitor_resource_key,
+                            'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
+                        })
+            except: # pylint: disable=bare-except
+                LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
+
+            raw_running_config_rules = driver.GetConfig()
+            running_config_rules = []
+            for resource_key, resource_value in raw_running_config_rules:
                 if isinstance(resource_value, Exception):
-                    LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value)))
+                    msg = 'Error retrieving config rules: {:s} => {:s}'
+                    LOGGER.error(msg.format(str(resource_key), str(resource_value)))
                     continue
-                endpoint_uuid = resource_value.get('uuid')
-                endpoint_type = resource_value.get('type')
-                str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
-                db_endpoint, _ = update_or_create_object(
-                    self.database, EndPointModel, str_endpoint_key, {
-                    'device_fk'    : db_device,
-                    'endpoint_uuid': endpoint_uuid,
-                    'endpoint_type': endpoint_type,
-                    'resource_key' : resource_key,
-                })
-                sample_types : Dict[int, str] = resource_value.get('sample_types', {})
-                for sample_type, monitor_resource_key in sample_types.items():
-                    str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
-                    update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
-                        'endpoint_fk'    : db_endpoint,
-                        'resource_key'   : monitor_resource_key,
-                        'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
-                    })
-        except: # pylint: disable=bare-except
-            LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
-
-        raw_running_config_rules = driver.GetConfig()
-        running_config_rules = []
-        for resource_key, resource_value in raw_running_config_rules:
-            if isinstance(resource_value, Exception):
-                msg = 'Error retrieving config rules: {:s} => {:s}'
-                LOGGER.error(msg.format(str(resource_key), str(resource_value)))
-                continue
-            config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True))
-            running_config_rules.append(config_rule)
+                config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True))
+                running_config_rules.append(config_rule)
 
-        #for running_config_rule in running_config_rules:
-        #    LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
-        update_config(self.database, device_uuid, 'running', running_config_rules)
+            #for running_config_rule in running_config_rules:
+            #    LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
+            update_config(self.database, device_uuid, 'running', running_config_rules)
 
-        initial_config_rules = driver.GetInitialConfig()
-        update_config(self.database, device_uuid, 'initial', initial_config_rules)
+            initial_config_rules = driver.GetInitialConfig()
+            update_config(self.database, device_uuid, 'initial', initial_config_rules)
 
-        #LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump(
-        #    include_config_rules=True, include_drivers=True, include_endpoints=True))))
+            #LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump(
+            #    include_config_rules=True, include_drivers=True, include_endpoints=True))))
 
-        sync_device_to_context(db_device, self.context_client)
-        return DeviceId(**db_device.dump_id())
+            sync_device_to_context(db_device, self.context_client)
+            return DeviceId(**db_device.dump_id())
+        finally:
+            self.mutex_queues.signal_done(device_uuid)
 
     @safe_and_metered_rpc_method(METRICS, LOGGER)
     def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
         device_id = request.device_id
         device_uuid = device_id.device_uuid.uuid
 
-        sync_device_from_context(device_uuid, self.context_client, self.database)
+        self.mutex_queues.wait_my_turn(device_uuid)
+        try:
+            sync_device_from_context(device_uuid, self.context_client, self.database)
 
-        context_config_rules = get_config_rules(self.database, device_uuid, 'running')
-        context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
-        #LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
+            context_config_rules = get_config_rules(self.database, device_uuid, 'running')
+            context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
+            #LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
 
-        db_device,_ = update_device_in_local_database(self.database, request)
+            db_device,_ = update_device_in_local_database(self.database, request)
 
-        request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
-        #LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
+            request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
+            #LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
 
-        resources_to_set    : List[Tuple[str, Any]] = [] # key, value
-        resources_to_delete : List[Tuple[str, Any]] = [] # key, value
+            resources_to_set    : List[Tuple[str, Any]] = [] # key, value
+            resources_to_delete : List[Tuple[str, Any]] = [] # key, value
 
-        for config_rule in request_config_rules:
-            action, key, value = config_rule
-            if action == ORM_ConfigActionEnum.SET:
-                if (key not in context_config_rules) or (context_config_rules[key] != value):
-                    resources_to_set.append((key, value))
-            elif action == ORM_ConfigActionEnum.DELETE:
-                if key in context_config_rules:
-                    resources_to_delete.append((key, value))
+            for config_rule in request_config_rules:
+                action, key, value = config_rule
+                if action == ORM_ConfigActionEnum.SET:
+                    if (key not in context_config_rules) or (context_config_rules[key] != value):
+                        resources_to_set.append((key, value))
+                elif action == ORM_ConfigActionEnum.DELETE:
+                    if key in context_config_rules:
+                        resources_to_delete.append((key, value))
 
-        #LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set)))
-        #LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete)))
+            #LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set)))
+            #LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete)))
 
-        # TODO: use of datastores (might be virtual ones) to enable rollbacks
+            # TODO: use of datastores (might be virtual ones) to enable rollbacks
 
-        errors = []
+            errors = []
 
-        driver : _Driver = self.driver_instance_cache.get(device_uuid)
-        if driver is None:
-            errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
+            driver : _Driver = self.driver_instance_cache.get(device_uuid)
+            if driver is None:
+                errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
+
+            if len(errors) == 0:
+                results_setconfig = driver.SetConfig(resources_to_set)
+                errors.extend(check_set_errors(resources_to_set, results_setconfig))
 
-        if len(errors) == 0:
-            results_setconfig = driver.SetConfig(resources_to_set)
-            errors.extend(check_set_errors(resources_to_set, results_setconfig))
+            if len(errors) == 0:
+                results_deleteconfig = driver.DeleteConfig(resources_to_delete)
+                errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))
 
-        if len(errors) == 0:
-            results_deleteconfig = driver.DeleteConfig(resources_to_delete)
-            errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))
+            if len(errors) > 0:
+                raise OperationFailedException('ConfigureDevice', extra_details=errors)
 
-        if len(errors) > 0:
-            raise OperationFailedException('ConfigureDevice', extra_details=errors)
+            running_config_rules = driver.GetConfig()
+            running_config_rules = [
+                (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
+                for config_rule in running_config_rules if not isinstance(config_rule[1], Exception)
+            ]
+            #for running_config_rule in running_config_rules:
+            #    LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
+            update_config(self.database, device_uuid, 'running', running_config_rules)
 
-        running_config_rules = driver.GetConfig()
-        running_config_rules = [
-            (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
-            for config_rule in running_config_rules if not isinstance(config_rule[1], Exception)
-        ]
-        #for running_config_rule in running_config_rules:
-        #    LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
-        update_config(self.database, device_uuid, 'running', running_config_rules)
+            sync_device_to_context(db_device, self.context_client)
+            return DeviceId(**db_device.dump_id())
+        finally:
+            self.mutex_queues.signal_done(device_uuid)
 
-        sync_device_to_context(db_device, self.context_client)
-        return DeviceId(**db_device.dump_id())
 
     @safe_and_metered_rpc_method(METRICS, LOGGER)
     def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
         device_uuid = request.device_uuid.uuid
 
-        self.monitoring_loops.remove(device_uuid)
+        self.mutex_queues.wait_my_turn(device_uuid)
+        try:
+            self.monitoring_loops.remove(device_uuid)
 
-        sync_device_from_context(device_uuid, self.context_client, self.database)
-        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
-        if db_device is None: return Empty()
+            sync_device_from_context(device_uuid, self.context_client, self.database)
+            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
+            if db_device is None: return Empty()
 
-        self.driver_instance_cache.delete(device_uuid)
-        delete_device_from_context(db_device, self.context_client)
+            self.driver_instance_cache.delete(device_uuid)
+            delete_device_from_context(db_device, self.context_client)
 
-        for db_kpi_pk,_ in db_device.references(KpiModel):
-            db_kpi = get_object(self.database, KpiModel, db_kpi_pk)
-            for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel):
-                get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete()
-            db_kpi.delete()
+            for db_kpi_pk,_ in db_device.references(KpiModel):
+                db_kpi = get_object(self.database, KpiModel, db_kpi_pk)
+                for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel):
+                    get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete()
+                db_kpi.delete()
 
-        for db_endpoint_pk,_ in db_device.references(EndPointModel):
-            db_endpoint = EndPointModel(self.database, db_endpoint_pk)
-            for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
-                get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete()
-            db_endpoint.delete()
+            for db_endpoint_pk,_ in db_device.references(EndPointModel):
+                db_endpoint = EndPointModel(self.database, db_endpoint_pk)
+                for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
+                    get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete()
+                db_endpoint.delete()
 
-        for db_driver_pk,_ in db_device.references(DriverModel):
-            get_object(self.database, DriverModel, db_driver_pk).delete()
+            for db_driver_pk,_ in db_device.references(DriverModel):
+                get_object(self.database, DriverModel, db_driver_pk).delete()
 
-        db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
-        for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
-            get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
+            db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
+            for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
+                get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
 
-        db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
-        for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
-            get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
+            db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
+            for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
+                get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
 
-        db_device.delete()
-        db_initial_config.delete()
-        db_running_config.delete()
-        return Empty()
+            db_device.delete()
+            db_initial_config.delete()
+            db_running_config.delete()
+            return Empty()
+        finally:
+            self.mutex_queues.signal_done(device_uuid)
 
     @safe_and_metered_rpc_method(METRICS, LOGGER)
     def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
         device_uuid = request.device_uuid.uuid
 
-        sync_device_from_context(device_uuid, self.context_client, self.database)
-        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
+        self.mutex_queues.wait_my_turn(device_uuid)
+        try:
+            sync_device_from_context(device_uuid, self.context_client, self.database)
+            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
 
-        config_rules = {} if db_device is None else db_device.dump_initial_config()
-        return DeviceConfig(config_rules=config_rules)
+            config_rules = {} if db_device is None else db_device.dump_initial_config()
+            device_config = DeviceConfig(config_rules=config_rules)
+            return device_config
+        finally:
+            self.mutex_queues.signal_done(device_uuid)
 
     @safe_and_metered_rpc_method(METRICS, LOGGER)
     def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
         kpi_uuid = request.kpi_id.kpi_id.uuid
+        device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
+        self.mutex_queues.wait_my_turn(device_uuid)
+        try:
+            subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
+            if subscribe:
+                db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
+                if db_device is None:
+                    msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+                endpoint_id = request.kpi_descriptor.endpoint_id
+                endpoint_uuid = endpoint_id.endpoint_uuid.uuid
+                str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
+                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
+                endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
+                if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
+                    str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
+                    str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
+                db_endpoint : EndPointModel = get_object(
+                    self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
+                if db_endpoint is None:
+                    msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
+                        str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+                driver : _Driver = self.driver_instance_cache.get(device_uuid)
+                if driver is None:
+                    msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+                sample_type = request.kpi_descriptor.kpi_sample_type
+
+                attributes = {
+                    'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
+                    'kpi_description'  : request.kpi_descriptor.kpi_description,
+                    'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(sample_type),
+                    'device_fk'        : db_device,
+                    'endpoint_fk'      : db_endpoint,
+                    'sampling_duration': request.sampling_duration_s,
+                    'sampling_interval': request.sampling_interval_s,
+                }
+                result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
+                db_kpi, updated = result
+
+                str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
+                db_endpoint_monitor : EndPointMonitorModel = get_object(
+                    self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
+                if db_endpoint_monitor is None:
+                    msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format(
+                        str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
+                        str(device_uuid), str(endpoint_uuid))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+                endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
+                str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
+                attributes = {
+                    'endpoint_monitor_fk': db_endpoint_monitor,
+                    'kpi_fk'             : db_kpi,
+                }
+                result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
+                    self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
+                db_endpoint_monitor_kpi, updated = result
+
+                resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
+                resources_to_subscribe.append(
+                    (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
+                results_subscribestate = driver.SubscribeState(resources_to_subscribe)
+                errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
+                if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
+
+                self.monitoring_loops.add(device_uuid, driver)
 
-        subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
-        if subscribe:
-            device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
-
-            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
-            if db_device is None:
-                msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
-            endpoint_id = request.kpi_descriptor.endpoint_id
-            endpoint_uuid = endpoint_id.endpoint_uuid.uuid
-            str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
-            endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
-            endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
-            if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
-                str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
-                str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
-            db_endpoint : EndPointModel = get_object(
-                self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
-            if db_endpoint is None:
-                msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
-                    str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
-            driver : _Driver = self.driver_instance_cache.get(device_uuid)
-            if driver is None:
-                msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
-            sample_type = request.kpi_descriptor.kpi_sample_type
-
-            attributes = {
-                'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
-                'kpi_description'  : request.kpi_descriptor.kpi_description,
-                'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(sample_type),
-                'device_fk'        : db_device,
-                'endpoint_fk'      : db_endpoint,
-                'sampling_duration': request.sampling_duration_s,
-                'sampling_interval': request.sampling_interval_s,
-            }
-            result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
-            db_kpi, updated = result
-
-            str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
-            db_endpoint_monitor : EndPointMonitorModel = get_object(
-                self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
-            if db_endpoint_monitor is None:
-                msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format(
-                    str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
-                    str(device_uuid), str(endpoint_uuid))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
-            endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
-            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
-            attributes = {
-                'endpoint_monitor_fk': db_endpoint_monitor,
-                'kpi_fk'             : db_kpi,
-            }
-            result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
-                self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
-            db_endpoint_monitor_kpi, updated = result
-
-            resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
-            resources_to_subscribe.append(
-                (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
-            results_subscribestate = driver.SubscribeState(resources_to_subscribe)
-            errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
-            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
-
-            self.monitoring_loops.add(device_uuid, driver)
-
-        else:
-            db_kpi : KpiModel = get_object(
-                self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
-            if db_kpi is None:
-                msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
-            db_device : DeviceModel = get_object(
-                self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
-            if db_device is None:
-                msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-            device_uuid = db_device.device_uuid
-
-            db_endpoint : EndPointModel = get_object(
-                self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
-            if db_endpoint is None:
-                msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-            endpoint_uuid = db_endpoint.endpoint_uuid
-            str_endpoint_key = db_endpoint.pk
-
-            kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type
-            sample_type = kpi_sample_type.value
-            str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
-            db_endpoint_monitor : EndPointMonitorModel = get_object(
-                self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
-            if db_endpoint_monitor is None:
-                msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
-            endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
-            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
-            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
-                self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
-            if db_endpoint_monitor_kpi is None:
-                msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
-            resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
-            resources_to_unsubscribe.append(
-                (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
-
-            driver : _Driver = self.driver_instance_cache.get(device_uuid)
-            if driver is None:
-                msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
-                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
-            results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
-            errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
-            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
-
-            db_endpoint_monitor_kpi.delete()
-            db_kpi.delete()
-
-            # There is one monitoring loop per device; keep them active since they are re-used by different monitoring
-            # requests.
-            #self.monitoring_loops.remove(device_uuid)
-
-        # Subscriptions are not stored as classical driver config.
-        # TODO: consider adding it somehow in the configuration.
-        # Warning: GetConfig might be very slow in OpenConfig devices
-        #running_config_rules = [
-        #    (config_rule[0], json.dumps(config_rule[1], sort_keys=True))
-        #    for config_rule in driver.GetConfig()
-        #]
-        #context_config_rules = {
-        #    config_rule[1]: config_rule[2]
-        #    for config_rule in get_config_rules(self.database, device_uuid, 'running')
-        #}
-
-        ## each in context, not in running => delete in context
-        ## each in running, not in context => add to context
-        ## each in context and in running, context.value != running.value => update in context
-        #running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = []
-        #for config_rule_key,config_rule_value in running_config_rules:
-        #    running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value))
-        #    context_config_rules.pop(config_rule_key, None)
-        #for context_rule_key,context_rule_value in context_config_rules.items():
-        #    running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value))
-
-        ##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}'
-        ##for i,running_config_rules_action in enumerate(running_config_rules_actions):
-        ##    LOGGER.info(msg.format(i, str(running_config_rules_action)))
-        #update_config(self.database, device_uuid, 'running', running_config_rules_actions)
-
-        sync_device_to_context(db_device, self.context_client)
-        return Empty()
+            else:
+                db_kpi : KpiModel = get_object(
+                    self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
+                if db_kpi is None:
+                    msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+                db_device : DeviceModel = get_object(
+                    self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
+                if db_device is None:
+                    msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+                device_uuid = db_device.device_uuid
+
+                db_endpoint : EndPointModel = get_object(
+                    self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
+                if db_endpoint is None:
+                    msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+                endpoint_uuid = db_endpoint.endpoint_uuid
+                str_endpoint_key = db_endpoint.pk
+
+                kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type
+                sample_type = kpi_sample_type.value
+                str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
+                db_endpoint_monitor : EndPointMonitorModel = get_object(
+                    self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
+                if db_endpoint_monitor is None:
+                    msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+                endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
+                str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
+                db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
+                    self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
+                if db_endpoint_monitor_kpi is None:
+                    msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+                resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
+                resources_to_unsubscribe.append(
+                    (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
+
+                driver : _Driver = self.driver_instance_cache.get(device_uuid)
+                if driver is None:
+                    msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
+                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+                results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
+                errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
+                if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
+
+                db_endpoint_monitor_kpi.delete()
+                db_kpi.delete()
+
+                # There is one monitoring loop per device; keep them active since they are re-used by different monitoring
+                # requests.
+                #self.monitoring_loops.remove(device_uuid)
+
+            # Subscriptions are not stored as classical driver config.
+            # TODO: consider adding them somehow in the configuration.
+            # Warning: GetConfig might be very slow in OpenConfig devices
+            #running_config_rules = [
+            #    (config_rule[0], json.dumps(config_rule[1], sort_keys=True))
+            #    for config_rule in driver.GetConfig()
+            #]
+            #context_config_rules = {
+            #    config_rule[1]: config_rule[2]
+            #    for config_rule in get_config_rules(self.database, device_uuid, 'running')
+            #}
+
+            ## each in context, not in running => delete in context
+            ## each in running, not in context => add to context
+            ## each in context and in running, context.value != running.value => update in context
+            #running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = []
+            #for config_rule_key,config_rule_value in running_config_rules:
+            #    running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value))
+            #    context_config_rules.pop(config_rule_key, None)
+            #for context_rule_key,context_rule_value in context_config_rules.items():
+            #    running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value))
+
+            ##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}'
+            ##for i,running_config_rules_action in enumerate(running_config_rules_actions):
+            ##    LOGGER.info(msg.format(i, str(running_config_rules_action)))
+            #update_config(self.database, device_uuid, 'running', running_config_rules_actions)
+
+            sync_device_to_context(db_device, self.context_client)
+            return Empty()
+        finally:
+            self.mutex_queues.signal_done(device_uuid)
-- 
GitLab