diff --git a/src/common/rpc_method_wrapper/ServiceExceptions.py b/src/common/rpc_method_wrapper/ServiceExceptions.py index 960eb57f319b52619ad70c702a501140aecbaca0..fb3979c6c216d9361ced706b4fabd5ba5bbcffb2 100644 --- a/src/common/rpc_method_wrapper/ServiceExceptions.py +++ b/src/common/rpc_method_wrapper/ServiceExceptions.py @@ -34,3 +34,11 @@ class InvalidArgumentException(ServiceException): details = '{:s}({:s}) is invalid'.format(str(argument_name), str(argument_value)) super().__init__(grpc.StatusCode.INVALID_ARGUMENT, details, extra_details=extra_details) + +class OperationFailedException(ServiceException): + def __init__( + self, operation : str, extra_details : Union[str, Iterable[str]] = None + ) -> None: + + details = 'Operation({:s}) failed'.format(str(operation)) + super().__init__(grpc.StatusCode.INTERNAL, details, extra_details=extra_details) diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 041a5ae43949f8b513da74b94538c8f2f1f1ef26..c20f252ac1639176bf19ff076446d26dc2aae8ef 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -1,13 +1,15 @@ +from typing import Any, List, Tuple import grpc, logging from common.orm.Database import Database from common.orm.Factory import get_database_backend from common.orm.HighLevel import get_object from common.orm.backend.BackendEnum import BackendEnum from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method -from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException +from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException from context.client.ContextClient import ContextClient from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty from device.proto.device_pb2_grpc import DeviceServiceServicer +from device.service.driver_api.Tools import check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors from .database.ConfigModel import ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config from .database.DeviceModel import DeviceModel, DriverModel from .database.EndPointModel import EndPointModel @@ -71,11 +73,11 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: device_id = request.device_id device_uuid = device_id.device_uuid.uuid - #config_name = 'running' sync_device_from_context(device_uuid, self.context_client, self.database) context_config_rules = get_config_rules(self.database, device_uuid, 'running') + context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules} LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules))) db_device,_ = update_device_in_local_database(self.database, request) @@ -83,26 +85,47 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules))) - #resources_to_set = List[Tuple[str: resource_key, any: resource_value]] - #resources_to_delete = List[Tuple[str: resource_key]] - #resources_to_subscribe = List[str: resource_key, float: sampling_rate] - #resources_to_unsubscribe = List[Tuple[str: resource_key]] + resources_to_set : List[Tuple[str, Any]] = [] # key, value + resources_to_delete : List[str] = [] # key + resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval + resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval - # TODO: Compute "difference" between config field in request and config from Context - # TODO: Compute list of changes between device_config in context, and device_config in request + for config_rule in request_config_rules: + action, key, value = config_rule + if action == ORM_ConfigActionEnum.SET: + if (key not in context_config_rules) or (context_config_rules[key] != value): + resources_to_set.append((key, value)) + elif action == ORM_ConfigActionEnum.DELETE: + if (key in context_config_rules): + resources_to_delete.append(key) - driver_filter_fields = get_device_driver_filter_fields(db_device) - driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields) - driver.Connect() + # TODO: Implement configuration of subscriptions + # TODO: use of datastores (might be virtual ones) to enable rollbacks + + errors = [] + + driver : _Driver = self.driver_instance_cache.get(device_uuid) + if driver is None: + errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid))) + + if len(errors) == 0: + results_setconfig = driver.SetConfig(resources_to_set) + errors.extend(check_set_errors(resources_to_set, results_setconfig)) - #results_setconfig = driver.SetConfig(resources_to_set) - ## check result - #results_deleteconfig = driver.DeleteConfig(resources_to_delete) - ## check result - #results_subscribestate = driver.SubscribeState(resources_to_subscribe) - ## check result - #results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) - ## check result + if len(errors) == 0: + results_deleteconfig = driver.DeleteConfig(resources_to_delete) + errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig)) + + if len(errors) == 0: + results_subscribestate = driver.SubscribeState(resources_to_subscribe) + errors.extend(check_subscribe_errors(resources_to_delete, results_subscribestate)) + + if len(errors) == 0: + results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) + errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate)) + + if len(errors) > 0: + raise OperationFailedException('ConfigureDevice', extra_details=errors) sync_device_to_context(db_device, self.context_client) return DeviceId(**db_device.dump_id()) @@ -115,10 +138,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) if db_device is None: return Empty() - driver_filter_fields = get_device_driver_filter_fields(db_device) - driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields) - driver.Disconnect() - + self.driver_instance_cache.delete(device_uuid) delete_device_from_context(db_device, self.context_client) for db_endpoint_pk,_ in db_device.references(EndPointModel): diff --git a/src/device/service/driver_api/DriverInstanceCache.py b/src/device/service/driver_api/DriverInstanceCache.py index 3232c0fa26481d47514d09814879d65f4bc6c1d6..603a6429b1fa474f5051be4fa2d258428e59562c 100644 --- a/src/device/service/driver_api/DriverInstanceCache.py +++ b/src/device/service/driver_api/DriverInstanceCache.py @@ -16,7 +16,7 @@ class DriverInstanceCache: self._driver_factory = driver_factory def get( - self, device_uuid : str, filter_fields : Dict[FilterFieldEnum, Any], address : Optional[str] = None, + self, device_uuid : str, filter_fields : Dict[FilterFieldEnum, Any] = {}, address : Optional[str] = None, port : Optional[int] = None, settings : Dict[str, Any] = {}) -> _Driver: if self._terminate.is_set(): @@ -28,12 +28,19 @@ class DriverInstanceCache: driver_instance = self._device_uuid__to__driver_instance.get(device_uuid) if driver_instance is not None: return driver_instance + if len(filter_fields) == 0: return None driver_class = self._driver_factory.get_driver_class(**filter_fields) driver_instance : _Driver = driver_class(address, port, **settings) self._device_uuid__to__driver_instance[device_uuid] = driver_instance return driver_instance - def terminate(self): + def delete(self, device_uuid : str) -> None: + with self._lock: + device_driver = self._device_uuid__to__driver_instance.pop(device_uuid, None) + if device_driver is None: return + device_driver.Disconnect() + + def terminate(self) -> None: self._terminate.set() with self._lock: while len(self._device_uuid__to__driver_instance) > 0: diff --git a/src/device/service/driver_api/Tools.py b/src/device/service/driver_api/Tools.py new file mode 100644 index 0000000000000000000000000000000000000000..6e70c38a24419266e05b5f6ea8601b897d5ac53e --- /dev/null +++ b/src/device/service/driver_api/Tools.py @@ -0,0 +1,48 @@ +import operator +from typing import Any, Callable, List, Tuple, Union + +ACTION_MSG_GET = 'Get resource(key={:s})' +ACTION_MSG_SET = 'Set resource(key={:s}, value={:s})' +ACTION_MSG_DELETE = 'Delete resource(key={:s})' +ACTION_MSG_SUBSCRIBE = 'Subscribe subscription(key={:s}, duration={:s}, interval={:s})' +ACTION_MSG_UNSUBSCRIBE = 'Unsubscribe subscription(key={:s}, duration={:s}, interval={:s})' + +def _get(resource_key : str): return ACTION_MSG_GET.format(str(resource_key)) +def _set(resource : Tuple[str, Any]): return ACTION_MSG_SET.format(map(str, resource)) +def _delete(resource_key : str): return ACTION_MSG_SET.format(str(resource_key)) +def _subscribe(subscription : Tuple[str, float, float]): return ACTION_MSG_SUBSCRIBE.format(map(str, subscription)) +def _unsubscribe(subscription : Tuple[str, float, float]): return ACTION_MSG_UNSUBSCRIBE.format(map(str, subscription)) + +def _check_errors( + error_func : Callable, parameters_list : List[Any], results_list : List[Union[bool, Exception]] + ) -> List[str]: + errors = [] + for parameters, results in zip(parameters_list, results_list): + if not isinstance(results, Exception): continue + errors.append('Unable to {:s}; error({:s})'.format(error_func(parameters), str(results))) + return errors + +def check_get_errors( + resource_keys : List[str], results : List[Tuple[str, Union[Any, None, Exception]]] + ) -> List[str]: + return _check_errors(_set, resource_keys, map(operator.itemgetter(1), results)) + +def check_set_errors( + resources : List[Tuple[str, Any]], results : List[Union[bool, Exception]] + ) -> List[str]: + return _check_errors(_set, resources, results) + +def check_delete_errors( + resource_keys : List[str], results : List[Union[bool, Exception]] + ) -> List[str]: + return _check_errors(_delete, resource_keys, results) + +def check_subscribe_errors( + subscriptions : List[Tuple[str, float, float]], results : List[Union[bool, Exception]] + ) -> List[str]: + return _check_errors(_subscribe, subscriptions, results) + +def check_unsubscribe_errors( + subscriptions : List[Tuple[str, float, float]], results : List[Union[bool, Exception]] + ) -> List[str]: + return _check_errors(_unsubscribe, subscriptions, results) diff --git a/src/device/tests/test_unitary.py b/src/device/tests/test_unitary.py index f2ff57193db6942f9572ce76fe04d2e4f26cb10b..37848f4ada1db3e8de950004ef8bb3224461bce2 100644 --- a/src/device/tests/test_unitary.py +++ b/src/device/tests/test_unitary.py @@ -1,5 +1,6 @@ -import copy, grpc, logging, os, pytest +import copy, grpc, logging, operator, os, pytest from typing import Any, Dict, List, Tuple +from google.protobuf.json_format import MessageToDict from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum @@ -16,10 +17,11 @@ from device.Config import ( from device.client.DeviceClient import DeviceClient from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology from device.service.DeviceService import DeviceService +from device.service.driver_api._Driver import _Driver from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache from device.service.drivers import DRIVERS -from .example_objects import CONTEXT, DEVICE1, DEVICE1_ID, TOPOLOGY, config_rule +from .example_objects import CONTEXT, DEVICE1, DEVICE1_ID, DEVICE1_UUID, TOPOLOGY, config_rule LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -84,8 +86,14 @@ def device_client(device_service : DeviceService): # pylint: disable=redefined-o yield _client _client.close() +def grpc_message_to_json_string(message): + return str(MessageToDict( + message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) + def test_device_add( - context_client : ContextClient, device_client : DeviceClient): # pylint: disable=redefined-outer-name + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name context_client.SetContext(Context(**CONTEXT)) context_client.SetTopology(Topology(**TOPOLOGY)) @@ -103,15 +111,28 @@ def test_device_add( DEVICE1_WITHOUT_RULES = copy.deepcopy(DEVICE1) DEVICE1_WITHOUT_RULES['device_config']['config_rules'].clear() device_client.AddDevice(Device(**DEVICE1_WITHOUT_RULES)) + driver : _Driver = device_service.driver_instance_cache.get(DEVICE1_UUID) # we know the driver exists now + assert driver is not None initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE1_ID)) - LOGGER.info('initial_config = {:s}'.format(str(initial_config))) + LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID)) - LOGGER.info('device_data = {:s}'.format(str(device_data))) + LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) + + driver_config = driver.GetConfig() + LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == 0 device_client.ConfigureDevice(Device(**DEVICE1)) + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == 3 + assert driver_config[0] == ('/dev/rsrc1/value', 'value1') + assert driver_config[1] == ('/dev/rsrc2/value', 'value2') + assert driver_config[2] == ('/dev/rsrc3/value', 'value3') + DEVICE1_WITH = copy.deepcopy(DEVICE1) CONFIG_RULES : List[Dict[str, Any]] = DEVICE1_WITH['device_config']['config_rules'] CONFIG_RULES.clear() @@ -121,10 +142,27 @@ def test_device_add( CONFIG_RULES.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc12/value', 'value12')) device_client.ConfigureDevice(Device(**DEVICE1_WITH)) + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == 5 + assert driver_config[0] == ('/dev/rsrc10/value', 'value10') + assert driver_config[1] == ('/dev/rsrc11/value', 'value11') + assert driver_config[2] == ('/dev/rsrc12/value', 'value12') + assert driver_config[3] == ('/dev/rsrc2/value', 'value2') + assert driver_config[4] == ('/dev/rsrc3/value', 'value3') + device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID)) - LOGGER.info('device_data = {:s}'.format(str(device_data))) + #LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) + LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + '\n'.join([ + '{:s} {:s} = {:s}'.format( + ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) + for config_rule in device_data.device_config.config_rules + ]))) device_client.DeleteDevice(DeviceId(**DEVICE1_ID)) + driver : _Driver = device_service.driver_instance_cache.get(DEVICE1_UUID, {}) # we know the driver exists now + assert driver is None raise Exception()