Loading src/common/tools/grpc/ConfigRules.py +11 −4 Original line number Diff line number Diff line Loading @@ -18,24 +18,31 @@ import json from typing import Any, Dict, Tuple from common.proto.context_pb2 import ConfigActionEnum, ConfigRule from common.proto.context_pb2 import ConfigActionEnum from common.tools.grpc.Tools import grpc_message_to_json_string def update_config_rule_custom(config_rules, resource_key : str, fields : Dict[str, Tuple[Any, bool]]) -> ConfigRule: def update_config_rule_custom( config_rules, resource_key : str, fields : Dict[str, Tuple[Any, bool]], new_action : ConfigActionEnum = ConfigActionEnum.CONFIGACTION_SET ) -> None: # fields: Dict[field_name : str, Tuple[field_value : Any, raise_if_differs : bool]] # TODO: add support for ACL config rules for config_rule in config_rules: if config_rule.WhichOneof('config_rule') != 'custom': continue kind = config_rule.WhichOneof('config_rule') if kind != 'custom': continue if config_rule.custom.resource_key != resource_key: continue json_resource_value = json.loads(config_rule.custom.resource_value) break # found, end loop else: # not found, add it config_rule = config_rules.add() # pylint: disable=no-member config_rule.action = ConfigActionEnum.CONFIGACTION_SET config_rule.custom.resource_key = resource_key json_resource_value = {} config_rule.action = new_action for field_name,(field_value, raise_if_differs) in fields.items(): if (field_name not in json_resource_value) or not raise_if_differs: # missing or raise_if_differs=False, add/update it Loading Loading
src/common/tools/grpc/ConfigRules.py +11 −4 Original line number Diff line number Diff line Loading @@ -18,24 +18,31 @@ import json from typing import Any, Dict, Tuple from common.proto.context_pb2 import ConfigActionEnum, ConfigRule from common.proto.context_pb2 import ConfigActionEnum from common.tools.grpc.Tools import grpc_message_to_json_string def update_config_rule_custom(config_rules, resource_key : str, fields : Dict[str, Tuple[Any, bool]]) -> ConfigRule: def update_config_rule_custom( config_rules, resource_key : str, fields : Dict[str, Tuple[Any, bool]], new_action : ConfigActionEnum = ConfigActionEnum.CONFIGACTION_SET ) -> None: # fields: Dict[field_name : str, Tuple[field_value : Any, raise_if_differs : bool]] # TODO: add support for ACL config rules for config_rule in config_rules: if config_rule.WhichOneof('config_rule') != 'custom': continue kind = config_rule.WhichOneof('config_rule') if kind != 'custom': continue if config_rule.custom.resource_key != resource_key: continue json_resource_value = json.loads(config_rule.custom.resource_value) break # found, end loop else: # not found, add it config_rule = config_rules.add() # pylint: disable=no-member config_rule.action = ConfigActionEnum.CONFIGACTION_SET config_rule.custom.resource_key = resource_key json_resource_value = {} config_rule.action = new_action for field_name,(field_value, raise_if_differs) in fields.items(): if (field_name not in json_resource_value) or not raise_if_differs: # missing or raise_if_differs=False, add/update it Loading