Skip to content
Snippets Groups Projects
Commit 42259aa4 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Initial version of Device after re-work:

- Subscriptions to be implemented
parent 78dd010b
No related branches found
No related tags found
1 merge request!54Release 2.0.0
......@@ -34,3 +34,11 @@ class InvalidArgumentException(ServiceException):
details = '{:s}({:s}) is invalid'.format(str(argument_name), str(argument_value))
super().__init__(grpc.StatusCode.INVALID_ARGUMENT, details, extra_details=extra_details)
class OperationFailedException(ServiceException):
def __init__(
self, operation : str, extra_details : Union[str, Iterable[str]] = None
) -> None:
details = 'Operation({:s}) failed'.format(str(operation))
super().__init__(grpc.StatusCode.INTERNAL, details, extra_details=extra_details)
from typing import Any, List, Tuple
import grpc, logging
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.orm.HighLevel import get_object
from common.orm.backend.BackendEnum import BackendEnum
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from context.client.ContextClient import ContextClient
from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2_grpc import DeviceServiceServicer
from device.service.driver_api.Tools import check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors
from .database.ConfigModel import ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel
......@@ -71,11 +73,11 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
device_id = request.device_id
device_uuid = device_id.device_uuid.uuid
#config_name = 'running'
sync_device_from_context(device_uuid, self.context_client, self.database)
context_config_rules = get_config_rules(self.database, device_uuid, 'running')
context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
db_device,_ = update_device_in_local_database(self.database, request)
......@@ -83,26 +85,47 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
#resources_to_set = List[Tuple[str: resource_key, any: resource_value]]
#resources_to_delete = List[Tuple[str: resource_key]]
#resources_to_subscribe = List[str: resource_key, float: sampling_rate]
#resources_to_unsubscribe = List[Tuple[str: resource_key]]
resources_to_set : List[Tuple[str, Any]] = [] # key, value
resources_to_delete : List[str] = [] # key
resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
# TODO: Compute "difference" between config field in request and config from Context
# TODO: Compute list of changes between device_config in context, and device_config in request
for config_rule in request_config_rules:
action, key, value = config_rule
if action == ORM_ConfigActionEnum.SET:
if (key not in context_config_rules) or (context_config_rules[key] != value):
resources_to_set.append((key, value))
elif action == ORM_ConfigActionEnum.DELETE:
if (key in context_config_rules):
resources_to_delete.append(key)
driver_filter_fields = get_device_driver_filter_fields(db_device)
driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields)
driver.Connect()
# TODO: Implement configuration of subscriptions
# TODO: use of datastores (might be virtual ones) to enable rollbacks
errors = []
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
if len(errors) == 0:
results_setconfig = driver.SetConfig(resources_to_set)
errors.extend(check_set_errors(resources_to_set, results_setconfig))
#results_setconfig = driver.SetConfig(resources_to_set)
## check result
#results_deleteconfig = driver.DeleteConfig(resources_to_delete)
## check result
#results_subscribestate = driver.SubscribeState(resources_to_subscribe)
## check result
#results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
## check result
if len(errors) == 0:
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))
if len(errors) == 0:
results_subscribestate = driver.SubscribeState(resources_to_subscribe)
errors.extend(check_subscribe_errors(resources_to_delete, results_subscribestate))
if len(errors) == 0:
results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate))
if len(errors) > 0:
raise OperationFailedException('ConfigureDevice', extra_details=errors)
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
......@@ -115,10 +138,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None: return Empty()
driver_filter_fields = get_device_driver_filter_fields(db_device)
driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields)
driver.Disconnect()
self.driver_instance_cache.delete(device_uuid)
delete_device_from_context(db_device, self.context_client)
for db_endpoint_pk,_ in db_device.references(EndPointModel):
......
......@@ -16,7 +16,7 @@ class DriverInstanceCache:
self._driver_factory = driver_factory
def get(
self, device_uuid : str, filter_fields : Dict[FilterFieldEnum, Any], address : Optional[str] = None,
self, device_uuid : str, filter_fields : Dict[FilterFieldEnum, Any] = {}, address : Optional[str] = None,
port : Optional[int] = None, settings : Dict[str, Any] = {}) -> _Driver:
if self._terminate.is_set():
......@@ -28,12 +28,19 @@ class DriverInstanceCache:
driver_instance = self._device_uuid__to__driver_instance.get(device_uuid)
if driver_instance is not None: return driver_instance
if len(filter_fields) == 0: return None
driver_class = self._driver_factory.get_driver_class(**filter_fields)
driver_instance : _Driver = driver_class(address, port, **settings)
self._device_uuid__to__driver_instance[device_uuid] = driver_instance
return driver_instance
def terminate(self):
def delete(self, device_uuid : str) -> None:
with self._lock:
device_driver = self._device_uuid__to__driver_instance.pop(device_uuid, None)
if device_driver is None: return
device_driver.Disconnect()
def terminate(self) -> None:
self._terminate.set()
with self._lock:
while len(self._device_uuid__to__driver_instance) > 0:
......
import operator
from typing import Any, Callable, List, Tuple, Union
ACTION_MSG_GET = 'Get resource(key={:s})'
ACTION_MSG_SET = 'Set resource(key={:s}, value={:s})'
ACTION_MSG_DELETE = 'Delete resource(key={:s})'
ACTION_MSG_SUBSCRIBE = 'Subscribe subscription(key={:s}, duration={:s}, interval={:s})'
ACTION_MSG_UNSUBSCRIBE = 'Unsubscribe subscription(key={:s}, duration={:s}, interval={:s})'
def _get(resource_key : str): return ACTION_MSG_GET.format(str(resource_key))
def _set(resource : Tuple[str, Any]): return ACTION_MSG_SET.format(map(str, resource))
def _delete(resource_key : str): return ACTION_MSG_SET.format(str(resource_key))
def _subscribe(subscription : Tuple[str, float, float]): return ACTION_MSG_SUBSCRIBE.format(map(str, subscription))
def _unsubscribe(subscription : Tuple[str, float, float]): return ACTION_MSG_UNSUBSCRIBE.format(map(str, subscription))
def _check_errors(
error_func : Callable, parameters_list : List[Any], results_list : List[Union[bool, Exception]]
) -> List[str]:
errors = []
for parameters, results in zip(parameters_list, results_list):
if not isinstance(results, Exception): continue
errors.append('Unable to {:s}; error({:s})'.format(error_func(parameters), str(results)))
return errors
def check_get_errors(
resource_keys : List[str], results : List[Tuple[str, Union[Any, None, Exception]]]
) -> List[str]:
return _check_errors(_set, resource_keys, map(operator.itemgetter(1), results))
def check_set_errors(
resources : List[Tuple[str, Any]], results : List[Union[bool, Exception]]
) -> List[str]:
return _check_errors(_set, resources, results)
def check_delete_errors(
resource_keys : List[str], results : List[Union[bool, Exception]]
) -> List[str]:
return _check_errors(_delete, resource_keys, results)
def check_subscribe_errors(
subscriptions : List[Tuple[str, float, float]], results : List[Union[bool, Exception]]
) -> List[str]:
return _check_errors(_subscribe, subscriptions, results)
def check_unsubscribe_errors(
subscriptions : List[Tuple[str, float, float]], results : List[Union[bool, Exception]]
) -> List[str]:
return _check_errors(_unsubscribe, subscriptions, results)
import copy, grpc, logging, os, pytest
import copy, grpc, logging, operator, os, pytest
from typing import Any, Dict, List, Tuple
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
......@@ -16,10 +17,11 @@ from device.Config import (
from device.client.DeviceClient import DeviceClient
from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology
from device.service.DeviceService import DeviceService
from device.service.driver_api._Driver import _Driver
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from .example_objects import CONTEXT, DEVICE1, DEVICE1_ID, TOPOLOGY, config_rule
from .example_objects import CONTEXT, DEVICE1, DEVICE1_ID, DEVICE1_UUID, TOPOLOGY, config_rule
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
......@@ -84,8 +86,14 @@ def device_client(device_service : DeviceService): # pylint: disable=redefined-o
yield _client
_client.close()
def grpc_message_to_json_string(message):
return str(MessageToDict(
message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False))
def test_device_add(
context_client : ContextClient, device_client : DeviceClient): # pylint: disable=redefined-outer-name
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
device_service : DeviceService): # pylint: disable=redefined-outer-name
context_client.SetContext(Context(**CONTEXT))
context_client.SetTopology(Topology(**TOPOLOGY))
......@@ -103,15 +111,28 @@ def test_device_add(
DEVICE1_WITHOUT_RULES = copy.deepcopy(DEVICE1)
DEVICE1_WITHOUT_RULES['device_config']['config_rules'].clear()
device_client.AddDevice(Device(**DEVICE1_WITHOUT_RULES))
driver : _Driver = device_service.driver_instance_cache.get(DEVICE1_UUID) # we know the driver exists now
assert driver is not None
initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE1_ID))
LOGGER.info('initial_config = {:s}'.format(str(initial_config)))
LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config)))
device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID))
LOGGER.info('device_data = {:s}'.format(str(device_data)))
LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))
driver_config = driver.GetConfig()
LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == 0
device_client.ConfigureDevice(Device(**DEVICE1))
driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == 3
assert driver_config[0] == ('/dev/rsrc1/value', 'value1')
assert driver_config[1] == ('/dev/rsrc2/value', 'value2')
assert driver_config[2] == ('/dev/rsrc3/value', 'value3')
DEVICE1_WITH = copy.deepcopy(DEVICE1)
CONFIG_RULES : List[Dict[str, Any]] = DEVICE1_WITH['device_config']['config_rules']
CONFIG_RULES.clear()
......@@ -121,10 +142,27 @@ def test_device_add(
CONFIG_RULES.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc12/value', 'value12'))
device_client.ConfigureDevice(Device(**DEVICE1_WITH))
driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == 5
assert driver_config[0] == ('/dev/rsrc10/value', 'value10')
assert driver_config[1] == ('/dev/rsrc11/value', 'value11')
assert driver_config[2] == ('/dev/rsrc12/value', 'value12')
assert driver_config[3] == ('/dev/rsrc2/value', 'value2')
assert driver_config[4] == ('/dev/rsrc3/value', 'value3')
device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID))
LOGGER.info('device_data = {:s}'.format(str(device_data)))
#LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))
LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
'\n'.join([
'{:s} {:s} = {:s}'.format(
ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value)
for config_rule in device_data.device_config.config_rules
])))
device_client.DeleteDevice(DeviceId(**DEVICE1_ID))
driver : _Driver = device_service.driver_instance_cache.get(DEVICE1_UUID, {}) # we know the driver exists now
assert driver is None
raise Exception()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment