diff --git a/report_coverage_common.sh b/report_coverage_common.sh index 500b3dfb130e6e0614a0beb649f0d93bf7d0ffee..be7224ad610ddf2c266fa485c6535f165cdc72ca 100755 --- a/report_coverage_common.sh +++ b/report_coverage_common.sh @@ -1,3 +1,3 @@ #!/bin/bash -./report_coverage_all.sh | grep -v -E "^(cent|comp|cont|devi|moni|serv|test)" | grep --color -E -i "^common/.*$|$" +./report_coverage_all.sh | grep --color -E -i "^common/.*$|$" diff --git a/report_coverage_context.sh b/report_coverage_context.sh index 95966ead0bdf84b39be3e3f3063e1b93dfad32f1..3a404a62698cdd95f94c9ed7d4c8b4b073778d08 100755 --- a/report_coverage_context.sh +++ b/report_coverage_context.sh @@ -1,3 +1,3 @@ #!/bin/bash -./report_coverage_all.sh | grep -v -E "^(cent|com|devi|moni|serv|test)" | grep --color -E -i "^context/.*$|$" +./report_coverage_all.sh | grep --color -E -i "^context/.*$|$" diff --git a/report_coverage_device.sh b/report_coverage_device.sh index be2612d89ce56d518d992327f93a24853e591a4d..ed7731c86343ba4d72edf3c4bada612ddbc4268d 100755 --- a/report_coverage_device.sh +++ b/report_coverage_device.sh @@ -1,3 +1,3 @@ #!/bin/bash -./report_coverage_all.sh | grep --color -E -i "^device/.*$|$" +./report_coverage_all.sh | grep -v -E "^(cent|comm|comp|cont|moni|serv|test)" | grep --color -E -i "^device/.*$|$" diff --git a/run_tests_locally.sh b/run_tests_locally.sh index a6c6b6c43d67ea56a091729d7ba4aec8e8ce2ade..54b8243fedc3a8e1372ddfac961b71aba74d925e 100755 --- a/run_tests_locally.sh +++ b/run_tests_locally.sh @@ -18,23 +18,23 @@ export REDIS_SERVICE_PORT=$(kubectl get service contextservice-public --namespac # First destroy old coverage file rm -f $COVERAGEFILE -coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ - common/orm/tests/test_unitary.py \ - common/message_broker/tests/test_unitary.py \ - common/rpc_method_wrapper/tests/test_unitary.py +#coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# common/orm/tests/test_unitary.py \ +# common/message_broker/tests/test_unitary.py \ +# common/rpc_method_wrapper/tests/test_unitary.py -coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ - centralizedattackdetector/tests/test_unitary.py +#coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# centralizedattackdetector/tests/test_unitary.py -coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ - context/tests/test_unitary.py +#coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# context/tests/test_unitary.py coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ - device/tests/test_unitary_driverapi.py \ device/tests/test_unitary.py + #device/tests/test_unitary_driverapi.py \ #coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ # service/tests/test_unitary.py -coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ - compute/tests/test_unitary.py +#coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# compute/tests/test_unitary.py diff --git a/src/common/rpc_method_wrapper/ServiceExceptions.py b/src/common/rpc_method_wrapper/ServiceExceptions.py index 67a1e8003469ed96845aee74b17e48dfab58d88b..960eb57f319b52619ad70c702a501140aecbaca0 100644 --- a/src/common/rpc_method_wrapper/ServiceExceptions.py +++ b/src/common/rpc_method_wrapper/ServiceExceptions.py @@ -1,23 +1,36 @@ import grpc -from typing import Iterable +from typing import Iterable, Union class ServiceException(Exception): - def __init__(self, code : grpc.StatusCode, details : str, extra_details : Iterable[str] = []) -> None: + def __init__( + self, code : grpc.StatusCode, details : str, extra_details : Union[str, Iterable[str]] = [] + ) -> None: + self.code = code + if isinstance(extra_details, str): extra_details = [extra_details] self.details = '; '.join(map(str, [details] + extra_details)) super().__init__(self.details) class NotFoundException(ServiceException): - def __init__(self, object_name : str, object_uuid: str, extra_details : Iterable[str] = []) -> None: + def __init__( + self, object_name : str, object_uuid: str, extra_details : Union[str, Iterable[str]] = [] + ) -> None: + details = '{:s}({:s}) not found'.format(str(object_name), str(object_uuid)) super().__init__(grpc.StatusCode.NOT_FOUND, details, extra_details=extra_details) class AlreadyExistsException(ServiceException): - def __init__(self, object_name : str, object_uuid: str, extra_details : Iterable[str] = None) -> None: + def __init__( + self, object_name : str, object_uuid: str, extra_details : Union[str, Iterable[str]] = None + ) -> None: + details = '{:s}({:s}) already exists'.format(str(object_name), str(object_uuid)) super().__init__(grpc.StatusCode.ALREADY_EXISTS, details, extra_details=extra_details) class InvalidArgumentException(ServiceException): - def __init__(self, argument_name : str, argument_value: str, extra_details : Iterable[str] = None) -> None: + def __init__( + self, argument_name : str, argument_value: str, extra_details : Union[str, Iterable[str]] = None + ) -> None: + details = '{:s}({:s}) is invalid'.format(str(argument_name), str(argument_value)) super().__init__(grpc.StatusCode.INVALID_ARGUMENT, details, extra_details=extra_details) diff --git a/src/context/service/database/ConfigModel.py b/src/context/service/database/ConfigModel.py index 774ea3743b9742dbb8c1a39b924dc2f173a38a76..d97cdb7dfe6594a59be10427ed52341346c19a97 100644 --- a/src/context/service/database/ConfigModel.py +++ b/src/context/service/database/ConfigModel.py @@ -50,53 +50,62 @@ class ConfigRuleModel(Model): # pylint: disable=abstract-method return result def set_config_rule( - database : Database, db_config : ConfigModel, grpc_config_rule, position : int + database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str ) -> Tuple[ConfigRuleModel, bool]: - str_rule_key_hash = fast_hasher(grpc_config_rule.resource_key) + str_rule_key_hash = fast_hasher(resource_key) str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') - result : Tuple[ConfigRuleModel, bool] = update_or_create_object(database, ConfigRuleModel, str_config_rule_key, { - 'config_fk': db_config, - 'position' : position, - 'action' : grpc_to_enum__config_action(grpc_config_rule.action), - 'key' : grpc_config_rule.resource_key, - 'value' : grpc_config_rule.resource_value, - }) + 'config_fk': db_config, 'position': position, 'action': ORM_ConfigActionEnum.SET, + 'key': resource_key, 'value': resource_value}) db_config_rule, updated = result return db_config_rule, updated def delete_config_rule( - database : Database, db_config : ConfigModel, grpc_config_rule - ) -> Tuple[ConfigRuleModel, bool]: + database : Database, db_config : ConfigModel, resource_key : str + ) -> None: - str_rule_key_hash = fast_hasher(grpc_config_rule.resource_key) + str_rule_key_hash = fast_hasher(resource_key) str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') - db_config_rule : Optional[ConfigRuleModel] = get_object( database, ConfigRuleModel, str_config_rule_key, raise_if_not_found=False) + if db_config_rule is None: return db_config_rule.delete() -def set_config( - database : Database, db_parent_pk : str, config_name : str, grpc_config_rules +def delete_all_config_rules( + database : Database, db_config : ConfigModel + ) -> None: + + db_config_rule_pks = db_config.references(ConfigRuleModel) + for pk,_ in db_config_rule_pks: ConfigRuleModel(database, pk).delete() + +def grpc_config_rules_to_raw(grpc_config_rules) -> List[Tuple[ORM_ConfigActionEnum, str, str]]: + def translate(grpc_config_rule): + action = grpc_to_enum__config_action(grpc_config_rule.action) + return action, grpc_config_rule.resource_key, grpc_config_rule.resource_value + return [translate(grpc_config_rule) for grpc_config_rule in grpc_config_rules] + +def update_config( + database : Database, db_parent_pk : str, config_name : str, + raw_config_rules : List[Tuple[ORM_ConfigActionEnum, str, str]] ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]: str_config_key = key_to_str([db_parent_pk, config_name], separator=':') result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key) db_config, created = result - db_objects = [(db_config, created)] + db_objects : List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)] - for position,grpc_config_rule in enumerate(grpc_config_rules): - action = grpc_to_enum__config_action(grpc_config_rule.action) + for position,(action, resource_key, resource_value) in enumerate(raw_config_rules): if action == ORM_ConfigActionEnum.SET: - result : Tuple[ConfigRuleModel, bool] = set_config_rule(database, db_config, grpc_config_rule, position) + result : Tuple[ConfigRuleModel, bool] = set_config_rule( + database, db_config, position, resource_key, resource_value) db_config_rule, updated = result db_objects.append((db_config_rule, updated)) elif action == ORM_ConfigActionEnum.DELETE: - delete_config_rule(database, db_config, grpc_config_rule) + delete_config_rule(database, db_config, resource_key) else: - msg = 'Unsupported action({:s})' - raise AttributeError(msg.format(str(ConfigActionEnum.Name(grpc_config_rule.action)))) + msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})' + raise AttributeError(msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value))) return db_objects diff --git a/src/context/service/grpc_server/ContextServiceServicerImpl.py b/src/context/service/grpc_server/ContextServiceServicerImpl.py index d8f7b648b4b919cc61330f236195c444f550ede1..e76c399cd4e17578a01ac7bf88cb0fc3f7017b8e 100644 --- a/src/context/service/grpc_server/ContextServiceServicerImpl.py +++ b/src/context/service/grpc_server/ContextServiceServicerImpl.py @@ -12,7 +12,7 @@ from context.proto.context_pb2 import ( DeviceList, Empty, EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from context.proto.context_pb2_grpc import ContextServiceServicer -from context.service.database.ConfigModel import ConfigModel, ConfigRuleModel, set_config +from context.service.database.ConfigModel import ConfigModel, ConfigRuleModel, grpc_config_rules_to_raw, update_config from context.service.database.ConstraintModel import ConstraintModel, ConstraintsModel, set_constraints from context.service.database.ContextModel import ContextModel from context.service.database.DeviceModel import ( @@ -236,7 +236,8 @@ class ContextServiceServicerImpl(ContextServiceServicer): 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) - running_config_result = set_config(self.database, device_uuid, 'running', request.device_config.config_rules) + config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) + running_config_result = update_config(self.database, device_uuid, 'running', config_rules) db_running_config = running_config_result[0][0] result : Tuple[DeviceModel, bool] = update_or_create_object(self.database, DeviceModel, device_uuid, { @@ -452,8 +453,8 @@ class ContextServiceServicerImpl(ContextServiceServicer): self.database, str_service_key, 'constraints', request.service_constraints) db_constraints = constraints_result[0][0] - running_config_result = set_config( - self.database, str_service_key, 'running', request.service_config.config_rules) + config_rules = grpc_config_rules_to_raw(request.service_config.config_rules) + running_config_result = update_config(self.database, str_service_key, 'running', config_rules) db_running_config = running_config_result[0][0] result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, { diff --git a/src/device/service/DeviceService.py b/src/device/service/DeviceService.py index 7773d1ada6825fa60ee9eb995dd54673b641c740..0388f3cae045dd9c8ce62fc389372a916822c04e 100644 --- a/src/device/service/DeviceService.py +++ b/src/device/service/DeviceService.py @@ -1,13 +1,12 @@ -import grpc -import logging +import grpc, logging from concurrent import futures from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from context.client.ContextClient import ContextClient from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server from device.service.DeviceServiceServicerImpl import DeviceServiceServicerImpl -from device.service.data_cache.DataCache import DataCache from device.service.driver_api.DriverInstanceCache import DriverInstanceCache BIND_ADDRESS = '0.0.0.0' @@ -15,10 +14,10 @@ LOGGER = logging.getLogger(__name__) class DeviceService: def __init__( - self, data_cache : DataCache, driver_instance_cache : DriverInstanceCache, + self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD): - self.data_cache = data_cache + self.context_client = context_client self.driver_instance_cache = driver_instance_cache self.address = address self.port = port @@ -38,7 +37,7 @@ class DeviceService: self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - self.device_servicer = DeviceServiceServicerImpl(self.data_cache, self.driver_instance_cache) + self.device_servicer = DeviceServiceServicerImpl(self.context_client, self.driver_instance_cache) add_DeviceServiceServicer_to_server(self.device_servicer, self.server) self.health_servicer = HealthServicer( diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 4ab96681413ef3333dc11bf24c413dba7aca5adc..041a5ae43949f8b513da74b94538c8f2f1f1ef26 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -1,10 +1,19 @@ import grpc, logging +from common.orm.Database import Database +from common.orm.Factory import get_database_backend +from common.orm.HighLevel import get_object +from common.orm.backend.BackendEnum import BackendEnum from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method +from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException +from context.client.ContextClient import ContextClient from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty from device.proto.device_pb2_grpc import DeviceServiceServicer -from .data_cache.DataCache import DataCache -from .data_cache.database.DeviceModel import DriverModel -from .data_cache.database.EndPointModel import EndPointModel +from .database.ConfigModel import ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config +from .database.DeviceModel import DeviceModel, DriverModel +from .database.EndPointModel import EndPointModel +from .database.DatabaseTools import ( + delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context, + update_device_in_local_database) from .driver_api._Driver import _Driver from .driver_api.DriverInstanceCache import DriverInstanceCache @@ -15,9 +24,10 @@ METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfi METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class DeviceServiceServicerImpl(DeviceServiceServicer): - def __init__(self, data_cache : DataCache, driver_instance_cache : DriverInstanceCache): + def __init__(self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache): LOGGER.debug('Creating Servicer...') - self.data_cache = data_cache + self.context_client = context_client + self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) self.driver_instance_cache = driver_instance_cache LOGGER.debug('Servicer Created') @@ -26,21 +36,35 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): device_id = request.device_id device_uuid = device_id.device_uuid.uuid - self.data_cache.sync_device_from_context(device_uuid) - db_device,_ = self.data_cache.set_device(request) + if len(request.device_config.config_rules) > 0: + raise InvalidArgumentException( + 'device.device_config.config_rules', str(request.device_config.config_rules), + extra_details='RPC method AddDevice does not allow definition of Config Rules. '\ + 'Add the Device first, and then configure it.') - driver_filter_fields = self.data_cache.get_device_driver_filter_fields(device_uuid) + sync_device_from_context(device_uuid, self.context_client, self.database) + db_device,_ = update_device_in_local_database(self.database, request) + + driver_filter_fields = get_device_driver_filter_fields(db_device) driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields) driver.Connect() running_config_rules = driver.GetConfig() - self.data_cache.update_device_config_in_local_database(device_uuid, 'running', running_config_rules) + running_config_rules = [(ORM_ConfigActionEnum.SET, rule[0], rule[1]) for rule in running_config_rules] + LOGGER.info('[AddDevice] running_config_rules = {:s}'.format(str(running_config_rules))) - initial_config_rules = driver.GetInitialConfig() - self.data_cache.update_device_config_in_local_database(device_uuid, 'initial', initial_config_rules) + context_config_rules = get_config_rules(self.database, device_uuid, 'running') + LOGGER.info('[AddDevice] context_config_rules = {:s}'.format(str(context_config_rules))) - self.data_cache.sync_device_to_context(device_uuid) + # TODO: Compute diff between current context config and device config. The one in device is of higher priority + # (might happen another instance is updating config and context was not already updated) + update_config(self.database, device_uuid, 'running', running_config_rules) + + initial_config_rules = driver.GetInitialConfig() + update_config(self.database, device_uuid, 'initial', initial_config_rules) + + sync_device_to_context(db_device, self.context_client) return DeviceId(**db_device.dump_id()) @safe_and_metered_rpc_method(METRICS, LOGGER) @@ -49,18 +73,25 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): device_uuid = device_id.device_uuid.uuid #config_name = 'running' - self.data_cache.sync_device_from_context(device_uuid) - db_device,_ = self.data_cache.set_device(request) + sync_device_from_context(device_uuid, self.context_client, self.database) + + context_config_rules = get_config_rules(self.database, device_uuid, 'running') + LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules))) + + db_device,_ = update_device_in_local_database(self.database, request) + + request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) + LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules))) #resources_to_set = List[Tuple[str: resource_key, any: resource_value]] #resources_to_delete = List[Tuple[str: resource_key]] #resources_to_subscribe = List[str: resource_key, float: sampling_rate] #resources_to_unsubscribe = List[Tuple[str: resource_key]] - # Compute "difference" between config field in request and config from Context - # Compute list of changes between device_config in context, and device_config in request + # TODO: Compute "difference" between config field in request and config from Context + # TODO: Compute list of changes between device_config in context, and device_config in request - driver_filter_fields = self.data_cache.get_device_driver_filter_fields(device_uuid) + driver_filter_fields = get_device_driver_filter_fields(db_device) driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields) driver.Connect() @@ -73,39 +104,48 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): #results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) ## check result - self.data_cache.sync_device_to_context(device_uuid) + sync_device_to_context(db_device, self.context_client) return DeviceId(**db_device.dump_id()) @safe_and_metered_rpc_method(METRICS, LOGGER) def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: device_uuid = request.device_uuid.uuid - self.data_cache.sync_device_from_context(device_uuid) - db_device = self.data_cache.get_device(device_uuid) - driver_filter_fields = self.data_cache.get_device_driver_filter_fields(device_uuid) + sync_device_from_context(device_uuid, self.context_client, self.database) + db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) + if db_device is None: return Empty() + + driver_filter_fields = get_device_driver_filter_fields(db_device) driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields) driver.Disconnect() - self.data_cache.delete_device_from_context(device_uuid) + delete_device_from_context(db_device, self.context_client) for db_endpoint_pk,_ in db_device.references(EndPointModel): - EndPointModel(db_device.database, db_endpoint_pk).delete() + EndPointModel(self.database, db_endpoint_pk).delete() for db_driver_pk,_ in db_device.references(DriverModel): - DriverModel(db_device.database, db_driver_pk).delete() + DriverModel(self.database, db_driver_pk).delete() - #db_config = ConfigModel(db_device.database, db_device.device_config_fk) - #for db_config_rule_pk,_ in db_config.references(ConfigRuleModel): - # ConfigRuleModel(db_device.database, db_config_rule_pk).delete() + db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk) + for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel): + ConfigRuleModel(self.database, db_config_rule_pk).delete() - db_device.delete() - #db_config.delete() + db_running_config = ConfigModel(self.database, db_device.device_running_config_fk) + for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel): + ConfigRuleModel(self.database, db_config_rule_pk).delete() + db_device.delete() + db_initial_config.delete() + db_running_config.delete() return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig: device_uuid = request.device_uuid.uuid - self.data_cache.sync_device_from_context(device_uuid) - db_device = self.data_cache.get_device(device_uuid) - return DeviceConfig(config_rules=db_device.dump_initial_config()) + + sync_device_from_context(device_uuid, self.context_client, self.database) + db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) + + config_rules = {} if db_device is None else db_device.dump_initial_config() + return DeviceConfig(config_rules=config_rules) diff --git a/src/device/service/__main__.py b/src/device/service/__main__.py index b9b82b335e2a9ef76e784e6277944d1e08b4283a..7daa88709b9ce44e2173ea21b55920d26226da71 100644 --- a/src/device/service/__main__.py +++ b/src/device/service/__main__.py @@ -1,9 +1,9 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_setting +from context.client.ContextClient import ContextClient from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT from device.service.DeviceService import DeviceService -from device.service.data_cache.DataCache import DataCache from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache from device.service.drivers import DRIVERS @@ -37,8 +37,11 @@ def main(): # Start metrics server start_http_server(metrics_port) - # Initialize DataCache - data_cache = DataCache(context_service_host, context_service_port) + # Initialize Context Client + if context_service_host is None or context_service_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( + str(context_service_host), str(context_service_port))) + context_client = ContextClient(context_service_host, context_service_port) # TODO: start monitoring loops to periodically report to Monitoring the collected data @@ -48,7 +51,8 @@ def main(): # Starting device service grpc_service = DeviceService( - data_cache, driver_instance_cache, port=grpc_service_port, max_workers=max_workers, grace_period=grace_period) + context_client, driver_instance_cache, port=grpc_service_port, max_workers=max_workers, + grace_period=grace_period) grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/device/service/data_cache/DataCache.py b/src/device/service/data_cache/DataCache.py deleted file mode 100644 index 43eccd91fd485b571b708d1e5aa338fe593d8853..0000000000000000000000000000000000000000 --- a/src/device/service/data_cache/DataCache.py +++ /dev/null @@ -1,58 +0,0 @@ -import grpc, logging -from typing import Any, Dict, List, Optional, Tuple, Union -from common.orm.Database import Database -from common.orm.Factory import get_database_backend -from common.orm.HighLevel import get_object -from common.orm.backend.BackendEnum import BackendEnum -from context.client.ContextClient import ContextClient -from device.proto.context_pb2 import Device, DeviceId -from device.service.driver_api.FilterFields import FilterFieldEnum -#from .database.ConfigModel import set_config -from .database.DeviceModel import DeviceModel, DriverModel -from .DeviceTools import update_device_config_in_local_database, update_device_in_local_database - -LOGGER = logging.getLogger(__name__) - -class DataCache: - def __init__(self, context_address : str, context_port : Union[str, int]) -> None: - if context_address is None or context_port is None: - raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( - str(context_address), str(context_port))) - self._context_client = ContextClient(context_address, context_port) - self._database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) - - def get_device(self, device_uuid : str) -> Optional[DeviceModel]: - return get_object(self._database, DeviceModel, device_uuid, raise_if_not_found=False) - - def get_device_driver_filter_fields(self, device_uuid : str) -> Dict[FilterFieldEnum, Any]: - db_device = self.get_device(device_uuid) - db_driver_pks = db_device.references(DriverModel) - db_driver_names = [DriverModel(self._database, pk).driver.value for pk,_ in db_driver_pks] - return { - FilterFieldEnum.DEVICE_TYPE: db_device.device_type, - FilterFieldEnum.DRIVER : db_driver_names, - } - - def set_device(self, device : Device) -> Tuple[DeviceModel, bool]: - return update_device_in_local_database(self._database, device) - - def sync_device_from_context(self, device_uuid : str) -> bool: - try: - device : Device = self._context_client.GetDevice(DeviceId(device_uuid={'uuid': device_uuid})) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: raise - return None - return update_device_in_local_database(self._database, device) - - def sync_device_to_context(self, device_uuid : str): - db_device = get_object(self._database, DeviceModel, device_uuid, raise_if_not_found=False) - self._context_client.SetDevice(Device(**db_device.dump( - include_config_rules=True, include_drivers=True, include_endpoints=True))) - - def delete_device_from_context(self, device_uuid : str): - db_device = get_object(self._database, DeviceModel, device_uuid, raise_if_not_found=False) - self._context_client.RemoveDevice(DeviceId(**db_device.dump_id())) - - def update_device_config_in_local_database( - self, device_uuid : str, config_name : str, config_rules : List[Tuple[str, Any]]): - update_device_config_in_local_database(self._database, device_uuid, config_name, config_rules) diff --git a/src/device/service/data_cache/__init__.py b/src/device/service/data_cache/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/src/device/service/data_cache/database/ConfigModel.py b/src/device/service/database/ConfigModel.py similarity index 53% rename from src/device/service/data_cache/database/ConfigModel.py rename to src/device/service/database/ConfigModel.py index ce5904bc2e7a270ff1457465bb9b46e258c11811..82697bfbae7d1f58fdc851aa66f86f0fc78a767e 100644 --- a/src/device/service/data_cache/database/ConfigModel.py +++ b/src/device/service/database/ConfigModel.py @@ -1,6 +1,6 @@ import functools, logging, operator from enum import Enum -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Tuple, Union from common.orm.Database import Database from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object from common.orm.backend.Tools import key_to_str @@ -38,7 +38,7 @@ class ConfigRuleModel(Model): # pylint: disable=abstract-method position = IntegerField(min_value=0, required=True) action = EnumeratedField(ORM_ConfigActionEnum, required=True) key = StringField(required=True, allow_empty=False) - value = StringField(required=True, allow_empty=False) + value = StringField(required=False, allow_empty=True) def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ result = { @@ -49,55 +49,50 @@ class ConfigRuleModel(Model): # pylint: disable=abstract-method if include_position: result['position'] = self.position return result -def set_config_rule( - database : Database, db_config : ConfigModel, grpc_config_rule, position : int - ) -> Tuple[ConfigRuleModel, bool]: - - str_rule_key_hash = fast_hasher(grpc_config_rule.resource_key) - str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') - - result : Tuple[ConfigRuleModel, bool] = update_or_create_object(database, ConfigRuleModel, str_config_rule_key, { - 'config_fk': db_config, - 'position' : position, - 'action' : grpc_to_enum__config_action(grpc_config_rule.action), - 'key' : grpc_config_rule.resource_key, - 'value' : grpc_config_rule.resource_value, - }) - db_config_rule, updated = result - return db_config_rule, updated - -def delete_config_rule( - database : Database, db_config : ConfigModel, grpc_config_rule - ) -> Tuple[ConfigRuleModel, bool]: - - str_rule_key_hash = fast_hasher(grpc_config_rule.resource_key) - str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') - - db_config_rule : Optional[ConfigRuleModel] = get_object( - database, ConfigRuleModel, str_config_rule_key, raise_if_not_found=False) - if db_config_rule is None: return - db_config_rule.delete() - -def set_config( - database : Database, db_parent_pk : str, config_name : str, grpc_config_rules +def delete_all_config_rules(database : Database, db_parent_pk : str, config_name : str) -> None: + str_config_key = key_to_str([db_parent_pk, config_name], separator=':') + db_config : ConfigModel = get_object(database, ConfigModel, str_config_key, raise_if_not_found=False) + if db_config is None: return + db_config_rule_pks = db_config.references(ConfigRuleModel) + for pk,_ in db_config_rule_pks: ConfigRuleModel(database, pk).delete() + +def grpc_config_rules_to_raw(grpc_config_rules) -> List[Tuple[ORM_ConfigActionEnum, str, str]]: + def translate(grpc_config_rule): + action = grpc_to_enum__config_action(grpc_config_rule.action) + return action, grpc_config_rule.resource_key, grpc_config_rule.resource_value + return [translate(grpc_config_rule) for grpc_config_rule in grpc_config_rules] + +def get_config_rules( + database : Database, db_parent_pk : str, config_name : str + ) -> List[Tuple[ORM_ConfigActionEnum, str, str]]: + + str_config_key = key_to_str([db_parent_pk, config_name], separator=':') + db_config = get_object(database, ConfigModel, str_config_key, raise_if_not_found=False) + return [] if db_config is None else [ + (ORM_ConfigActionEnum._value2member_map_.get(config_rule['action']), + config_rule['resource_key'], config_rule['resource_value']) + for config_rule in db_config.dump() + ] + +def update_config( + database : Database, db_parent_pk : str, config_name : str, + raw_config_rules : List[Tuple[ORM_ConfigActionEnum, str, str]] ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]: str_config_key = key_to_str([db_parent_pk, config_name], separator=':') result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key) db_config, created = result - db_objects = [(db_config, created)] - - for position,grpc_config_rule in enumerate(grpc_config_rules): - action = grpc_to_enum__config_action(grpc_config_rule.action) - if action == ORM_ConfigActionEnum.SET: - result : Tuple[ConfigRuleModel, bool] = set_config_rule(database, db_config, grpc_config_rule, position) - db_config_rule, updated = result - db_objects.append((db_config_rule, updated)) - elif action == ORM_ConfigActionEnum.DELETE: - delete_config_rule(database, db_config, grpc_config_rule) - else: - msg = 'Unsupported action({:s})' - raise AttributeError(msg.format(str(ConfigActionEnum.Name(grpc_config_rule.action)))) + db_objects : List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)] + + for position,(action, resource_key, resource_value) in enumerate(raw_config_rules): + str_rule_key_hash = fast_hasher(resource_key) + str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') + result : Tuple[ConfigRuleModel, bool] = update_or_create_object( + database, ConfigRuleModel, str_config_rule_key, { + 'config_fk': db_config, 'position': position, 'action': action, 'key': resource_key, + 'value': resource_value}) + db_config_rule, updated = result + db_objects.append((db_config_rule, updated)) return db_objects diff --git a/src/device/service/data_cache/database/ContextModel.py b/src/device/service/database/ContextModel.py similarity index 100% rename from src/device/service/data_cache/database/ContextModel.py rename to src/device/service/database/ContextModel.py diff --git a/src/device/service/data_cache/DeviceTools.py b/src/device/service/database/DatabaseTools.py similarity index 58% rename from src/device/service/data_cache/DeviceTools.py rename to src/device/service/database/DatabaseTools.py index 242c2a455f6ff32b643ff66a7943aa974ef8a6cc..5b43aae70af054f5d6da0bd92d9f2e59d152dc84 100644 --- a/src/device/service/data_cache/DeviceTools.py +++ b/src/device/service/database/DatabaseTools.py @@ -1,19 +1,17 @@ -import logging, operator -from typing import Any, List, Tuple +import grpc +from typing import Any, Dict, Tuple from common.orm.Database import Database -from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object +from common.orm.HighLevel import get_or_create_object, update_or_create_object from common.orm.backend.Tools import key_to_str from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException -from context.proto.context_pb2 import ConfigActionEnum -from device.proto.context_pb2 import Device -from device.service.data_cache.database.Tools import fast_hasher, remove_dict_key -from .database.ConfigModel import ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, set_config -from .database.ContextModel import ContextModel -from .database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers -from .database.EndPointModel import EndPointModel -from .database.TopologyModel import TopologyModel - -LOGGER = logging.getLogger(__name__) +from context.client.ContextClient import ContextClient +from device.proto.context_pb2 import Device, DeviceId +from device.service.driver_api.FilterFields import FilterFieldEnum +from .ConfigModel import delete_all_config_rules, grpc_config_rules_to_raw, update_config +from .ContextModel import ContextModel +from .DeviceModel import DeviceModel, DriverModel, grpc_to_enum__device_operational_status, set_drivers +from .EndPointModel import EndPointModel +from .TopologyModel import TopologyModel def update_device_in_local_database(database : Database, device : Device) -> Tuple[DeviceModel, bool]: device_uuid = device.device_id.device_uuid.uuid @@ -26,8 +24,12 @@ def update_device_in_local_database(database : Database, device : Device) -> Tup 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) - initial_config_result = set_config(database, device_uuid, 'initial', []) - running_config_result = set_config(database, device_uuid, 'running', device.device_config.config_rules) + initial_config_result = update_config(database, device_uuid, 'initial', []) + + config_rules = grpc_config_rules_to_raw(device.device_config.config_rules) + delete_all_config_rules(database, device_uuid, 'running') + running_config_result = update_config(database, device_uuid, 'running', config_rules) + result : Tuple[DeviceModel, bool] = update_or_create_object(database, DeviceModel, device_uuid, { 'device_uuid' : device_uuid, 'device_type' : device.device_type, @@ -72,27 +74,37 @@ def update_device_in_local_database(database : Database, device : Device) -> Tup result : Tuple[EndPointModel, bool] = update_or_create_object( database, EndPointModel, str_endpoint_key, endpoint_attributes) - db_endpoint, db_endpoint_updated = result + _, db_endpoint_updated = result updated = updated or db_endpoint_updated return db_device, updated -def update_device_config_in_local_database( - database : Database, device_uuid : str, config_name : str, config_rules : List[Tuple[str, Any]]) -> None: - - str_config_key = key_to_str([device_uuid, config_name], separator=':') - db_config = get_object(database, ConfigModel, str_config_key) - db_config_rule_pks = db_config.references(ConfigRuleModel) - for pk,_ in db_config_rule_pks: ConfigRuleModel(database, pk).delete() - - for position,config_rule in enumerate(config_rules): - config_rule_resource_key, config_rule_resource_value = config_rule - str_rule_key_hash = fast_hasher(config_rule_resource_key) - str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') - update_or_create_object(database, ConfigRuleModel, str_config_rule_key, { - 'config_fk': db_config, - 'position' : position, - 'action' : ORM_ConfigActionEnum.SET, - 'key' : config_rule_resource_key, - 'value' : config_rule_resource_value, - }) +def sync_device_from_context( + device_uuid : str, context_client : ContextClient, database : Database + ) -> Tuple[DeviceModel, bool]: + + try: + device : Device = context_client.GetDevice(DeviceId(device_uuid={'uuid': device_uuid})) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + return None + return update_device_in_local_database(database, device) + +def sync_device_to_context(db_device : DeviceModel, context_client : ContextClient) -> None: + if db_device is None: return + context_client.SetDevice(Device(**db_device.dump( + include_config_rules=True, include_drivers=True, include_endpoints=True))) + +def delete_device_from_context(db_device : DeviceModel, context_client : ContextClient) -> None: + if db_device is None: return + context_client.RemoveDevice(DeviceId(**db_device.dump_id())) + +def get_device_driver_filter_fields(db_device : DeviceModel) -> Dict[FilterFieldEnum, Any]: + if db_device is None: return {} + database = db_device.database + db_driver_pks = db_device.references(DriverModel) + db_driver_names = [DriverModel(database, pk).driver.value for pk,_ in db_driver_pks] + return { + FilterFieldEnum.DEVICE_TYPE: db_device.device_type, + FilterFieldEnum.DRIVER : db_driver_names, + } diff --git a/src/device/service/data_cache/database/DeviceModel.py b/src/device/service/database/DeviceModel.py similarity index 100% rename from src/device/service/data_cache/database/DeviceModel.py rename to src/device/service/database/DeviceModel.py diff --git a/src/device/service/data_cache/database/EndPointModel.py b/src/device/service/database/EndPointModel.py similarity index 100% rename from src/device/service/data_cache/database/EndPointModel.py rename to src/device/service/database/EndPointModel.py diff --git a/src/device/service/data_cache/database/Tools.py b/src/device/service/database/Tools.py similarity index 100% rename from src/device/service/data_cache/database/Tools.py rename to src/device/service/database/Tools.py diff --git a/src/device/service/data_cache/database/TopologyModel.py b/src/device/service/database/TopologyModel.py similarity index 100% rename from src/device/service/data_cache/database/TopologyModel.py rename to src/device/service/database/TopologyModel.py diff --git a/src/device/service/data_cache/database/__init__.py b/src/device/service/database/__init__.py similarity index 100% rename from src/device/service/data_cache/database/__init__.py rename to src/device/service/database/__init__.py diff --git a/src/device/service/driver_api/FilterFields.py b/src/device/service/driver_api/FilterFields.py index 5c869c6553e6028659c82c1a7e9613e25bd85887..c27a2ac90a6840d2d68b16096be71fefe7ef1cdf 100644 --- a/src/device/service/driver_api/FilterFields.py +++ b/src/device/service/driver_api/FilterFields.py @@ -1,5 +1,5 @@ from enum import Enum -from device.service.data_cache.database.DeviceModel import ORM_DeviceDriverEnum +from device.service.database.DeviceModel import ORM_DeviceDriverEnum class DeviceTypeFilterFieldEnum(Enum): EMULATED = 'emulated' diff --git a/src/device/tests/example_objects.py b/src/device/tests/example_objects.py index 60267dee5ab24babaacb45f598d0474deaf87984..7894fbc00efed4221b7effa71772cb91a7f46cff 100644 --- a/src/device/tests/example_objects.py +++ b/src/device/tests/example_objects.py @@ -1,5 +1,4 @@ from copy import deepcopy -from typing import Any, Dict from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from context.proto.context_pb2 import ConfigActionEnum, DeviceDriverEnum, DeviceOperationalStatusEnum diff --git a/src/device/tests/test_unitary.py b/src/device/tests/test_unitary.py index 5147bee2bbea8d8849a2cc5df4012bacd98ac9ba..f2ff57193db6942f9572ce76fe04d2e4f26cb10b 100644 --- a/src/device/tests/test_unitary.py +++ b/src/device/tests/test_unitary.py @@ -1,12 +1,9 @@ -import copy, logging, os, pytest +import copy, grpc, logging, os, pytest from typing import Any, Dict, List, Tuple -#from google.protobuf.json_format import MessageToDict -#from common.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker -#from common.tests.Assertions import validate_device_id, validate_empty from context.Config import ( GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD) @@ -19,7 +16,6 @@ from device.Config import ( from device.client.DeviceClient import DeviceClient from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology from device.service.DeviceService import DeviceService -from device.service.data_cache.DataCache import DataCache from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache from device.service.drivers import DRIVERS @@ -72,12 +68,11 @@ def context_client(context_service : ContextService): # pylint: disable=redefine _client.close() @pytest.fixture(scope='session') -def device_service(): - data_cache = DataCache('127.0.0.1', CONTEXT_GRPC_SERVICE_PORT) +def device_service(context_client : ContextClient): # pylint: disable=redefined-outer-name driver_factory = DriverFactory(DRIVERS) driver_instance_cache = DriverInstanceCache(driver_factory) _service = DeviceService( - data_cache, driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, max_workers=DEVICE_GRPC_MAX_WORKERS, + context_client, driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD) _service.start() yield _service @@ -89,18 +84,38 @@ def device_client(device_service : DeviceService): # pylint: disable=redefined-o yield _client _client.close() -def test_device_add(context_client : ContextClient, device_client : DeviceClient): +def test_device_add( + context_client : ContextClient, device_client : DeviceClient): # pylint: disable=redefined-outer-name + context_client.SetContext(Context(**CONTEXT)) context_client.SetTopology(Topology(**TOPOLOGY)) - device_client.AddDevice(Device(**DEVICE1)) + + with pytest.raises(grpc.RpcError) as e: + device_client.AddDevice(Device(**DEVICE1)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg = 'device.device_config.config_rules(['\ + 'action: CONFIGACTION_SET\nresource_key: "dev/rsrc1/value"\nresource_value: "value1"\n, '\ + 'action: CONFIGACTION_SET\nresource_key: "dev/rsrc2/value"\nresource_value: "value2"\n, '\ + 'action: CONFIGACTION_SET\nresource_key: "dev/rsrc3/value"\nresource_value: "value3"\n]) is invalid; '\ + 'RPC method AddDevice does not allow definition of Config Rules. Add the Device first, and then configure it.' + assert e.value.details() == msg + + DEVICE1_WITHOUT_RULES = copy.deepcopy(DEVICE1) + DEVICE1_WITHOUT_RULES['device_config']['config_rules'].clear() + device_client.AddDevice(Device(**DEVICE1_WITHOUT_RULES)) initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE1_ID)) LOGGER.info('initial_config = {:s}'.format(str(initial_config))) + device_data = context_client.GetDevice(DeviceId(**DEVICE1_ID)) + LOGGER.info('device_data = {:s}'.format(str(device_data))) + + device_client.ConfigureDevice(Device(**DEVICE1)) + DEVICE1_WITH = copy.deepcopy(DEVICE1) CONFIG_RULES : List[Dict[str, Any]] = DEVICE1_WITH['device_config']['config_rules'] CONFIG_RULES.clear() - CONFIG_RULES.append(config_rule(ConfigActionEnum.CONFIGACTION_DELETE, 'dev/rsrc1/value', 'value1')) + CONFIG_RULES.append(config_rule(ConfigActionEnum.CONFIGACTION_DELETE, 'dev/rsrc1/value', '')) CONFIG_RULES.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc10/value', 'value10')) CONFIG_RULES.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc11/value', 'value11')) CONFIG_RULES.append(config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc12/value', 'value12'))