diff --git a/src/common/tools/mutex_queues/MutexQueues.py b/src/common/tools/mutex_queues/MutexQueues.py new file mode 100644 index 0000000000000000000000000000000000000000..c3ab760f281c73ae2f308044d67b2d2b81aef142 --- /dev/null +++ b/src/common/tools/mutex_queues/MutexQueues.py @@ -0,0 +1,78 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# MutexQueues: +# ------------ +# This class enables scheduling and serializing operations issued concurrently +# over a number of resources. For instance, when multiple components want to +# configure devices through the Device component, configuration operations +# have to be serialized to prevent data corruption, race conditions, etc.
import threading
from queue import Queue
from typing import Dict


class MutexQueues:
    """Serialize operations issued concurrently over named resources.

    Each resource name owns a FIFO queue of ``threading.Event`` objects, one
    per task that asked for the resource. The event at the head of a queue
    belongs to the task that is currently running; every other task blocks on
    its own event until the task ahead of it calls ``signal_done``.

    Queues are created on demand and discarded as soon as they drain, so the
    internal dictionary only holds resources that are currently in use.

    Usage example::

        class Servicer():
            def __init__(self):
                # init other stuff
                self.drivers = dict()
                self.mutex_queues = MutexQueues()

            def configure_device(self, device_uuid, settings):
                self.mutex_queues.wait_my_turn(device_uuid)
                try:
                    driver = self.drivers.get(device_uuid)
                    if driver is None:
                        driver = Driver(device_uuid)
                        self.drivers[device_uuid] = driver
                    driver.configure(settings)
                finally:
                    self.mutex_queues.signal_done(device_uuid)
    """

    def __init__(self) -> None:
        # lock to protect every access to the dictionary and to its queues
        self.lock = threading.Lock()

        # dictionary of queues of mutexes: queue_name => queue[mutex]
        # first mutex is the running one
        self.mutex_queues : Dict[str, Queue[threading.Event]] = dict()

    def wait_my_turn(self, queue_name : str) -> None:
        """Block the calling thread until it owns the resource ``queue_name``.

        Returns immediately when no other task holds the resource; otherwise
        waits (without timeout) until every task queued earlier has called
        ``signal_done`` for the same name.
        """
        # create my mutex and enqueue it
        mutex = threading.Event()
        with self.lock:
            queue = self.mutex_queues.get(queue_name)
            if queue is None:
                # build the queue only when it is really missing, instead of
                # allocating a throw-away Queue on every call
                queue = Queue()
                self.mutex_queues[queue_name] = queue
            first_in_queue = (queue.qsize() == 0)
            queue.put_nowait(mutex)

        # if I'm the first in the queue upon addition, there are no running
        # tasks: directly return without waiting
        if first_in_queue: return

        # otherwise, wait for my turn in the queue; done outside the lock so
        # that other tasks can keep enqueuing and signalling
        mutex.wait()

    def signal_done(self, queue_name : str) -> None:
        """Release the resource ``queue_name`` and wake up the next task.

        Must be called exactly once per successful ``wait_my_turn`` by the
        task that currently owns the resource.

        Raises:
            queue.Empty: if no task is registered for ``queue_name``, i.e. the
                call is not paired with a previous ``wait_my_turn``.
        """
        with self.lock:
            queue = self.mutex_queues.get(queue_name)
            if queue is None:
                # unmatched call: use a detached empty queue so get_nowait()
                # below raises queue.Empty as it always did, but without
                # leaving a stray entry behind in the dictionary
                queue = Queue()

            # remove myself from the queue
            queue.get_nowait()

            # if there are no other tasks queued, drop the drained queue so
            # the dictionary does not grow with every resource name ever seen
            if queue.qsize() == 0:
                self.mutex_queues.pop(queue_name, None)
                return

            # otherwise, signal the next task in the queue to start
            next_mutex : threading.Event = queue.queue[0]
            next_mutex.set()
