Skip to content
Snippets Groups Projects
Commit 10a63cc4 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common and Device:

- Implemented generic MutexQueues class
- Implemented sequentialization of operations in Device component to prevent data corruption and race conditions
parent 8806bbf9
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!4Compute component:
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# MutexQueues:
# ------------
# This class schedules and serializes operations that are issued concurrently
# over a number of resources. For instance, when multiple components want to
# configure devices through the Device component, the configuration operations
# have to be serialized to prevent data corruption and race conditions.
# Usage Example:
# class Servicer():
#     def __init__(self):
#         # init other stuff
#         self.drivers = dict()
#         self.mutex_queues = MutexQueues()
#
#     def configure_device(self, device_uuid, settings):
#         self.mutex_queues.wait_my_turn(device_uuid)
#         try:
#             driver = self.drivers.get(device_uuid)
#             if driver is None:
#                 driver = Driver(device_uuid)
#                 self.drivers[device_uuid] = driver
#             driver.configure(settings)
#         finally:
#             # always release the turn, otherwise later callers block forever
#             self.mutex_queues.signal_done(device_uuid)
import threading
from queue import Queue
from typing import Dict
class MutexQueues:
    """Serialize operations issued concurrently against named resources.

    Every resource name owns a FIFO queue of ``threading.Event`` objects. The
    event at the head of a queue belongs to the caller that is currently
    allowed to run; every other caller blocks on its own event until the
    caller ahead of it invokes ``signal_done``.

    Each ``wait_my_turn(name)`` must be paired with exactly one
    ``signal_done(name)``, typically in a ``try``/``finally``; otherwise the
    callers queued behind it are never woken up.
    """

    def __init__(self) -> None:
        # lock to protect dictionary updates
        self.lock = threading.Lock()

        # dictionary of queues of mutexes: queue_name => queue[mutex]
        # the first mutex is the running one; a name is only kept in the
        # dictionary while some caller is running or waiting on it
        self.mutex_queues : Dict[str, Queue[threading.Event]] = dict()

    def wait_my_turn(self, queue_name : str) -> None:
        """Block until the caller is at the head of the queue `queue_name`.

        Returns immediately when nobody else is running or waiting on that
        name; otherwise waits until the previous holder calls ``signal_done``.
        """
        # create my mutex and enqueue it
        mutex = threading.Event()
        with self.lock:
            # look up first so that a Queue is only built when it is needed
            queue = self.mutex_queues.get(queue_name)
            if queue is None:
                queue = Queue()
                self.mutex_queues[queue_name] = queue
            first_in_queue = (queue.qsize() == 0)
            queue.put_nowait(mutex)

        # if I'm the first in the queue upon addition, there are no running
        # tasks: return directly without waiting
        if first_in_queue: return

        # otherwise, wait for my turn in the queue
        mutex.wait()

    def signal_done(self, queue_name : str) -> None:
        """Release the turn on `queue_name` and wake up the next waiter, if any.

        Raises:
            queue.Empty: if no caller currently holds `queue_name`, i.e. the
                call is not paired with a previous ``wait_my_turn``.
        """
        # I'm done with my work
        with self.lock:
            queue = self.mutex_queues.get(queue_name)
            # An unknown name is handled as an empty queue: get_nowait() below
            # then raises queue.Empty, as it always did for an unmatched call,
            # but no empty entry is left behind in the dictionary.
            if queue is None: queue = Queue()

            # remove myself from the queue
            queue.get_nowait()

            # if there are no other tasks queued, forget the queue so the
            # dictionary does not grow with every name ever used, and return
            if queue.qsize() == 0:
                self.mutex_queues.pop(queue_name, None)
                return

            # otherwise, signal the next task in the queue to start; it stays
            # at the head of the queue until it calls signal_done itself
            next_mutex : threading.Event = queue.queue[0]
            next_mutex.set()
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -23,10 +23,15 @@ from .driver_api.DriverInstanceCache import DriverInstanceCache
from .DeviceServiceServicerImpl import DeviceServiceServicerImpl
from .MonitoringLoops import MonitoringLoops
# Custom gRPC settings
# Multiple clients might keep connections alive while waiting for RPC methods to be executed.
# Requests need to be serialized to ensure correct device configurations.
GRPC_MAX_WORKERS = 200
class DeviceService(GenericGrpcService):
def __init__(self, driver_instance_cache : DriverInstanceCache, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.DEVICE)
super().__init__(port, cls_name=cls_name)
super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name)
database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
self.monitoring_loops = MonitoringLoops(database)
self.device_servicer = DeviceServiceServicerImpl(database, driver_instance_cache, self.monitoring_loops)
......
......@@ -24,6 +24,7 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from common.tools.grpc.Tools import grpc_message_to_json
from common.tools.mutex_queues.MutexQueues import MutexQueues
from context.client.ContextClient import ContextClient
from .database.ConfigModel import (
ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
......@@ -56,6 +57,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
self.database = database
self.driver_instance_cache = driver_instance_cache
self.monitoring_loops = monitoring_loops
self.mutex_queues = MutexQueues()
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS, LOGGER)
......@@ -101,348 +103,368 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
json_request['device_config'] = {}
request = Device(**json_request)
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device,_ = update_device_in_local_database(self.database, request)
driver_filter_fields = get_device_driver_filter_fields(db_device)
#LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules)))
address = connection_config_rules.pop('address', None)
port = connection_config_rules.pop('port', None)
settings = connection_config_rules.pop('settings', '{}')
self.mutex_queues.wait_my_turn(device_uuid)
try:
settings = json.loads(settings)
except ValueError as e:
raise InvalidArgumentException(
'device.device_config.config_rules[settings]', settings,
extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e
driver : _Driver = self.driver_instance_cache.get(
device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
driver.Connect()
endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
try:
for resource_key, resource_value in endpoints:
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device,_ = update_device_in_local_database(self.database, request)
driver_filter_fields = get_device_driver_filter_fields(db_device)
#LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules)))
address = connection_config_rules.pop('address', None)
port = connection_config_rules.pop('port', None)
settings = connection_config_rules.pop('settings', '{}')
try:
settings = json.loads(settings)
except ValueError as e:
raise InvalidArgumentException(
'device.device_config.config_rules[settings]', settings,
extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e
driver : _Driver = self.driver_instance_cache.get(
device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
driver.Connect()
endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
try:
for resource_key, resource_value in endpoints:
if isinstance(resource_value, Exception):
LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value)))
continue
endpoint_uuid = resource_value.get('uuid')
endpoint_type = resource_value.get('type')
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
db_endpoint, _ = update_or_create_object(
self.database, EndPointModel, str_endpoint_key, {
'device_fk' : db_device,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint_type,
'resource_key' : resource_key,
})
sample_types : Dict[int, str] = resource_value.get('sample_types', {})
for sample_type, monitor_resource_key in sample_types.items():
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
'endpoint_fk' : db_endpoint,
'resource_key' : monitor_resource_key,
'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
})
except: # pylint: disable=bare-except
LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
raw_running_config_rules = driver.GetConfig()
running_config_rules = []
for resource_key, resource_value in raw_running_config_rules:
if isinstance(resource_value, Exception):
LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value)))
msg = 'Error retrieving config rules: {:s} => {:s}'
LOGGER.error(msg.format(str(resource_key), str(resource_value)))
continue
endpoint_uuid = resource_value.get('uuid')
endpoint_type = resource_value.get('type')
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
db_endpoint, _ = update_or_create_object(
self.database, EndPointModel, str_endpoint_key, {
'device_fk' : db_device,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint_type,
'resource_key' : resource_key,
})
sample_types : Dict[int, str] = resource_value.get('sample_types', {})
for sample_type, monitor_resource_key in sample_types.items():
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
'endpoint_fk' : db_endpoint,
'resource_key' : monitor_resource_key,
'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
})
except: # pylint: disable=bare-except
LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
raw_running_config_rules = driver.GetConfig()
running_config_rules = []
for resource_key, resource_value in raw_running_config_rules:
if isinstance(resource_value, Exception):
msg = 'Error retrieving config rules: {:s} => {:s}'
LOGGER.error(msg.format(str(resource_key), str(resource_value)))
continue
config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True))
running_config_rules.append(config_rule)
config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True))
running_config_rules.append(config_rule)
#for running_config_rule in running_config_rules:
# LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
update_config(self.database, device_uuid, 'running', running_config_rules)
#for running_config_rule in running_config_rules:
# LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
update_config(self.database, device_uuid, 'running', running_config_rules)
initial_config_rules = driver.GetInitialConfig()
update_config(self.database, device_uuid, 'initial', initial_config_rules)
initial_config_rules = driver.GetInitialConfig()
update_config(self.database, device_uuid, 'initial', initial_config_rules)
#LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump(
# include_config_rules=True, include_drivers=True, include_endpoints=True))))
#LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump(
# include_config_rules=True, include_drivers=True, include_endpoints=True))))
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
finally:
self.mutex_queues.signal_done(device_uuid)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
device_id = request.device_id
device_uuid = device_id.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
self.mutex_queues.wait_my_turn(device_uuid)
try:
sync_device_from_context(device_uuid, self.context_client, self.database)
context_config_rules = get_config_rules(self.database, device_uuid, 'running')
context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
#LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
context_config_rules = get_config_rules(self.database, device_uuid, 'running')
context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
#LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
db_device,_ = update_device_in_local_database(self.database, request)
db_device,_ = update_device_in_local_database(self.database, request)
request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
#LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
#LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
resources_to_set : List[Tuple[str, Any]] = [] # key, value
resources_to_delete : List[Tuple[str, Any]] = [] # key, value
resources_to_set : List[Tuple[str, Any]] = [] # key, value
resources_to_delete : List[Tuple[str, Any]] = [] # key, value
for config_rule in request_config_rules:
action, key, value = config_rule
if action == ORM_ConfigActionEnum.SET:
if (key not in context_config_rules) or (context_config_rules[key] != value):
resources_to_set.append((key, value))
elif action == ORM_ConfigActionEnum.DELETE:
if key in context_config_rules:
resources_to_delete.append((key, value))
for config_rule in request_config_rules:
action, key, value = config_rule
if action == ORM_ConfigActionEnum.SET:
if (key not in context_config_rules) or (context_config_rules[key] != value):
resources_to_set.append((key, value))
elif action == ORM_ConfigActionEnum.DELETE:
if key in context_config_rules:
resources_to_delete.append((key, value))
#LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set)))
#LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete)))
#LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set)))
#LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete)))
# TODO: use of datastores (might be virtual ones) to enable rollbacks
# TODO: use of datastores (might be virtual ones) to enable rollbacks
errors = []
errors = []
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
if len(errors) == 0:
results_setconfig = driver.SetConfig(resources_to_set)
errors.extend(check_set_errors(resources_to_set, results_setconfig))
if len(errors) == 0:
results_setconfig = driver.SetConfig(resources_to_set)
errors.extend(check_set_errors(resources_to_set, results_setconfig))
if len(errors) == 0:
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))
if len(errors) == 0:
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))
if len(errors) > 0:
raise OperationFailedException('ConfigureDevice', extra_details=errors)
if len(errors) > 0:
raise OperationFailedException('ConfigureDevice', extra_details=errors)
running_config_rules = driver.GetConfig()
running_config_rules = [
(ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
for config_rule in running_config_rules if not isinstance(config_rule[1], Exception)
]
#for running_config_rule in running_config_rules:
# LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
update_config(self.database, device_uuid, 'running', running_config_rules)
running_config_rules = driver.GetConfig()
running_config_rules = [
(ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
for config_rule in running_config_rules if not isinstance(config_rule[1], Exception)
]
#for running_config_rule in running_config_rules:
# LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
update_config(self.database, device_uuid, 'running', running_config_rules)
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
finally:
self.mutex_queues.signal_done(device_uuid)
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
device_uuid = request.device_uuid.uuid
self.monitoring_loops.remove(device_uuid)
self.mutex_queues.wait_my_turn(device_uuid)
try:
self.monitoring_loops.remove(device_uuid)
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None: return Empty()
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None: return Empty()
self.driver_instance_cache.delete(device_uuid)
delete_device_from_context(db_device, self.context_client)
self.driver_instance_cache.delete(device_uuid)
delete_device_from_context(db_device, self.context_client)
for db_kpi_pk,_ in db_device.references(KpiModel):
db_kpi = get_object(self.database, KpiModel, db_kpi_pk)
for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel):
get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete()
db_kpi.delete()
for db_kpi_pk,_ in db_device.references(KpiModel):
db_kpi = get_object(self.database, KpiModel, db_kpi_pk)
for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel):
get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete()
db_kpi.delete()
for db_endpoint_pk,_ in db_device.references(EndPointModel):
db_endpoint = EndPointModel(self.database, db_endpoint_pk)
for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete()
db_endpoint.delete()
for db_endpoint_pk,_ in db_device.references(EndPointModel):
db_endpoint = EndPointModel(self.database, db_endpoint_pk)
for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete()
db_endpoint.delete()
for db_driver_pk,_ in db_device.references(DriverModel):
get_object(self.database, DriverModel, db_driver_pk).delete()
for db_driver_pk,_ in db_device.references(DriverModel):
get_object(self.database, DriverModel, db_driver_pk).delete()
db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
db_device.delete()
db_initial_config.delete()
db_running_config.delete()
return Empty()
db_device.delete()
db_initial_config.delete()
db_running_config.delete()
return Empty()
finally:
self.mutex_queues.signal_done(device_uuid)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
device_uuid = request.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
self.mutex_queues.wait_my_turn(device_uuid)
try:
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
config_rules = {} if db_device is None else db_device.dump_initial_config()
return DeviceConfig(config_rules=config_rules)
config_rules = {} if db_device is None else db_device.dump_initial_config()
device_config = DeviceConfig(config_rules=config_rules)
return device_config
finally:
self.mutex_queues.signal_done(device_uuid)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
kpi_uuid = request.kpi_id.kpi_id.uuid
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
self.mutex_queues.wait_my_turn(device_uuid)
try:
subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
if subscribe:
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None:
msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_id = request.kpi_descriptor.endpoint_id
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
db_endpoint : EndPointModel = get_object(
self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
if db_endpoint is None:
msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
sample_type = request.kpi_descriptor.kpi_sample_type
attributes = {
'kpi_uuid' : request.kpi_id.kpi_id.uuid,
'kpi_description' : request.kpi_descriptor.kpi_description,
'kpi_sample_type' : grpc_to_enum__kpi_sample_type(sample_type),
'device_fk' : db_device,
'endpoint_fk' : db_endpoint,
'sampling_duration': request.sampling_duration_s,
'sampling_interval': request.sampling_interval_s,
}
result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
db_kpi, updated = result
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
db_endpoint_monitor : EndPointMonitorModel = get_object(
self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
if db_endpoint_monitor is None:
msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format(
str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
str(device_uuid), str(endpoint_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
attributes = {
'endpoint_monitor_fk': db_endpoint_monitor,
'kpi_fk' : db_kpi,
}
result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
db_endpoint_monitor_kpi, updated = result
resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_subscribe.append(
(db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
results_subscribestate = driver.SubscribeState(resources_to_subscribe)
errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
self.monitoring_loops.add(device_uuid, driver)
subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
if subscribe:
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None:
msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_id = request.kpi_descriptor.endpoint_id
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
db_endpoint : EndPointModel = get_object(
self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
if db_endpoint is None:
msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
sample_type = request.kpi_descriptor.kpi_sample_type
attributes = {
'kpi_uuid' : request.kpi_id.kpi_id.uuid,
'kpi_description' : request.kpi_descriptor.kpi_description,
'kpi_sample_type' : grpc_to_enum__kpi_sample_type(sample_type),
'device_fk' : db_device,
'endpoint_fk' : db_endpoint,
'sampling_duration': request.sampling_duration_s,
'sampling_interval': request.sampling_interval_s,
}
result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
db_kpi, updated = result
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
db_endpoint_monitor : EndPointMonitorModel = get_object(
self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
if db_endpoint_monitor is None:
msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format(
str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
str(device_uuid), str(endpoint_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
attributes = {
'endpoint_monitor_fk': db_endpoint_monitor,
'kpi_fk' : db_kpi,
}
result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
db_endpoint_monitor_kpi, updated = result
resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_subscribe.append(
(db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
results_subscribestate = driver.SubscribeState(resources_to_subscribe)
errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
self.monitoring_loops.add(device_uuid, driver)
else:
db_kpi : KpiModel = get_object(
self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
if db_kpi is None:
msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
db_device : DeviceModel = get_object(
self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
if db_device is None:
msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
device_uuid = db_device.device_uuid
db_endpoint : EndPointModel = get_object(
self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
if db_endpoint is None:
msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_uuid = db_endpoint.endpoint_uuid
str_endpoint_key = db_endpoint.pk
kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type
sample_type = kpi_sample_type.value
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
db_endpoint_monitor : EndPointMonitorModel = get_object(
self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
if db_endpoint_monitor is None:
msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
if db_endpoint_monitor_kpi is None:
msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_unsubscribe.append(
(db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
db_endpoint_monitor_kpi.delete()
db_kpi.delete()
# There is one monitoring loop per device; keep them active since they are re-used by different monitoring
# requests.
#self.monitoring_loops.remove(device_uuid)
# Subscriptions are not stored as classical driver config.
# TODO: consider adding it somehow in the configuration.
# Warning: GetConfig might be very slow in OpenConfig devices
#running_config_rules = [
# (config_rule[0], json.dumps(config_rule[1], sort_keys=True))
# for config_rule in driver.GetConfig()
#]
#context_config_rules = {
# config_rule[1]: config_rule[2]
# for config_rule in get_config_rules(self.database, device_uuid, 'running')
#}
## each in context, not in running => delete in context
## each in running, not in context => add to context
## each in context and in running, context.value != running.value => update in context
#running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = []
#for config_rule_key,config_rule_value in running_config_rules:
# running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value))
# context_config_rules.pop(config_rule_key, None)
#for context_rule_key,context_rule_value in context_config_rules.items():
# running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value))
##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}'
##for i,running_config_rules_action in enumerate(running_config_rules_actions):
## LOGGER.info(msg.format(i, str(running_config_rules_action)))
#update_config(self.database, device_uuid, 'running', running_config_rules_actions)
sync_device_to_context(db_device, self.context_client)
return Empty()
else:
db_kpi : KpiModel = get_object(
self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
if db_kpi is None:
msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
db_device : DeviceModel = get_object(
self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
if db_device is None:
msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
device_uuid = db_device.device_uuid
db_endpoint : EndPointModel = get_object(
self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
if db_endpoint is None:
msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_uuid = db_endpoint.endpoint_uuid
str_endpoint_key = db_endpoint.pk
kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type
sample_type = kpi_sample_type.value
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
db_endpoint_monitor : EndPointMonitorModel = get_object(
self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
if db_endpoint_monitor is None:
msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
if db_endpoint_monitor_kpi is None:
msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_unsubscribe.append(
(db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
db_endpoint_monitor_kpi.delete()
db_kpi.delete()
# There is one monitoring loop per device; keep them active since they are re-used by different monitoring
# requests.
#self.monitoring_loops.remove(device_uuid)
# Subscriptions are not stored as classical driver config.
# TODO: consider adding it somehow in the configuration.
# Warning: GetConfig might be very slow in OpenConfig devices
#running_config_rules = [
# (config_rule[0], json.dumps(config_rule[1], sort_keys=True))
# for config_rule in driver.GetConfig()
#]
#context_config_rules = {
# config_rule[1]: config_rule[2]
# for config_rule in get_config_rules(self.database, device_uuid, 'running')
#}
## each in context, not in running => delete in context
## each in running, not in context => add to context
## each in context and in running, context.value != running.value => update in context
#running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = []
#for config_rule_key,config_rule_value in running_config_rules:
# running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value))
# context_config_rules.pop(config_rule_key, None)
#for context_rule_key,context_rule_value in context_config_rules.items():
# running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value))
##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}'
##for i,running_config_rules_action in enumerate(running_config_rules_actions):
## LOGGER.info(msg.format(i, str(running_config_rules_action)))
#update_config(self.database, device_uuid, 'running', running_config_rules_actions)
sync_device_to_context(db_device, self.context_client)
return Empty()
finally:
self.mutex_queues.signal_done(device_uuid)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment