Loading src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py +161 −26 Original line number Diff line number Diff line Loading @@ -203,7 +203,12 @@ class GnmiSessionHandler: if isinstance(v, dict) or isinstance(v, list): removed += self._sanitize_unknown_identityrefs(v) else: if isinstance(v, str) and (':' not in v) and v.upper() == v and any(c.isalpha() for c in v): if isinstance(v, str): # Remove specific problematic values that Stratum sends if v in {'UNKNOWN', 'UNSUPPORTED', 'NOT_IMPLEMENTED'}: keys_to_delete.append(k) # Remove ALLCAPS strings without module prefix that look like unknown identityrefs elif (':' not in v) and v.upper() == v and any(c.isalpha() for c in v): keys_to_delete.append(k) for k in keys_to_delete: del obj[k] Loading Loading @@ -258,28 +263,42 @@ class GnmiSessionHandler: return removed def _coerce_boolean_enums_to_strings(self, obj: Any) -> int: """Recursively coerce boolean values to appropriate string enums for specific OpenConfig fields.""" """Recursively coerce boolean values and string booleans to appropriate string enums for specific OpenConfig fields.""" changed = 0 if isinstance(obj, dict): for k, v in list(obj.items()): if isinstance(v, dict) or isinstance(v, list): changed += self._coerce_boolean_enums_to_strings(v) else: # Handle boolean values if isinstance(v, bool): # Handle specific OpenConfig enum fields that Stratum sends as booleans if k in {'loopback-mode', 'auto-negotiate', 'forwarding-viable'}: obj[k] = 'true' if v else 'false' changed += 1 elif k in {'enabled'}: # Keep as boolean for config fields # These fields should remain as actual boolean values according to OpenConfig schema if k in {'enabled', 'auto-negotiate', 'forwarding-viable', 'loopback-mode'}: # Keep as boolean - don't convert to string continue elif 'status' in k: obj[k] = 'UP' if v else 'DOWN' changed += 1 else: # Default coercion for other boolean fields # For other boolean fields, convert to string boolean obj[k] = 'true' if v else 'false' changed += 1 # Handle string values that look like booleans but are actually enums elif isinstance(v, str): v_lower = v.lower() if v_lower in {'true', 'false'}: if k in {'enabled', 'auto-negotiate', 'forwarding-viable', 'loopback-mode', 'hold-time', 'carrier-detect', 'suppress-fec', 'fec-mode'}: # Convert string boolean to actual boolean for these fields obj[k] = (v_lower == 'true') changed += 1 elif 'status' in k: # Convert string boolean to UP/DOWN for status fields obj[k] = 'UP' if v_lower == 'true' else 'DOWN' changed += 1 else: # For other fields, convert string boolean to actual boolean obj[k] = (v_lower == 'true') changed += 1 elif isinstance(obj, list): for item in obj: changed += self._coerce_boolean_enums_to_strings(item) Loading @@ -297,6 +316,9 @@ class GnmiSessionHandler: # Remove Stratum-specific fields that cause validation errors if k in {'flow-programming-exception', 'memory-error-alarm', 'node-info', 'front-panel-port-info'}: keys_to_delete.append(k) # Remove fields that commonly contain "UNKNOWN" values in Stratum elif k in {'severity', 'state', 'status', 'type'} and isinstance(v, str) and v in {'UNKNOWN', 'UNSUPPORTED', 'NOT_IMPLEMENTED'}: keys_to_delete.append(k) for k in keys_to_delete: del obj[k] removed += 1 Loading @@ -305,6 +327,89 @@ class GnmiSessionHandler: removed += self._filter_stratum_specific_fields(item) return removed def _remove_remaining_unknown_values(self, obj: Any) -> int: """Remove any remaining 'UNKNOWN', 'UNSUPPORTED', 'NOT_IMPLEMENTED' values that might cause validation errors.""" removed = 0 if isinstance(obj, dict): keys_to_delete = [] for k, v in obj.items(): if isinstance(v, dict) or isinstance(v, list): removed += self._remove_remaining_unknown_values(v) else: # Remove any remaining problematic string values if isinstance(v, str) and v in {'UNKNOWN', 'UNSUPPORTED', 'NOT_IMPLEMENTED', 'N/A', 'NULL'}: keys_to_delete.append(k) for k in keys_to_delete: del obj[k] removed += 1 elif isinstance(obj, list): for item in obj: removed += self._remove_remaining_unknown_values(item) return removed def _check_for_string_numbers(self, obj: Any, path: str) -> None: """Debug method to check for string numbers that might cause validation errors.""" def _check_recursive(data, current_path=""): if isinstance(data, dict): for k, v in data.items(): new_path = f"{current_path}.{k}" if current_path else k if isinstance(v, str): # Check if string looks like a number if v.isdigit() or (v.startswith('-') and v[1:].isdigit()): self._logger.warning('Found string number at %s: %s (type: %s)', new_path, v, type(v)) elif isinstance(v, (dict, list)): _check_recursive(v, new_path) elif isinstance(data, list): for i, item in enumerate(data): new_path = f"{current_path}[{i}]" if current_path else f"[{i}]" _check_recursive(item, new_path) self._logger.info('Checking for string numbers in reconstructed data for path: %s', path) _check_recursive(obj) def _is_numeric_field(self, field_name: str) -> bool: """Check if a field name suggests it should contain numeric values.""" numeric_indicators = { 'counters', 'octets', 'pkts', 'errors', 'discards', 'transitions', 'mtu', 'index', 'id', 'speed', 'port-id', 'channel-id', 'transceiver-id', 'component-id', 'alarm-id', 'severity-id', 'temperature', 'voltage', 'power', 'bias-current', 'optical-power', 'last-clear', 'last-change' } field_lower = field_name.lower() return any(indicator in field_lower for indicator in numeric_indicators) def _fix_string_enum_values(self, obj: Any) -> int: """Fix string values that should be proper enums but are causing validation errors.""" fixed = 0 if isinstance(obj, dict): for k, v in list(obj.items()): if isinstance(v, dict) or isinstance(v, list): fixed += self._fix_string_enum_values(v) else: if isinstance(v, str): # Handle specific cases where string values need to be converted to proper enums if k == 'oper-status' and v.lower() in {'true', 'false'}: obj[k] = 'UP' if v.lower() == 'true' else 'DOWN' fixed += 1 elif k == 'admin-status' and v.lower() in {'true', 'false'}: obj[k] = 'UP' if v.lower() == 'true' else 'DOWN' fixed += 1 elif k in {'loopback-mode', 'auto-negotiate', 'forwarding-viable'} and v.lower() in {'true', 'false'}: # These fields expect string booleans, keep as is pass elif v.lower() in {'true', 'false'} and k not in {'enabled', 'description', 'name'}: # For other fields, convert to proper enum values if 'status' in k: obj[k] = 'UP' if v.lower() == 'true' else 'DOWN' else: obj[k] = v.lower() fixed += 1 elif isinstance(obj, list): for item in obj: fixed += self._fix_string_enum_values(item) return fixed def _enrich_missing_types(self, resource_path: str, data: Dict) -> int: """Populate default type identityrefs if missing to enable handlers to produce entries.""" added = 0 Loading Loading @@ -353,17 +458,15 @@ class GnmiSessionHandler: # Deep copy and sanitize sanitized = json.loads(json.dumps(enriched)) removed = self._sanitize_unknown_identityrefs(sanitized) changed = self._coerce_enum_like_booleans(sanitized) empty_removed = self._sanitize_empty_strings(sanitized) enum_coerced = self._coerce_boolean_enums_to_strings(sanitized) stratum_filtered = self._filter_stratum_specific_fields(sanitized) # Additional cleanup for any remaining "UNKNOWN" values unknown_removed = self._remove_remaining_unknown_values(sanitized) # Handle boolean/enum conversions LAST to avoid conflicts enum_coerced = self._coerce_boolean_enums_to_strings(sanitized) added2 = self._enrich_missing_types(resource_path, sanitized) self._logger.info('Sanitization changes for %s: removed_unknown_identityrefs=%d, coerced_booleans=%d, empty_strings_removed=%d, enum_booleans_coerced=%d, stratum_fields_filtered=%d, defaults_added=%d', resource_path, removed, changed, empty_removed, enum_coerced, stratum_filtered, added2) try: self._logger.info('Sanitization changes for %s: removed_unknown_identityrefs=%d, empty_strings_removed=%d, stratum_fields_filtered=%d, unknown_values_removed=%d, enum_booleans_coerced=%d, defaults_added=%d', resource_path, removed, empty_removed, stratum_filtered, unknown_removed, enum_coerced, added2) return parse(resource_path, sanitized, self._yang_handler) except Exception as e2: self._logger.error('Sanitized parse still failed for %s: %s. Skipping this batch.', resource_path, e2) return [] def _prefix_augmented_node(self, node_name: str) -> str: """ Loading Loading @@ -477,17 +580,48 @@ class GnmiSessionHandler: node_name = self._prefix_augmented_node(segment) if is_last: # Coerce enumerations for known leaves (interfaces) if node_name in {'oper-status', 'admin-status'}: # Accept bool or strings like 'true'/'false'/'0'/'1' # Handle boolean values for specific field types if isinstance(value, bool): if 'status' in node_name.lower(): value = 'UP' if value else 'DOWN' elif node_name in {'enabled', 'loopback-mode', 'auto-negotiate', 'forwarding-viable', 'hold-time', 'carrier-detect', 'suppress-fec', 'fec-mode'}: # These fields expect boolean values, keep as-is pass else: # For other boolean fields, convert to string value = 'true' if value else 'false' # Handle string values that should be converted elif isinstance(value, str): vlow = value.strip().lower() if vlow in {'true', '1', 'up'}: # First, check if this is a numeric field that should be converted if self._is_numeric_field(node_name) and (vlow.isdigit() or (vlow.startswith('-') and vlow[1:].isdigit())): try: if '.' in vlow or 'e' in vlow.lower(): value = float(vlow) else: value = int(vlow) except (ValueError, AttributeError): pass # Keep as string if conversion fails # Then handle status-like conversions elif vlow in {'true', '1', 'up'}: if 'status' in node_name.lower(): value = 'UP' elif node_name in {'enabled', 'loopback-mode', 'auto-negotiate', 'forwarding-viable', 'hold-time', 'carrier-detect', 'suppress-fec', 'fec-mode'}: value = True else: value = 'UP' elif vlow in {'false', '0', 'down'}: if 'status' in node_name.lower(): value = 'DOWN' elif node_name in {'enabled', 'loopback-mode', 'auto-negotiate', 'forwarding-viable', 'hold-time', 'carrier-detect', 'suppress-fec', 'fec-mode'}: value = False else: value = 'DOWN' # Ensure numeric values are proper types elif isinstance(value, (int, float)): # Keep numeric values as-is pass # Prefix identityref values for type leaves if node_name == 'type' and i >= 1 and segments[i - 1] == 'state' and isinstance(value, str) and ':' not in value: if top_kind == 'interfaces': Loading Loading @@ -610,6 +744,7 @@ class GnmiSessionHandler: reconstructed_data = self._reconstruct_object_from_updates(notification.update) self._logger.debug('Reconstructed data keys: %s', list(reconstructed_data.keys())) self._logger.debug('Calling parse with path: %s and data: %s', common_path, reconstructed_data) # Parse the reconstructed object at the resource level with generic sanitization fallback results.extend(self._try_parse_with_sanitization(common_path, reconstructed_data)) except Exception as e: Loading src/device/service/drivers/gnmi_openconfig/tools/Value.py +2 −2 Original line number Diff line number Diff line Loading @@ -72,9 +72,9 @@ def decode_value(value : TypedValue) -> Any: value = b_value.decode('UTF-8') return json.loads(value) elif encoding == 'uint_val': return value.uint_val return int(value.uint_val) elif encoding == 'int_val': return value.int_val return int(value.int_val) elif encoding == 'string_val': return value.string_val elif encoding == 'bool_val': Loading Loading
src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py +161 −26 Original line number Diff line number Diff line Loading @@ -203,7 +203,12 @@ class GnmiSessionHandler: if isinstance(v, dict) or isinstance(v, list): removed += self._sanitize_unknown_identityrefs(v) else: if isinstance(v, str) and (':' not in v) and v.upper() == v and any(c.isalpha() for c in v): if isinstance(v, str): # Remove specific problematic values that Stratum sends if v in {'UNKNOWN', 'UNSUPPORTED', 'NOT_IMPLEMENTED'}: keys_to_delete.append(k) # Remove ALLCAPS strings without module prefix that look like unknown identityrefs elif (':' not in v) and v.upper() == v and any(c.isalpha() for c in v): keys_to_delete.append(k) for k in keys_to_delete: del obj[k] Loading Loading @@ -258,28 +263,42 @@ class GnmiSessionHandler: return removed def _coerce_boolean_enums_to_strings(self, obj: Any) -> int: """Recursively coerce boolean values to appropriate string enums for specific OpenConfig fields.""" """Recursively coerce boolean values and string booleans to appropriate string enums for specific OpenConfig fields.""" changed = 0 if isinstance(obj, dict): for k, v in list(obj.items()): if isinstance(v, dict) or isinstance(v, list): changed += self._coerce_boolean_enums_to_strings(v) else: # Handle boolean values if isinstance(v, bool): # Handle specific OpenConfig enum fields that Stratum sends as booleans if k in {'loopback-mode', 'auto-negotiate', 'forwarding-viable'}: obj[k] = 'true' if v else 'false' changed += 1 elif k in {'enabled'}: # Keep as boolean for config fields # These fields should remain as actual boolean values according to OpenConfig schema if k in {'enabled', 'auto-negotiate', 'forwarding-viable', 'loopback-mode'}: # Keep as boolean - don't convert to string continue elif 'status' in k: obj[k] = 'UP' if v else 'DOWN' changed += 1 else: # Default coercion for other boolean fields # For other boolean fields, convert to string boolean obj[k] = 'true' if v else 'false' changed += 1 # Handle string values that look like booleans but are actually enums elif isinstance(v, str): v_lower = v.lower() if v_lower in {'true', 'false'}: if k in {'enabled', 'auto-negotiate', 'forwarding-viable', 'loopback-mode', 'hold-time', 'carrier-detect', 'suppress-fec', 'fec-mode'}: # Convert string boolean to actual boolean for these fields obj[k] = (v_lower == 'true') changed += 1 elif 'status' in k: # Convert string boolean to UP/DOWN for status fields obj[k] = 'UP' if v_lower == 'true' else 'DOWN' changed += 1 else: # For other fields, convert string boolean to actual boolean obj[k] = (v_lower == 'true') changed += 1 elif isinstance(obj, list): for item in obj: changed += self._coerce_boolean_enums_to_strings(item) Loading @@ -297,6 +316,9 @@ class GnmiSessionHandler: # Remove Stratum-specific fields that cause validation errors if k in {'flow-programming-exception', 'memory-error-alarm', 'node-info', 'front-panel-port-info'}: keys_to_delete.append(k) # Remove fields that commonly contain "UNKNOWN" values in Stratum elif k in {'severity', 'state', 'status', 'type'} and isinstance(v, str) and v in {'UNKNOWN', 'UNSUPPORTED', 'NOT_IMPLEMENTED'}: keys_to_delete.append(k) for k in keys_to_delete: del obj[k] removed += 1 Loading @@ -305,6 +327,89 @@ class GnmiSessionHandler: removed += self._filter_stratum_specific_fields(item) return removed def _remove_remaining_unknown_values(self, obj: Any) -> int: """Remove any remaining 'UNKNOWN', 'UNSUPPORTED', 'NOT_IMPLEMENTED' values that might cause validation errors.""" removed = 0 if isinstance(obj, dict): keys_to_delete = [] for k, v in obj.items(): if isinstance(v, dict) or isinstance(v, list): removed += self._remove_remaining_unknown_values(v) else: # Remove any remaining problematic string values if isinstance(v, str) and v in {'UNKNOWN', 'UNSUPPORTED', 'NOT_IMPLEMENTED', 'N/A', 'NULL'}: keys_to_delete.append(k) for k in keys_to_delete: del obj[k] removed += 1 elif isinstance(obj, list): for item in obj: removed += self._remove_remaining_unknown_values(item) return removed def _check_for_string_numbers(self, obj: Any, path: str) -> None: """Debug method to check for string numbers that might cause validation errors.""" def _check_recursive(data, current_path=""): if isinstance(data, dict): for k, v in data.items(): new_path = f"{current_path}.{k}" if current_path else k if isinstance(v, str): # Check if string looks like a number if v.isdigit() or (v.startswith('-') and v[1:].isdigit()): self._logger.warning('Found string number at %s: %s (type: %s)', new_path, v, type(v)) elif isinstance(v, (dict, list)): _check_recursive(v, new_path) elif isinstance(data, list): for i, item in enumerate(data): new_path = f"{current_path}[{i}]" if current_path else f"[{i}]" _check_recursive(item, new_path) self._logger.info('Checking for string numbers in reconstructed data for path: %s', path) _check_recursive(obj) def _is_numeric_field(self, field_name: str) -> bool: """Check if a field name suggests it should contain numeric values.""" numeric_indicators = { 'counters', 'octets', 'pkts', 'errors', 'discards', 'transitions', 'mtu', 'index', 'id', 'speed', 'port-id', 'channel-id', 'transceiver-id', 'component-id', 'alarm-id', 'severity-id', 'temperature', 'voltage', 'power', 'bias-current', 'optical-power', 'last-clear', 'last-change' } field_lower = field_name.lower() return any(indicator in field_lower for indicator in numeric_indicators) def _fix_string_enum_values(self, obj: Any) -> int: """Fix string values that should be proper enums but are causing validation errors.""" fixed = 0 if isinstance(obj, dict): for k, v in list(obj.items()): if isinstance(v, dict) or isinstance(v, list): fixed += self._fix_string_enum_values(v) else: if isinstance(v, str): # Handle specific cases where string values need to be converted to proper enums if k == 'oper-status' and v.lower() in {'true', 'false'}: obj[k] = 'UP' if v.lower() == 'true' else 'DOWN' fixed += 1 elif k == 'admin-status' and v.lower() in {'true', 'false'}: obj[k] = 'UP' if v.lower() == 'true' else 'DOWN' fixed += 1 elif k in {'loopback-mode', 'auto-negotiate', 'forwarding-viable'} and v.lower() in {'true', 'false'}: # These fields expect string booleans, keep as is pass elif v.lower() in {'true', 'false'} and k not in {'enabled', 'description', 'name'}: # For other fields, convert to proper enum values if 'status' in k: obj[k] = 'UP' if v.lower() == 'true' else 'DOWN' else: obj[k] = v.lower() fixed += 1 elif isinstance(obj, list): for item in obj: fixed += self._fix_string_enum_values(item) return fixed def _enrich_missing_types(self, resource_path: str, data: Dict) -> int: """Populate default type identityrefs if missing to enable handlers to produce entries.""" added = 0 Loading Loading @@ -353,17 +458,15 @@ class GnmiSessionHandler: # Deep copy and sanitize sanitized = json.loads(json.dumps(enriched)) removed = self._sanitize_unknown_identityrefs(sanitized) changed = self._coerce_enum_like_booleans(sanitized) empty_removed = self._sanitize_empty_strings(sanitized) enum_coerced = self._coerce_boolean_enums_to_strings(sanitized) stratum_filtered = self._filter_stratum_specific_fields(sanitized) # Additional cleanup for any remaining "UNKNOWN" values unknown_removed = self._remove_remaining_unknown_values(sanitized) # Handle boolean/enum conversions LAST to avoid conflicts enum_coerced = self._coerce_boolean_enums_to_strings(sanitized) added2 = self._enrich_missing_types(resource_path, sanitized) self._logger.info('Sanitization changes for %s: removed_unknown_identityrefs=%d, coerced_booleans=%d, empty_strings_removed=%d, enum_booleans_coerced=%d, stratum_fields_filtered=%d, defaults_added=%d', resource_path, removed, changed, empty_removed, enum_coerced, stratum_filtered, added2) try: self._logger.info('Sanitization changes for %s: removed_unknown_identityrefs=%d, empty_strings_removed=%d, stratum_fields_filtered=%d, unknown_values_removed=%d, enum_booleans_coerced=%d, defaults_added=%d', resource_path, removed, empty_removed, stratum_filtered, unknown_removed, enum_coerced, added2) return parse(resource_path, sanitized, self._yang_handler) except Exception as e2: self._logger.error('Sanitized parse still failed for %s: %s. Skipping this batch.', resource_path, e2) return [] def _prefix_augmented_node(self, node_name: str) -> str: """ Loading Loading @@ -477,17 +580,48 @@ class GnmiSessionHandler: node_name = self._prefix_augmented_node(segment) if is_last: # Coerce enumerations for known leaves (interfaces) if node_name in {'oper-status', 'admin-status'}: # Accept bool or strings like 'true'/'false'/'0'/'1' # Handle boolean values for specific field types if isinstance(value, bool): if 'status' in node_name.lower(): value = 'UP' if value else 'DOWN' elif node_name in {'enabled', 'loopback-mode', 'auto-negotiate', 'forwarding-viable', 'hold-time', 'carrier-detect', 'suppress-fec', 'fec-mode'}: # These fields expect boolean values, keep as-is pass else: # For other boolean fields, convert to string value = 'true' if value else 'false' # Handle string values that should be converted elif isinstance(value, str): vlow = value.strip().lower() if vlow in {'true', '1', 'up'}: # First, check if this is a numeric field that should be converted if self._is_numeric_field(node_name) and (vlow.isdigit() or (vlow.startswith('-') and vlow[1:].isdigit())): try: if '.' in vlow or 'e' in vlow.lower(): value = float(vlow) else: value = int(vlow) except (ValueError, AttributeError): pass # Keep as string if conversion fails # Then handle status-like conversions elif vlow in {'true', '1', 'up'}: if 'status' in node_name.lower(): value = 'UP' elif node_name in {'enabled', 'loopback-mode', 'auto-negotiate', 'forwarding-viable', 'hold-time', 'carrier-detect', 'suppress-fec', 'fec-mode'}: value = True else: value = 'UP' elif vlow in {'false', '0', 'down'}: if 'status' in node_name.lower(): value = 'DOWN' elif node_name in {'enabled', 'loopback-mode', 'auto-negotiate', 'forwarding-viable', 'hold-time', 'carrier-detect', 'suppress-fec', 'fec-mode'}: value = False else: value = 'DOWN' # Ensure numeric values are proper types elif isinstance(value, (int, float)): # Keep numeric values as-is pass # Prefix identityref values for type leaves if node_name == 'type' and i >= 1 and segments[i - 1] == 'state' and isinstance(value, str) and ':' not in value: if top_kind == 'interfaces': Loading Loading @@ -610,6 +744,7 @@ class GnmiSessionHandler: reconstructed_data = self._reconstruct_object_from_updates(notification.update) self._logger.debug('Reconstructed data keys: %s', list(reconstructed_data.keys())) self._logger.debug('Calling parse with path: %s and data: %s', common_path, reconstructed_data) # Parse the reconstructed object at the resource level with generic sanitization fallback results.extend(self._try_parse_with_sanitization(common_path, reconstructed_data)) except Exception as e: Loading
src/device/service/drivers/gnmi_openconfig/tools/Value.py +2 −2 Original line number Diff line number Diff line Loading @@ -72,9 +72,9 @@ def decode_value(value : TypedValue) -> Any: value = b_value.decode('UTF-8') return json.loads(value) elif encoding == 'uint_val': return value.uint_val return int(value.uint_val) elif encoding == 'int_val': return value.int_val return int(value.int_val) elif encoding == 'string_val': return value.string_val elif encoding == 'bool_val': Loading