# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import operator from typing import Any, Callable, List, Tuple, Union ACTION_MSG_GET = 'Get resource(key={:s})' ACTION_MSG_SET = 'Set resource(key={:s}, value={:s})' ACTION_MSG_DELETE = 'Delete resource(key={:s}, value={:s})' ACTION_MSG_SUBSCRIBE = 'Subscribe subscription(key={:s}, duration={:s}, interval={:s})' ACTION_MSG_UNSUBSCRIBE = 'Unsubscribe subscription(key={:s}, duration={:s}, interval={:s})' def _get(resource_key : str): return ACTION_MSG_GET.format(str(resource_key)) def _set(resource : Tuple[str, Any]): return ACTION_MSG_SET.format(*tuple(map(str, resource))) def _delete(resource : Tuple[str, Any]): return ACTION_MSG_SET.format(*tuple(map(str, resource))) def _subscribe(subscription : Tuple[str, float, float]): return ACTION_MSG_SUBSCRIBE.format(*tuple(map(str, subscription))) def _unsubscribe(subscription : Tuple[str, float, float]): return ACTION_MSG_UNSUBSCRIBE.format(*tuple(map(str, subscription))) def _check_errors( error_func : Callable, parameters_list : List[Any], results_list : List[Union[bool, Exception]] ) -> List[str]: errors = [] for parameters, results in zip(parameters_list, results_list): if not isinstance(results, Exception): continue errors.append('Unable to {:s}; error({:s})'.format(error_func(parameters), str(results))) return errors def check_get_errors( resource_keys : List[str], results : List[Tuple[str, Union[Any, None, Exception]]] ) -> List[str]: return _check_errors(_get, resource_keys, map(operator.itemgetter(1), results)) def check_set_errors( resources : List[Tuple[str, Any]], results : List[Union[bool, Exception]] ) -> List[str]: return _check_errors(_set, resources, results) def check_delete_errors( resources : List[Tuple[str, Any]], results : List[Union[bool, Exception]] ) -> List[str]: return _check_errors(_delete, resources, results) def check_subscribe_errors( subscriptions : List[Tuple[str, float, float]], results : List[Union[bool, Exception]] ) -> List[str]: return _check_errors(_subscribe, subscriptions, results) def check_unsubscribe_errors( subscriptions : List[Tuple[str, float, float]], results : List[Union[bool, Exception]] ) -> List[str]: return _check_errors(_unsubscribe, subscriptions, results)