# DeviceServiceServicerImpl.py
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging, re
from typing import Any, Dict, List, Tuple
from common.orm.Database import Database
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.device_pb2_grpc import DeviceServiceServicer
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from common.tools.grpc.Tools import grpc_message_to_json
# MutexQueues: wait_my_turn()/signal_done() bracket every RPC handler below, keyed by device UUID.
from common.tools.mutex_queues.MutexQueues import MutexQueues
from context.client.ContextClient import ContextClient
from .database.ConfigModel import (
    ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
from .database.DatabaseTools import (
    delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
    update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type
from .database.RelationModels import EndPointMonitorKpiModel
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
    check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)
from .MonitoringLoops import MonitoringLoops
# Module-level logger; it is handed to the RPC decorator of every method below and used for
# error reporting inside the handlers.
LOGGER = logging.getLogger(__name__)

# Service/method names used to build the metrics objects consumed by safe_and_metered_rpc_method.
SERVICE_NAME = 'Device'
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig', 'MonitorDeviceKpi']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class DeviceServiceServicerImpl(DeviceServiceServicer):
    """gRPC servicer implementing the Device service RPCs.

    Exposes AddDevice, ConfigureDevice, DeleteDevice, GetInitialConfig and MonitorDeviceKpi.
    Every handler brackets its work with mutex_queues.wait_my_turn()/signal_done() keyed by the
    device UUID taken from the request.
    """

    def __init__(
        self, database : Database, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops
    ) -> None:
        """Store the collaborators used by the RPC handlers.

        Args:
            database: local ORM database holding device, endpoint, config and KPI records.
            driver_instance_cache: cache queried by device UUID to obtain driver instances.
            monitoring_loops: registry of monitoring loops (add/remove by device UUID).
        """
        self.context_client = ContextClient()
        self.database = database
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
        self.mutex_queues = MutexQueues()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Register a device, connect its driver and import its endpoints and configuration.

        The request may only carry SET custom config rules whose resource key starts with
        "_connect/" (address, port and settings are read from them) and must not carry
        endpoints; anything else is rejected before the device is touched.

        Args:
            request: device description; its config rules are consumed as connection parameters
                and removed from the device before it is stored.
            context: gRPC servicer context (not used directly in this method).

        Returns:
            DeviceId built from the local database record of the device.

        Raises:
            InvalidArgumentException: when the request carries non-connection config rules or
                endpoints, or when the "_connect/settings" value is not valid JSON.
        """
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

        connect_prefix = '_connect/'
        connection_config_rules = {}
        unexpected_config_rules = []
        for config_rule in request.device_config.config_rules:
            if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \
               (config_rule.WhichOneof('config_rule') == 'custom') and \
               (config_rule.custom.resource_key.startswith(connect_prefix)):
                # Strip only the leading prefix; later occurrences inside the key are preserved.
                connection_key = config_rule.custom.resource_key[len(connect_prefix):]
                connection_config_rules[connection_key] = config_rule.custom.resource_value
            else:
                unexpected_config_rules.append(config_rule)

        if len(unexpected_config_rules) > 0:
            # Report exactly the rules rejected above. Converting them one by one avoids indexing
            # a 'custom' entry that non-custom rules do not have.
            json_unexpected_config_rules = [
                grpc_message_to_json(config_rule) for config_rule in unexpected_config_rules
            ]
            str_unexpected_config_rules = json.dumps(json_unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'device.device_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
                              'with "_connect/" tag. Others should be configured after adding the device.')

        if len(request.device_endpoints) > 0:
            unexpected_endpoints = [
                grpc_message_to_json(device_endpoint) for device_endpoint in request.device_endpoints
            ]
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'device.device_endpoints', str_unexpected_endpoints,
                extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\
                              'interrogation of the physical device.')

        # Remove device configuration: connection rules are not stored as part of the device.
        json_request = grpc_message_to_json(request, use_integers_for_enums=True)
        json_request['device_config'] = {}
        request = Device(**json_request)

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            sync_device_from_context(device_uuid, self.context_client, self.database)
            db_device,_ = update_device_in_local_database(self.database, request)

            driver_filter_fields = get_device_driver_filter_fields(db_device)

            address  = connection_config_rules.pop('address', None)
            port     = connection_config_rules.pop('port', None)
            settings = connection_config_rules.pop('settings', '{}')
            try:
                settings = json.loads(settings)
            except ValueError as e:
                raise InvalidArgumentException(
                    'device.device_config.config_rules[settings]', settings,
                    extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e
            driver : _Driver = self.driver_instance_cache.get(
                device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
            driver.Connect()

            # Endpoint discovery is best-effort: a failure is logged and the device is still added.
            endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
            try:
                for resource_key, resource_value in endpoints:
                    if isinstance(resource_value, Exception):
                        LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value)))
                        continue
                    endpoint_uuid = resource_value.get('uuid')
                    endpoint_type = resource_value.get('type')
                    str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
                    db_endpoint, _ = update_or_create_object(
                        self.database, EndPointModel, str_endpoint_key, {
                        'device_fk'    : db_device,
                        'endpoint_uuid': endpoint_uuid,
                        'endpoint_type': endpoint_type,
                        'resource_key' : resource_key,
                    })
                    sample_types : Dict[int, str] = resource_value.get('sample_types', {})
                    for sample_type, monitor_resource_key in sample_types.items():
                        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
                        update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
                            'endpoint_fk'    : db_endpoint,
                            'resource_key'   : monitor_resource_key,
                            'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
                        })
            except Exception: # pylint: disable=broad-except
                LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints)))

            raw_running_config_rules = driver.GetConfig()
            running_config_rules = []
            for resource_key, resource_value in raw_running_config_rules:
                if isinstance(resource_value, Exception):
                    msg = 'Error retrieving config rules: {:s} => {:s}'
                    LOGGER.error(msg.format(str(resource_key), str(resource_value)))
                    # Skip failed resources: an Exception object cannot be serialised as a rule value.
                    continue
                config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True))
                running_config_rules.append(config_rule)

            update_config(self.database, device_uuid, 'running', running_config_rules)

            initial_config_rules = driver.GetInitialConfig()
            update_config(self.database, device_uuid, 'initial', initial_config_rules)

            sync_device_to_context(db_device, self.context_client)
            return DeviceId(**db_device.dump_id())
        finally:
            self.mutex_queues.signal_done(device_uuid)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Apply the config rules of the request to the device through its driver.

        SET rules are applied only when the key is new or its value differs from the stored
        running configuration; DELETE rules are applied only for keys present in it. After a
        successful change the running configuration is re-read from the driver and stored.

        Args:
            request: device carrying the config rules to set/delete.
            context: gRPC servicer context (not used directly in this method).

        Returns:
            DeviceId built from the local database record of the device.

        Raises:
            OperationFailedException: when no driver is cached for the device, or when the
                driver reports errors while setting or deleting resources.
        """
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            sync_device_from_context(device_uuid, self.context_client, self.database)

            # Stored running configuration as {resource_key: resource_value}.
            context_config_rules = get_config_rules(self.database, device_uuid, 'running')
            context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}

            db_device,_ = update_device_in_local_database(self.database, request)

            request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)

            resources_to_set    : List[Tuple[str, Any]] = [] # key, value
            resources_to_delete : List[Tuple[str, Any]] = [] # key, value

            for action, key, value in request_config_rules:
                if action == ORM_ConfigActionEnum.SET:
                    if (key not in context_config_rules) or (context_config_rules[key] != value):
                        resources_to_set.append((key, value))
                elif action == ORM_ConfigActionEnum.DELETE:
                    if key in context_config_rules:
                        resources_to_delete.append((key, value))

            # TODO: use of datastores (might be virtual ones) to enable rollbacks

            errors = []

            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
                errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
            else:
                results_setconfig = driver.SetConfig(resources_to_set)
                errors.extend(check_set_errors(resources_to_set, results_setconfig))

                # Deletions are attempted only when every SET succeeded.
                if len(errors) == 0:
                    results_deleteconfig = driver.DeleteConfig(resources_to_delete)
                    errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))

            if len(errors) > 0:
                raise OperationFailedException('ConfigureDevice', extra_details=errors)

            # Refresh the stored running configuration, ignoring resources the driver failed to read.
            running_config_rules = driver.GetConfig()
            running_config_rules = [
                (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
                for config_rule in running_config_rules if not isinstance(config_rule[1], Exception)
            ]
            update_config(self.database, device_uuid, 'running', running_config_rules)

            sync_device_to_context(db_device, self.context_client)
            return DeviceId(**db_device.dump_id())
        finally:
            self.mutex_queues.signal_done(device_uuid)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        """Remove a device together with its dependent records.

        Removes the monitoring loop and cached driver of the device, deletes it from the Context
        service and then deletes, from the local database, its KPIs, endpoints (with their
        monitors), drivers, config rules and finally the device and its two config records.
        When the device is not found locally, an Empty reply is returned without further work.

        Args:
            request: identifier of the device to delete.
            context: gRPC servicer context (not used directly in this method).

        Returns:
            Empty.
        """
        device_uuid = request.device_uuid.uuid

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            self.monitoring_loops.remove(device_uuid)

            sync_device_from_context(device_uuid, self.context_client, self.database)
            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
            if db_device is None: return Empty()

            self.driver_instance_cache.delete(device_uuid)
            delete_device_from_context(db_device, self.context_client)

            # KPIs, and the endpoint-monitor/KPI relations that reference them.
            for db_kpi_pk,_ in db_device.references(KpiModel):
                db_kpi = get_object(self.database, KpiModel, db_kpi_pk)
                for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel):
                    get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete()
                db_kpi.delete()

            # Endpoints, and the monitors that reference them.
            for db_endpoint_pk,_ in db_device.references(EndPointModel):
                db_endpoint = EndPointModel(self.database, db_endpoint_pk)
                for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
                    get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete()
                db_endpoint.delete()

            for db_driver_pk,_ in db_device.references(DriverModel):
                get_object(self.database, DriverModel, db_driver_pk).delete()

            db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
            for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
                get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()

            db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
            for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
                get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()

            # The device is deleted before the config records it points to.
            db_device.delete()
            db_initial_config.delete()
            db_running_config.delete()
            return Empty()
        finally:
            self.mutex_queues.signal_done(device_uuid)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        """Return the stored initial configuration of a device.

        Args:
            request: identifier of the device.
            context: gRPC servicer context (not used directly in this method).

        Returns:
            DeviceConfig with the rules dumped from the device's initial configuration, or an
            empty DeviceConfig when the device is not found in the local database.
        """
        device_uuid = request.device_uuid.uuid

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            sync_device_from_context(device_uuid, self.context_client, self.database)
            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)

            # config_rules is a repeated field, so the empty default is a list rather than a dict.
            config_rules = [] if db_device is None else db_device.dump_initial_config()
            return DeviceConfig(config_rules=config_rules)
        finally:
            self.mutex_queues.signal_done(device_uuid)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
        """Start or stop the monitoring of a KPI on a device endpoint.

        A request whose sampling duration and sampling interval are both greater than zero
        subscribes the KPI; any other request unsubscribes the KPI identified by its UUID.

        Args:
            request: monitoring settings (KPI id, KPI descriptor, sampling duration/interval).
            context: gRPC servicer context (not used directly in this method).

        Returns:
            Empty.

        Raises:
            OperationFailedException: when a required record or driver is missing, or when the
                driver reports errors while subscribing/unsubscribing.
        """
        kpi_uuid = request.kpi_id.kpi_id.uuid
        # This value is also the mutex key. It must not be reassigned, so that the queue released
        # in the finally clause is the same one that was acquired.
        device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
            if subscribe:
                db_device = self._subscribe_device_kpi(request, kpi_uuid, device_uuid)
            else:
                db_device = self._unsubscribe_device_kpi(kpi_uuid)

            # Subscriptions are not stored as classical driver config.
            # TODO: consider adding it somehow in the configuration (reconcile the rules returned by
            # driver.GetConfig() against the stored 'running' rules: add/update those present in
            # the driver, delete those only present in the store).
            # Warning: GetConfig might be very slow in OpenConfig devices

            sync_device_to_context(db_device, self.context_client)
            return Empty()
        finally:
            self.mutex_queues.signal_done(device_uuid)

    def _subscribe_device_kpi(
        self, request : MonitoringSettings, kpi_uuid : str, device_uuid : str
    ) -> DeviceModel:
        """Create the KPI records and subscribe the monitored resource; return the device record."""
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
        if db_device is None:
            msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        endpoint_id = request.kpi_descriptor.endpoint_id
        endpoint_uuid = endpoint_id.endpoint_uuid.uuid
        str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
        endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
        endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
        if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
            # Endpoints qualified with a topology carry the topology key as a ':'-separated suffix.
            str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
            str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
        db_endpoint : EndPointModel = get_object(
            self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
        if db_endpoint is None:
            msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
                str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        sample_type = request.kpi_descriptor.kpi_sample_type

        attributes = {
            'kpi_uuid'         : kpi_uuid,
            'kpi_description'  : request.kpi_descriptor.kpi_description,
            'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(sample_type),
            'device_fk'        : db_device,
            'endpoint_fk'      : db_endpoint,
            'sampling_duration': request.sampling_duration_s,
            'sampling_interval': request.sampling_interval_s,
        }
        result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
        db_kpi, _ = result

        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
        db_endpoint_monitor : EndPointMonitorModel = get_object(
            self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
        if db_endpoint_monitor is None:
            msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format(
                str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
                str(device_uuid), str(endpoint_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        # The relation key uses the monitor resource key with non-alphanumeric characters mapped to '.'.
        endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
        str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
        attributes = {
            'endpoint_monitor_fk': db_endpoint_monitor,
            'kpi_fk'             : db_kpi,
        }
        update_or_create_object(
            self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)

        resources_to_subscribe : List[Tuple[str, float, float]] = [ # key, sampling_duration, sampling_interval
            (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval),
        ]
        results_subscribestate = driver.SubscribeState(resources_to_subscribe)
        errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
        if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

        self.monitoring_loops.add(device_uuid, driver)
        return db_device

    def _unsubscribe_device_kpi(self, kpi_uuid : str) -> DeviceModel:
        """Unsubscribe the resource of a stored KPI and delete its records; return the device record."""
        db_kpi : KpiModel = get_object(
            self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
        if db_kpi is None:
            msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        db_device : DeviceModel = get_object(
            self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
        if db_device is None:
            msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
        # The device is resolved from the stored KPI, not from the request.
        device_uuid = db_device.device_uuid

        db_endpoint : EndPointModel = get_object(
            self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
        if db_endpoint is None:
            msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
        str_endpoint_key = db_endpoint.pk

        kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type
        sample_type = kpi_sample_type.value
        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
        db_endpoint_monitor : EndPointMonitorModel = get_object(
            self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
        if db_endpoint_monitor is None:
            msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
        str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
        db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
            self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
        if db_endpoint_monitor_kpi is None:
            msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        resources_to_unsubscribe : List[Tuple[str, float, float]] = [ # key, sampling_duration, sampling_interval
            (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval),
        ]

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
        errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
        if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

        db_endpoint_monitor_kpi.delete()
        db_kpi.delete()

        # There is one monitoring loop per device; keep them active since they are re-used by different
        # monitoring requests (the loop is therefore not removed here).
        return db_device