-0,0 +1,14 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/device/service/DeviceService.py b/src/device/service/DeviceService.py index 4dc2b01000d8ca6dd2b3ecee0b0f867338636c73..59134f26d3dd8c3fa0a9dddbcd1d3df298ec076a 100644 --- a/src/device/service/DeviceService.py +++ b/src/device/service/DeviceService.py @@ -23,10 +23,15 @@ from .driver_api.DriverInstanceCache import DriverInstanceCache from .DeviceServiceServicerImpl import DeviceServiceServicerImpl from .MonitoringLoops import MonitoringLoops +# Custom gRPC settings +# Multiple clients might keep connections alive waiting for RPC methods to be executed. 
+# Requests needs to be serialized to ensure correct device configurations +GRPC_MAX_WORKERS = 200 + class DeviceService(GenericGrpcService): def __init__(self, driver_instance_cache : DriverInstanceCache, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.DEVICE) - super().__init__(port, cls_name=cls_name) + super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name) database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) self.monitoring_loops = MonitoringLoops(database) self.device_servicer = DeviceServiceServicerImpl(database, driver_instance_cache, self.monitoring_loops) diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 9ffd028a67a34cfcce7a737a5817128126941759..d5d44f34ffb69a337b715a0884aea3770b3d3cec 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -24,6 +24,7 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException from common.tools.grpc.Tools import grpc_message_to_json +from common.tools.mutex_queues.MutexQueues import MutexQueues from context.client.ContextClient import ContextClient from .database.ConfigModel import ( ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config) @@ -56,6 +57,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): self.database = database self.driver_instance_cache = driver_instance_cache self.monitoring_loops = monitoring_loops + self.mutex_queues = MutexQueues() LOGGER.debug('Servicer Created') @safe_and_metered_rpc_method(METRICS, LOGGER) @@ -101,348 +103,368 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): json_request['device_config'] = {} request = 
Device(**json_request) - sync_device_from_context(device_uuid, self.context_client, self.database) - db_device,_ = update_device_in_local_database(self.database, request) - - driver_filter_fields = get_device_driver_filter_fields(db_device) - - #LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules))) - address = connection_config_rules.pop('address', None) - port = connection_config_rules.pop('port', None) - settings = connection_config_rules.pop('settings', '{}') + self.mutex_queues.wait_my_turn(device_uuid) try: - settings = json.loads(settings) - except ValueError as e: - raise InvalidArgumentException( - 'device.device_config.config_rules[settings]', settings, - extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e - driver : _Driver = self.driver_instance_cache.get( - device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings) - driver.Connect() - - endpoints = driver.GetConfig([RESOURCE_ENDPOINTS]) - try: - for resource_key, resource_value in endpoints: + sync_device_from_context(device_uuid, self.context_client, self.database) + db_device,_ = update_device_in_local_database(self.database, request) + + driver_filter_fields = get_device_driver_filter_fields(db_device) + + #LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules))) + address = connection_config_rules.pop('address', None) + port = connection_config_rules.pop('port', None) + settings = connection_config_rules.pop('settings', '{}') + try: + settings = json.loads(settings) + except ValueError as e: + raise InvalidArgumentException( + 'device.device_config.config_rules[settings]', settings, + extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e + driver : _Driver = self.driver_instance_cache.get( + device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings) + 
driver.Connect() + + endpoints = driver.GetConfig([RESOURCE_ENDPOINTS]) + try: + for resource_key, resource_value in endpoints: + if isinstance(resource_value, Exception): + LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value))) + continue + endpoint_uuid = resource_value.get('uuid') + endpoint_type = resource_value.get('type') + str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) + db_endpoint, _ = update_or_create_object( + self.database, EndPointModel, str_endpoint_key, { + 'device_fk' : db_device, + 'endpoint_uuid': endpoint_uuid, + 'endpoint_type': endpoint_type, + 'resource_key' : resource_key, + }) + sample_types : Dict[int, str] = resource_value.get('sample_types', {}) + for sample_type, monitor_resource_key in sample_types.items(): + str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) + update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, { + 'endpoint_fk' : db_endpoint, + 'resource_key' : monitor_resource_key, + 'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type), + }) + except: # pylint: disable=bare-except + LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints))) + + raw_running_config_rules = driver.GetConfig() + running_config_rules = [] + for resource_key, resource_value in raw_running_config_rules: if isinstance(resource_value, Exception): - LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value))) + msg = 'Error retrieving config rules: {:s} => {:s}' + LOGGER.error(msg.format(str(resource_key), str(resource_value))) continue - endpoint_uuid = resource_value.get('uuid') - endpoint_type = resource_value.get('type') - str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) - db_endpoint, _ = update_or_create_object( - self.database, EndPointModel, str_endpoint_key, { - 'device_fk' : db_device, - 'endpoint_uuid': endpoint_uuid, - 'endpoint_type': endpoint_type, - 
'resource_key' : resource_key, - }) - sample_types : Dict[int, str] = resource_value.get('sample_types', {}) - for sample_type, monitor_resource_key in sample_types.items(): - str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) - update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, { - 'endpoint_fk' : db_endpoint, - 'resource_key' : monitor_resource_key, - 'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type), - }) - except: # pylint: disable=bare-except - LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints))) - - raw_running_config_rules = driver.GetConfig() - running_config_rules = [] - for resource_key, resource_value in raw_running_config_rules: - if isinstance(resource_value, Exception): - msg = 'Error retrieving config rules: {:s} => {:s}' - LOGGER.error(msg.format(str(resource_key), str(resource_value))) - continue - config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True)) - running_config_rules.append(config_rule) + config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True)) + running_config_rules.append(config_rule) - #for running_config_rule in running_config_rules: - # LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule))) - update_config(self.database, device_uuid, 'running', running_config_rules) + #for running_config_rule in running_config_rules: + # LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule))) + update_config(self.database, device_uuid, 'running', running_config_rules) - initial_config_rules = driver.GetInitialConfig() - update_config(self.database, device_uuid, 'initial', initial_config_rules) + initial_config_rules = driver.GetInitialConfig() + update_config(self.database, device_uuid, 'initial', initial_config_rules) - #LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump( - # 
include_config_rules=True, include_drivers=True, include_endpoints=True)))) + #LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump( + # include_config_rules=True, include_drivers=True, include_endpoints=True)))) - sync_device_to_context(db_device, self.context_client) - return DeviceId(**db_device.dump_id()) + sync_device_to_context(db_device, self.context_client) + return DeviceId(**db_device.dump_id()) + finally: + self.mutex_queues.signal_done(device_uuid) @safe_and_metered_rpc_method(METRICS, LOGGER) def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: device_id = request.device_id device_uuid = device_id.device_uuid.uuid - sync_device_from_context(device_uuid, self.context_client, self.database) + self.mutex_queues.wait_my_turn(device_uuid) + try: + sync_device_from_context(device_uuid, self.context_client, self.database) - context_config_rules = get_config_rules(self.database, device_uuid, 'running') - context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules} - #LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules))) + context_config_rules = get_config_rules(self.database, device_uuid, 'running') + context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules} + #LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules))) - db_device,_ = update_device_in_local_database(self.database, request) + db_device,_ = update_device_in_local_database(self.database, request) - request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) - #LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules))) + request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) + #LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules))) - resources_to_set : List[Tuple[str, 
Any]] = [] # key, value - resources_to_delete : List[Tuple[str, Any]] = [] # key, value + resources_to_set : List[Tuple[str, Any]] = [] # key, value + resources_to_delete : List[Tuple[str, Any]] = [] # key, value - for config_rule in request_config_rules: - action, key, value = config_rule - if action == ORM_ConfigActionEnum.SET: - if (key not in context_config_rules) or (context_config_rules[key] != value): - resources_to_set.append((key, value)) - elif action == ORM_ConfigActionEnum.DELETE: - if key in context_config_rules: - resources_to_delete.append((key, value)) + for config_rule in request_config_rules: + action, key, value = config_rule + if action == ORM_ConfigActionEnum.SET: + if (key not in context_config_rules) or (context_config_rules[key] != value): + resources_to_set.append((key, value)) + elif action == ORM_ConfigActionEnum.DELETE: + if key in context_config_rules: + resources_to_delete.append((key, value)) - #LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set))) - #LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete))) + #LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set))) + #LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete))) - # TODO: use of datastores (might be virtual ones) to enable rollbacks + # TODO: use of datastores (might be virtual ones) to enable rollbacks - errors = [] + errors = [] - driver : _Driver = self.driver_instance_cache.get(device_uuid) - if driver is None: - errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid))) + driver : _Driver = self.driver_instance_cache.get(device_uuid) + if driver is None: + errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid))) + + if len(errors) == 0: + results_setconfig = driver.SetConfig(resources_to_set) + errors.extend(check_set_errors(resources_to_set, 
results_setconfig)) - if len(errors) == 0: - results_setconfig = driver.SetConfig(resources_to_set) - errors.extend(check_set_errors(resources_to_set, results_setconfig)) + if len(errors) == 0: + results_deleteconfig = driver.DeleteConfig(resources_to_delete) + errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig)) - if len(errors) == 0: - results_deleteconfig = driver.DeleteConfig(resources_to_delete) - errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig)) + if len(errors) > 0: + raise OperationFailedException('ConfigureDevice', extra_details=errors) - if len(errors) > 0: - raise OperationFailedException('ConfigureDevice', extra_details=errors) + running_config_rules = driver.GetConfig() + running_config_rules = [ + (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True)) + for config_rule in running_config_rules if not isinstance(config_rule[1], Exception) + ] + #for running_config_rule in running_config_rules: + # LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule))) + update_config(self.database, device_uuid, 'running', running_config_rules) - running_config_rules = driver.GetConfig() - running_config_rules = [ - (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True)) - for config_rule in running_config_rules if not isinstance(config_rule[1], Exception) - ] - #for running_config_rule in running_config_rules: - # LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule))) - update_config(self.database, device_uuid, 'running', running_config_rules) + sync_device_to_context(db_device, self.context_client) + return DeviceId(**db_device.dump_id()) + finally: + self.mutex_queues.signal_done(device_uuid) - sync_device_to_context(db_device, self.context_client) - return DeviceId(**db_device.dump_id()) @safe_and_metered_rpc_method(METRICS, LOGGER) def DeleteDevice(self, request : DeviceId, 
context : grpc.ServicerContext) -> Empty: device_uuid = request.device_uuid.uuid - self.monitoring_loops.remove(device_uuid) + self.mutex_queues.wait_my_turn(device_uuid) + try: + self.monitoring_loops.remove(device_uuid) - sync_device_from_context(device_uuid, self.context_client, self.database) - db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) - if db_device is None: return Empty() + sync_device_from_context(device_uuid, self.context_client, self.database) + db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) + if db_device is None: return Empty() - self.driver_instance_cache.delete(device_uuid) - delete_device_from_context(db_device, self.context_client) + self.driver_instance_cache.delete(device_uuid) + delete_device_from_context(db_device, self.context_client) - for db_kpi_pk,_ in db_device.references(KpiModel): - db_kpi = get_object(self.database, KpiModel, db_kpi_pk) - for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel): - get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete() - db_kpi.delete() + for db_kpi_pk,_ in db_device.references(KpiModel): + db_kpi = get_object(self.database, KpiModel, db_kpi_pk) + for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel): + get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete() + db_kpi.delete() - for db_endpoint_pk,_ in db_device.references(EndPointModel): - db_endpoint = EndPointModel(self.database, db_endpoint_pk) - for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel): - get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete() - db_endpoint.delete() + for db_endpoint_pk,_ in db_device.references(EndPointModel): + db_endpoint = EndPointModel(self.database, db_endpoint_pk) + for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel): 
+ get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete() + db_endpoint.delete() - for db_driver_pk,_ in db_device.references(DriverModel): - get_object(self.database, DriverModel, db_driver_pk).delete() + for db_driver_pk,_ in db_device.references(DriverModel): + get_object(self.database, DriverModel, db_driver_pk).delete() - db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk) - for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel): - get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete() + db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk) + for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel): + get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete() - db_running_config = ConfigModel(self.database, db_device.device_running_config_fk) - for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel): - get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete() + db_running_config = ConfigModel(self.database, db_device.device_running_config_fk) + for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel): + get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete() - db_device.delete() - db_initial_config.delete() - db_running_config.delete() - return Empty() + db_device.delete() + db_initial_config.delete() + db_running_config.delete() + return Empty() + finally: + self.mutex_queues.signal_done(device_uuid) @safe_and_metered_rpc_method(METRICS, LOGGER) def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig: device_uuid = request.device_uuid.uuid - sync_device_from_context(device_uuid, self.context_client, self.database) - db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) + self.mutex_queues.wait_my_turn(device_uuid) + try: + sync_device_from_context(device_uuid, 
self.context_client, self.database) + db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) - config_rules = {} if db_device is None else db_device.dump_initial_config() - return DeviceConfig(config_rules=config_rules) + config_rules = {} if db_device is None else db_device.dump_initial_config() + device_config = DeviceConfig(config_rules=config_rules) + return device_config + finally: + self.mutex_queues.signal_done(device_uuid) @safe_and_metered_rpc_method(METRICS, LOGGER) def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty: kpi_uuid = request.kpi_id.kpi_id.uuid + device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid + self.mutex_queues.wait_my_turn(device_uuid) + try: + subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0) + if subscribe: + db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) + if db_device is None: + msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + endpoint_id = request.kpi_descriptor.endpoint_id + endpoint_uuid = endpoint_id.endpoint_uuid.uuid + str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) + endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid + endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid + if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: + str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) + str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') + db_endpoint : EndPointModel = get_object( + self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False) + if db_endpoint is None: + msg = 'Device({:s})/EndPoint({:s}) not found. 
EndPointKey({:s})'.format( + str(device_uuid), str(endpoint_uuid), str(str_endpoint_key)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + driver : _Driver = self.driver_instance_cache.get(device_uuid) + if driver is None: + msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + sample_type = request.kpi_descriptor.kpi_sample_type + + attributes = { + 'kpi_uuid' : request.kpi_id.kpi_id.uuid, + 'kpi_description' : request.kpi_descriptor.kpi_description, + 'kpi_sample_type' : grpc_to_enum__kpi_sample_type(sample_type), + 'device_fk' : db_device, + 'endpoint_fk' : db_endpoint, + 'sampling_duration': request.sampling_duration_s, + 'sampling_interval': request.sampling_interval_s, + } + result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes) + db_kpi, updated = result + + str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) + db_endpoint_monitor : EndPointMonitorModel = get_object( + self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False) + if db_endpoint_monitor is None: + msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format( + str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')), + str(device_uuid), str(endpoint_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key) + str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') + attributes = { + 'endpoint_monitor_fk': db_endpoint_monitor, + 'kpi_fk' : db_kpi, + } + result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object( + self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes) + db_endpoint_monitor_kpi, 
updated = result + + resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval + resources_to_subscribe.append( + (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval)) + results_subscribestate = driver.SubscribeState(resources_to_subscribe) + errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate) + if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) + + self.monitoring_loops.add(device_uuid, driver) - subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0) - if subscribe: - device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid - - db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) - if db_device is None: - msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - endpoint_id = request.kpi_descriptor.endpoint_id - endpoint_uuid = endpoint_id.endpoint_uuid.uuid - str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) - endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid - endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid - if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: - str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) - str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') - db_endpoint : EndPointModel = get_object( - self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False) - if db_endpoint is None: - msg = 'Device({:s})/EndPoint({:s}) not found. 
EndPointKey({:s})'.format( - str(device_uuid), str(endpoint_uuid), str(str_endpoint_key)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - driver : _Driver = self.driver_instance_cache.get(device_uuid) - if driver is None: - msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - sample_type = request.kpi_descriptor.kpi_sample_type - - attributes = { - 'kpi_uuid' : request.kpi_id.kpi_id.uuid, - 'kpi_description' : request.kpi_descriptor.kpi_description, - 'kpi_sample_type' : grpc_to_enum__kpi_sample_type(sample_type), - 'device_fk' : db_device, - 'endpoint_fk' : db_endpoint, - 'sampling_duration': request.sampling_duration_s, - 'sampling_interval': request.sampling_interval_s, - } - result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes) - db_kpi, updated = result - - str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) - db_endpoint_monitor : EndPointMonitorModel = get_object( - self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False) - if db_endpoint_monitor is None: - msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format( - str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')), - str(device_uuid), str(endpoint_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key) - str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') - attributes = { - 'endpoint_monitor_fk': db_endpoint_monitor, - 'kpi_fk' : db_kpi, - } - result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object( - self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes) - db_endpoint_monitor_kpi, 
updated = result - - resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval - resources_to_subscribe.append( - (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval)) - results_subscribestate = driver.SubscribeState(resources_to_subscribe) - errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate) - if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) - - self.monitoring_loops.add(device_uuid, driver) - - else: - db_kpi : KpiModel = get_object( - self.database, KpiModel, kpi_uuid, raise_if_not_found=False) - if db_kpi is None: - msg = 'Kpi({:s}) not found'.format(str(kpi_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - db_device : DeviceModel = get_object( - self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False) - if db_device is None: - msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - device_uuid = db_device.device_uuid - - db_endpoint : EndPointModel = get_object( - self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False) - if db_endpoint is None: - msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - endpoint_uuid = db_endpoint.endpoint_uuid - str_endpoint_key = db_endpoint.pk - - kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type - sample_type = kpi_sample_type.value - str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) - db_endpoint_monitor : EndPointMonitorModel = get_object( - self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False) - if db_endpoint_monitor is None: - msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key)) - raise OperationFailedException('MonitorDeviceKpi', 
extra_details=msg) - - endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key) - str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') - db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object( - self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False) - if db_endpoint_monitor_kpi is None: - msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval - resources_to_unsubscribe.append( - (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval)) - - driver : _Driver = self.driver_instance_cache.get(device_uuid) - if driver is None: - msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) - errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate) - if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) - - db_endpoint_monitor_kpi.delete() - db_kpi.delete() - - # There is one monitoring loop per device; keep them active since they are re-used by different monitoring - # requests. - #self.monitoring_loops.remove(device_uuid) - - # Subscriptions are not stored as classical driver config. - # TODO: consider adding it somehow in the configuration. 
- # Warning: GetConfig might be very slow in OpenConfig devices - #running_config_rules = [ - # (config_rule[0], json.dumps(config_rule[1], sort_keys=True)) - # for config_rule in driver.GetConfig() - #] - #context_config_rules = { - # config_rule[1]: config_rule[2] - # for config_rule in get_config_rules(self.database, device_uuid, 'running') - #} - - ## each in context, not in running => delete in context - ## each in running, not in context => add to context - ## each in context and in running, context.value != running.value => update in context - #running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = [] - #for config_rule_key,config_rule_value in running_config_rules: - # running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value)) - # context_config_rules.pop(config_rule_key, None) - #for context_rule_key,context_rule_value in context_config_rules.items(): - # running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value)) - - ##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}' - ##for i,running_config_rules_action in enumerate(running_config_rules_actions): - ## LOGGER.info(msg.format(i, str(running_config_rules_action))) - #update_config(self.database, device_uuid, 'running', running_config_rules_actions) - - sync_device_to_context(db_device, self.context_client) - return Empty() + else: + db_kpi : KpiModel = get_object( + self.database, KpiModel, kpi_uuid, raise_if_not_found=False) + if db_kpi is None: + msg = 'Kpi({:s}) not found'.format(str(kpi_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + db_device : DeviceModel = get_object( + self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False) + if db_device is None: + msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + device_uuid = db_device.device_uuid + + 
db_endpoint : EndPointModel = get_object( + self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False) + if db_endpoint is None: + msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + endpoint_uuid = db_endpoint.endpoint_uuid + str_endpoint_key = db_endpoint.pk + + kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type + sample_type = kpi_sample_type.value + str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) + db_endpoint_monitor : EndPointMonitorModel = get_object( + self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False) + if db_endpoint_monitor is None: + msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key) + str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') + db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object( + self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False) + if db_endpoint_monitor_kpi is None: + msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval + resources_to_unsubscribe.append( + (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval)) + + driver : _Driver = self.driver_instance_cache.get(device_uuid) + if driver is None: + msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + results_unsubscribestate = 
driver.UnsubscribeState(resources_to_unsubscribe) + errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate) + if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) + + db_endpoint_monitor_kpi.delete() + db_kpi.delete() + + # There is one monitoring loop per device; keep them active since they are re-used by different monitoring + # requests. + #self.monitoring_loops.remove(device_uuid) + + # Subscriptions are not stored as classical driver config. + # TODO: consider adding it somehow in the configuration. + # Warning: GetConfig might be very slow in OpenConfig devices + #running_config_rules = [ + # (config_rule[0], json.dumps(config_rule[1], sort_keys=True)) + # for config_rule in driver.GetConfig() + #] + #context_config_rules = { + # config_rule[1]: config_rule[2] + # for config_rule in get_config_rules(self.database, device_uuid, 'running') + #} + + ## each in context, not in running => delete in context + ## each in running, not in context => add to context + ## each in context and in running, context.value != running.value => update in context + #running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = [] + #for config_rule_key,config_rule_value in running_config_rules: + # running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value)) + # context_config_rules.pop(config_rule_key, None) + #for context_rule_key,context_rule_value in context_config_rules.items(): + # running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value)) + + ##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}' + ##for i,running_config_rules_action in enumerate(running_config_rules_actions): + ## LOGGER.info(msg.format(i, str(running_config_rules_action))) + #update_config(self.database, device_uuid, 'running', running_config_rules_actions) + + sync_device_to_context(db_device, self.context_client) + return 
Empty() + finally: + self.mutex_queues.signal_done(device_uuid)