diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 9d0f9bd3ec018c707b9c26c9d61663043589d9f5..628b0884fccf36a905d4d2067486e84270e27aaf 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -15,12 +15,13 @@ import grpc, logging from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.ServiceExceptions import NotFoundException, OperationFailedException -from common.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty +from common.proto.context_pb2 import Device, DeviceConfig, DeviceId, DeviceOperationalStatusEnum, Empty from common.proto.device_pb2 import MonitoringSettings from common.proto.device_pb2_grpc import DeviceServiceServicer from common.tools.context_queries.Device import get_device from common.tools.mutex_queues.MutexQueues import MutexQueues from context.client.ContextClient import ContextClient +from device.service.Errors import ERROR_MISSING_DRIVER, ERROR_MISSING_KPI from .driver_api._Driver import _Driver from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver from .monitoring.MonitoringLoops import MonitoringLoops @@ -32,8 +33,6 @@ LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Device', 'RPC') -ERROR_MISSING_DRIVER = 'Device({:s}) has not been added to this Device instance' - class DeviceServiceServicerImpl(DeviceServiceServicer): def __init__(self, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops) -> None: LOGGER.debug('Creating Servicer...') @@ -97,6 +96,9 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): msg = ERROR_MISSING_DRIVER.format(str(device_uuid)) raise OperationFailedException('ConfigureDevice', extra_details=msg) + if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED: + device.device_operational_status = request.device_operational_status + # TODO: use of datastores (might be virtual ones) to enable rollbacks resources_to_set, resources_to_delete = compute_rules_to_add_delete(device, request) @@ -110,13 +112,8 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): # Rules updated by configure_rules() and deconfigure_rules() methods. # Code to be removed soon if not needed. - #running_config_rules = driver.GetConfig() - #for config_rule in running_config_rules: - # if isinstance(config_rule[1], Exception): continue - # config_rule = device.device_config.config_rules.add() - # config_rule.action = ConfigActionEnum.CONFIGACTION_SET - # config_rule.custom.resource_key = config_rule[0] - # config_rule.custom.resource_value = json.dumps(config_rule[1], sort_keys=True) + del device.device_config.config_rules[:] + populate_config_rules(device, driver) device_id = context_client.SetDevice(device) return device_id @@ -161,10 +158,20 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty: - device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0) manage_kpi_method = subscribe_kpi if subscribe else unsubscribe_kpi + if subscribe: + device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid + else: + # unsubscribe only carries kpi_uuid; take device_uuid from recorded KPIs + kpi_uuid = request.kpi_id.kpi_id.uuid + kpi_details = self.monitoring_loops.get_kpi_by_uuid(kpi_uuid) + if kpi_details is None: + msg = ERROR_MISSING_KPI.format(str(kpi_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + device_uuid = kpi_details[0] + self.mutex_queues.wait_my_turn(device_uuid) try: driver : _Driver = self.driver_instance_cache.get(device_uuid) diff --git a/src/device/service/Errors.py b/src/device/service/Errors.py new file mode 100644 index 0000000000000000000000000000000000000000..5f2fc499619debef254515f8eb198f339d5930ab --- /dev/null +++ b/src/device/service/Errors.py @@ -0,0 +1,30 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ERROR_MISSING_DRIVER = 'Device({:s}) has not been added to this Device instance' +ERROR_MISSING_KPI = 'Kpi({:s}) not found' + +ERROR_BAD_ENDPOINT = 'Device({:s}): GetConfig retrieved malformed Endpoint({:s})' + +ERROR_GET = 'Device({:s}): Unable to Get resource(key={:s}); error({:s})' +ERROR_GET_INIT = 'Device({:s}): Unable to Get Initial resource(key={:s}); error({:s})' +ERROR_DELETE = 'Device({:s}): Unable to Delete resource(key={:s}, value={:s}); error({:s})' +ERROR_SET = 'Device({:s}): Unable to Set resource(key={:s}, value={:s}); error({:s})' + +ERROR_SAMPLETYPE = 'Device({:s})/EndPoint({:s}): SampleType({:s}/{:s}) not supported' + +ERROR_SUBSCRIBE = 'Device({:s}): Unable to Subscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\ + 'error({:s})' +ERROR_UNSUBSCRIBE = 'Device({:s}): Unable to Unsubscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\ + 'error({:s})' diff --git a/src/device/service/Tools.py b/src/device/service/Tools.py index 0698be883c83c6b9863ed8410a0fe266ecc5419d..d2cd0b48104857ac8a4525feb28a4ca480e0aec1 100644 --- a/src/device/service/Tools.py +++ b/src/device/service/Tools.py @@ -13,7 +13,7 @@ # limitations under the License. import json -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Tuple, Union from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.method_wrappers.ServiceExceptions import InvalidArgumentException from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig @@ -22,18 +22,9 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.tools.grpc.Tools import grpc_message_to_json from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS from .monitoring.MonitoringLoops import MonitoringLoops - -ERROR_ENDPOINT = 'Device({:s}): GetConfig retrieved malformed Endpoint({:s})' -ERROR_GET = 'Device({:s}): Unable to Get resource(key={:s}); error({:s})' -ERROR_GET_INIT = 'Device({:s}): Unable to Get Initial resource(key={:s}); error({:s})' -ERROR_SET = 'Device({:s}): Unable to Set resource(key={:s}, value={:s}); error({:s})' -ERROR_DELETE = 'Device({:s}): Unable to Delete resource(key={:s}, value={:s}); error({:s})' -ERROR_SAMPLETYPE = 'Device({:s})/EndPoint({:s}): SampleType({:s}/{:s}) not supported' -ERROR_SUBSCRIBE = 'Device({:s}): Unable to Subscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\ - 'error({:s})' -ERROR_MISSING_KPI = 'Device({:s}): Kpi({:s}) not found' -ERROR_UNSUBSCRIBE = 'Device({:s}): Unable to Unsubscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\ - 'error({:s})' +from .Errors import ( + ERROR_BAD_ENDPOINT, ERROR_DELETE, ERROR_GET, ERROR_GET_INIT, ERROR_MISSING_KPI, ERROR_SAMPLETYPE, ERROR_SET, + ERROR_SUBSCRIBE, ERROR_UNSUBSCRIBE) def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]: connection_config_rules = dict() @@ -91,7 +82,7 @@ def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : Mon errors : List[str] = list() for endpoint in results_getconfig: if len(endpoint) != 2: - errors.append(ERROR_ENDPOINT.format(device_uuid, str(endpoint))) + errors.append(ERROR_BAD_ENDPOINT.format(device_uuid, str(endpoint))) continue resource_key, resource_value = endpoint @@ -115,40 +106,35 @@ def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : Mon return errors -def populate_config_rules(device : Device, driver : _Driver) -> List[str]: - device_uuid = device.device_id.device_uuid.uuid - - resources_to_get = ['ALL'] - results_getconfig = driver.GetConfig() - +def _raw_config_rules_to_grpc( + device_uuid : str, device_config : DeviceConfig, error_template : str, default_config_action : ConfigActionEnum, + raw_config_rules : List[Tuple[str, Union[Any, Exception, None]]] +) -> List[str]: errors : List[str] = list() - for resource_key, resource_value in zip(resources_to_get, results_getconfig): + + for resource_key, resource_value in raw_config_rules: if isinstance(resource_value, Exception): - errors.append(ERROR_GET.format(device_uuid, str(resource_key), str(resource_value))) + errors.append(error_template.format(device_uuid, str(resource_key), str(resource_value))) continue - config_rule = device.device_config.config_rules.add() - config_rule.action = ConfigActionEnum.CONFIGACTION_SET + config_rule = device_config.config_rules.add() + config_rule.action = default_config_action config_rule.custom.resource_key = resource_key - config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True) + config_rule.custom.resource_value = \ + resource_value if isinstance(resource_value, str) else json.dumps(resource_value, sort_keys=True) return errors +def populate_config_rules(device : Device, driver : _Driver) -> List[str]: + device_uuid = device.device_id.device_uuid.uuid + results_getconfig = driver.GetConfig() + return _raw_config_rules_to_grpc( + device_uuid, device.device_config, ERROR_GET, ConfigActionEnum.CONFIGACTION_SET, results_getconfig) + def populate_initial_config_rules(device_uuid : str, device_config : DeviceConfig, driver : _Driver) -> List[str]: results_getinitconfig = driver.GetInitialConfig() - - errors : List[str] = list() - for resource_key, resource_value in results_getinitconfig: - if isinstance(resource_value, Exception): - errors.append(ERROR_GET_INIT.format(device_uuid, str(resource_key), str(resource_value))) - continue - - config_rule = device_config.config_rules.add() - config_rule.action = ConfigActionEnum.CONFIGACTION_SET - config_rule.custom.resource_key = resource_key - config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True) - - return errors + return _raw_config_rules_to_grpc( + device_uuid, device_config, ERROR_GET_INIT, ConfigActionEnum.CONFIGACTION_SET, results_getinitconfig) def compute_rules_to_add_delete( device : Device, request : Device @@ -186,37 +172,27 @@ def configure_rules(device : Device, driver : _Driver, resources_to_set : List[T device_uuid = device.device_id.device_uuid.uuid results_setconfig = driver.SetConfig(resources_to_set) + results_setconfig = [ + (resource_key, result if isinstance(result, Exception) else resource_value) + for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig) + ] - errors : List[str] = list() - for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig): - if isinstance(result, Exception): - errors.append(ERROR_SET.format(device_uuid, str(resource_key), str(resource_value), str(result))) - continue - # add to config of device - config_rule = device.device_config.config_rules.add() - config_rule.action = ConfigActionEnum.CONFIGACTION_SET - config_rule.custom.resource_key = resource_key - config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True) - - return errors + device_config = DeviceConfig() # ignored; added at the end of ConfigureDevice + return _raw_config_rules_to_grpc( + device_uuid, device_config, ERROR_SET, ConfigActionEnum.CONFIGACTION_SET, results_setconfig) def deconfigure_rules(device : Device, driver : _Driver, resources_to_delete : List[Tuple[str, Any]]) -> List[str]: device_uuid = device.device_id.device_uuid.uuid results_deleteconfig = driver.DeleteConfig(resources_to_delete) + results_deleteconfig = [ + (resource_key, result if isinstance(result, Exception) else resource_value) + for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig) + ] - errors : List[str] = list() - for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig): - if isinstance(result, Exception): - errors.append(ERROR_DELETE.format(device_uuid, str(resource_key), str(resource_value), str(result))) - continue - # remove from config of device - config_rule = device.device_config.config_rules.add() - config_rule.action = ConfigActionEnum.CONFIGACTION_SET - config_rule.custom.resource_key = resource_key - config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True) - - return errors + device_config = DeviceConfig() # ignored; added at the end of ConfigureDevice + return _raw_config_rules_to_grpc( + device_uuid, device_config, ERROR_DELETE, ConfigActionEnum.CONFIGACTION_DELETE, results_deleteconfig) def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]: kpi_uuid = request.kpi_id.kpi_id.uuid @@ -253,20 +229,11 @@ def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loo return errors def unsubscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]: - kpi_uuid = request.kpi_id.kpi_id.uuid - device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid - #endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid - #kpi_sample_type = request.kpi_descriptor.kpi_sample_type - - # TODO: consider if further validation needs to be done (correct endpoint_uuid?, correct kpi_sample_type?) - #resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) - #if resource_key is None: - # kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') - # return [ERROR_SAMPLETYPE.format(device_uuid, endpoint_uuid, str(kpi_sample_type), str(kpi_sample_type_name))] + kpi_uuid = request.kpi_id.kpi_id.uuid kpi_details = monitoring_loops.get_kpi_by_uuid(kpi_uuid) if kpi_details is None: - return [ERROR_MISSING_KPI.format(str(device_uuid), str(kpi_uuid))] + return [ERROR_MISSING_KPI.format(str(kpi_uuid))] device_uuid, resource_key, sampling_duration, sampling_interval = kpi_details diff --git a/src/device/tests/Device_Emulated.py b/src/device/tests/Device_Emulated.py index 7b8f15918146fcde9e920825e42e2985deeaee24..cf564b0bf380798f329df11e9598126ae9e456e7 100644 --- a/src/device/tests/Device_Emulated.py +++ b/src/device/tests/Device_Emulated.py @@ -18,7 +18,7 @@ from common.tools.object_factory.Device import ( json_device_emulated_connect_rules, json_device_emulated_packet_router_disabled, json_device_id) from device.tests.CommonObjects import PACKET_PORT_SAMPLE_TYPES -DEVICE_EMU_UUID = 'EMULATED' +DEVICE_EMU_UUID = 'R1-EMU' DEVICE_EMU_ID = json_device_id(DEVICE_EMU_UUID) DEVICE_EMU = json_device_emulated_packet_router_disabled(DEVICE_EMU_UUID) DEVICE_EMU_EP_UUIDS = ['EP1', 'EP2', 'EP3', 'EP4'] diff --git a/src/device/tests/test_unitary_emulated.py b/src/device/tests/test_unitary_emulated.py index 745c25c1eba679dc67e0ed9e04f38eb0ae8c3af4..8a1b30a6ec01ec004c92be97d27e318e427f4cbe 100644 --- a/src/device/tests/test_unitary_emulated.py +++ b/src/device/tests/test_unitary_emulated.py @@ -168,12 +168,14 @@ def test_device_emulated_configure( config_rule = ( ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'], json.loads(json.dumps(config_rule['custom']['resource_value']))) + #LOGGER.info('config_rule: {:s} {:s} = {:s}'.format(*config_rule)) assert config_rule in config_rules for config_rule in DEVICE_EMU_CONFIG_ADDRESSES: assert 'custom' in config_rule config_rule = ( ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'], json.loads(json.dumps(config_rule['custom']['resource_value']))) + #LOGGER.info('config_rule: {:s} {:s} = {:s}'.format(*config_rule)) assert config_rule in config_rules # Try to reconfigure... @@ -222,6 +224,7 @@ def test_device_emulated_configure( config_rule = ( ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'], config_rule['custom']['resource_value']) + #LOGGER.info('config_rule: {:s} {:s} = {:s}'.format(*config_rule)) assert config_rule in config_rules