Loading src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py +85 −47 Original line number Diff line number Diff line Loading @@ -235,6 +235,76 @@ class GnmiSessionHandler: changed += self._coerce_enum_like_booleans(item) return changed def _sanitize_empty_strings(self, obj: Any) -> int: """Recursively remove empty string leaves that violate OpenConfig schema constraints.""" removed = 0 if isinstance(obj, dict): keys_to_delete = [] for k, v in obj.items(): if isinstance(v, dict) or isinstance(v, list): removed += self._sanitize_empty_strings(v) else: # Remove empty strings for fields that should have meaningful values if isinstance(v, str) and v == "": # Keep some fields that can legitimately be empty if k not in {'description', 'name'}: keys_to_delete.append(k) for k in keys_to_delete: del obj[k] removed += 1 elif isinstance(obj, list): for item in obj: removed += self._sanitize_empty_strings(item) return removed def _coerce_boolean_enums_to_strings(self, obj: Any) -> int: """Recursively coerce boolean values to appropriate string enums for specific OpenConfig fields.""" changed = 0 if isinstance(obj, dict): for k, v in list(obj.items()): if isinstance(v, dict) or isinstance(v, list): changed += self._coerce_boolean_enums_to_strings(v) else: if isinstance(v, bool): # Handle specific OpenConfig enum fields that Stratum sends as booleans if k in {'loopback-mode', 'auto-negotiate', 'forwarding-viable'}: obj[k] = 'true' if v else 'false' changed += 1 elif k in {'enabled'}: # Keep as boolean for config fields continue elif 'status' in k: obj[k] = 'UP' if v else 'DOWN' changed += 1 else: # Default coercion for other boolean fields obj[k] = 'true' if v else 'false' changed += 1 elif isinstance(obj, list): for item in obj: changed += self._coerce_boolean_enums_to_strings(item) return changed def _filter_stratum_specific_fields(self, obj: Any) -> int: """Remove Stratum-specific fields that are not part of OpenConfig standard.""" removed = 0 if isinstance(obj, dict): keys_to_delete = [] for k, v in obj.items(): if isinstance(v, dict) or isinstance(v, list): removed += self._filter_stratum_specific_fields(v) else: # Remove Stratum-specific fields that cause validation errors if k in {'flow-programming-exception', 'memory-error-alarm', 'node-info', 'front-panel-port-info'}: keys_to_delete.append(k) for k in keys_to_delete: del obj[k] removed += 1 elif isinstance(obj, list): for item in obj: removed += self._filter_stratum_specific_fields(item) return removed def _enrich_missing_types(self, resource_path: str, data: Dict) -> int: """Populate default type identityrefs if missing to enable handlers to produce entries.""" added = 0 Loading Loading @@ -284,8 +354,11 @@ class GnmiSessionHandler: sanitized = json.loads(json.dumps(enriched)) removed = self._sanitize_unknown_identityrefs(sanitized) changed = self._coerce_enum_like_booleans(sanitized) empty_removed = self._sanitize_empty_strings(sanitized) enum_coerced = self._coerce_boolean_enums_to_strings(sanitized) stratum_filtered = self._filter_stratum_specific_fields(sanitized) added2 = self._enrich_missing_types(resource_path, sanitized) self._logger.info('Sanitization changes for %s: removed_unknown_identityrefs=%d, coerced_booleans=%d, defaults_added=%d', resource_path, removed, changed, added2) self._logger.info('Sanitization changes for %s: removed_unknown_identityrefs=%d, coerced_booleans=%d, empty_strings_removed=%d, enum_booleans_coerced=%d, stratum_fields_filtered=%d, defaults_added=%d', resource_path, removed, changed, empty_removed, enum_coerced, stratum_filtered, added2) try: return parse(resource_path, sanitized, self._yang_handler) except Exception as e2: Loading Loading @@ -446,24 +519,7 @@ class GnmiSessionHandler: get_request = GetRequest() get_request.type = GetRequest.DataType.ALL get_request.encoding = Encoding.Value(self._encoding) if self._encoding else Encoding.JSON_IETF #get_request.use_models.add() # kept empty: return for all models supported def _expand_query_paths(canonical_path: str) -> List[str]: # Remove namespace on first segment for initial try segs = canonical_path.strip('/').split('/') if not segs: return [canonical_path] first = segs[0] if ':' in first: first = first.split(':', 1)[1] base = first # Generic narrow queries to avoid unsupported subtrees if base == 'components': return ['/components/component/state', '/components/component/transceiver/state'] if base == 'interfaces': return ['/interfaces/interface/state', '/interfaces/interface/config'] # Default: non-namespaced canonical return ['/' + '/'.join([first] + segs[1:])] supported_models = self._capabilities.get('supported_models', set()) for i,resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) Loading @@ -472,10 +528,12 @@ class GnmiSessionHandler: self._logger.debug('[GnmiSessionHandler:get] resource_key = {:s}'.format(str(resource_key))) str_path = get_path(resource_key) self._logger.debug('[GnmiSessionHandler:get] str_path = {:s}'.format(str(str_path))) expanded_paths = _expand_query_paths(str_path) self._logger.debug('Expanded GET paths for %s: %s', str_path, expanded_paths) for p in expanded_paths: get_request.path.append(path_from_string(p)) parent_namespace = str_path.split(':')[0].strip('/') if ':' in str_path else None if parent_namespace is not None and parent_namespace not in supported_models: self._logger.warning('Skipping path %s because model %s is not advertised', str_path, parent_namespace) continue self._logger.info('Processing path %s because model %s is advertised', str_path, parent_namespace) get_request.path.append(path_from_string(str_path)) except Exception as e: # pylint: disable=broad-except MSG = 'Exception parsing {:s}: {:s}' self._logger.exception(MSG.format(str_resource_name, str(resource_key))) Loading Loading @@ -507,29 +565,9 @@ class GnmiSessionHandler: fallback_request.type = GetRequest.DataType.ALL fallback_request.encoding = get_request.encoding # Re-add namespace of canonical path on first segment if present for rk in resource_keys: try: canonical = get_path(rk) segs = canonical.strip('/').split('/') if not segs: continue first = segs[0] ns = first.split(':', 1)[0] if ':' in first else None if not ns: continue base = first.split(':', 1)[1] if ':' in first else first # Rebuild namespaced narrow paths if base == 'components': ns_paths = [f'/{ns}:components/component/state', f'/{ns}:components/component/transceiver/state'] elif base == 'interfaces': ns_paths = [f'/{ns}:interfaces/interface/state', f'/{ns}:interfaces/interface/config'] else: ns_paths = [f'/{ns}:{"/".join([base]+segs[1:])}'] for p in ns_paths: fallback_request.path.append(path_from_string(p)) except Exception: continue for path in get_request.path: path_str = path_to_string(path) fallback_request.path.append(path_from_string(path_str.split(':', 1)[1] if ':' in path_str else path_str)) get_reply = self._stub.Get(fallback_request, metadata=metadata, timeout=timeout) Loading Loading
src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py +85 −47 Original line number Diff line number Diff line Loading @@ -235,6 +235,76 @@ class GnmiSessionHandler: changed += self._coerce_enum_like_booleans(item) return changed def _sanitize_empty_strings(self, obj: Any) -> int: """Recursively remove empty string leaves that violate OpenConfig schema constraints.""" removed = 0 if isinstance(obj, dict): keys_to_delete = [] for k, v in obj.items(): if isinstance(v, dict) or isinstance(v, list): removed += self._sanitize_empty_strings(v) else: # Remove empty strings for fields that should have meaningful values if isinstance(v, str) and v == "": # Keep some fields that can legitimately be empty if k not in {'description', 'name'}: keys_to_delete.append(k) for k in keys_to_delete: del obj[k] removed += 1 elif isinstance(obj, list): for item in obj: removed += self._sanitize_empty_strings(item) return removed def _coerce_boolean_enums_to_strings(self, obj: Any) -> int: """Recursively coerce boolean values to appropriate string enums for specific OpenConfig fields.""" changed = 0 if isinstance(obj, dict): for k, v in list(obj.items()): if isinstance(v, dict) or isinstance(v, list): changed += self._coerce_boolean_enums_to_strings(v) else: if isinstance(v, bool): # Handle specific OpenConfig enum fields that Stratum sends as booleans if k in {'loopback-mode', 'auto-negotiate', 'forwarding-viable'}: obj[k] = 'true' if v else 'false' changed += 1 elif k in {'enabled'}: # Keep as boolean for config fields continue elif 'status' in k: obj[k] = 'UP' if v else 'DOWN' changed += 1 else: # Default coercion for other boolean fields obj[k] = 'true' if v else 'false' changed += 1 elif isinstance(obj, list): for item in obj: changed += self._coerce_boolean_enums_to_strings(item) return changed def _filter_stratum_specific_fields(self, obj: Any) -> int: """Remove Stratum-specific fields that are not part of OpenConfig standard.""" removed = 0 if isinstance(obj, dict): keys_to_delete = [] for k, v in obj.items(): if isinstance(v, dict) or isinstance(v, list): removed += self._filter_stratum_specific_fields(v) else: # Remove Stratum-specific fields that cause validation errors if k in {'flow-programming-exception', 'memory-error-alarm', 'node-info', 'front-panel-port-info'}: keys_to_delete.append(k) for k in keys_to_delete: del obj[k] removed += 1 elif isinstance(obj, list): for item in obj: removed += self._filter_stratum_specific_fields(item) return removed def _enrich_missing_types(self, resource_path: str, data: Dict) -> int: """Populate default type identityrefs if missing to enable handlers to produce entries.""" added = 0 Loading Loading @@ -284,8 +354,11 @@ class GnmiSessionHandler: sanitized = json.loads(json.dumps(enriched)) removed = self._sanitize_unknown_identityrefs(sanitized) changed = self._coerce_enum_like_booleans(sanitized) empty_removed = self._sanitize_empty_strings(sanitized) enum_coerced = self._coerce_boolean_enums_to_strings(sanitized) stratum_filtered = self._filter_stratum_specific_fields(sanitized) added2 = self._enrich_missing_types(resource_path, sanitized) self._logger.info('Sanitization changes for %s: removed_unknown_identityrefs=%d, coerced_booleans=%d, defaults_added=%d', resource_path, removed, changed, added2) self._logger.info('Sanitization changes for %s: removed_unknown_identityrefs=%d, coerced_booleans=%d, empty_strings_removed=%d, enum_booleans_coerced=%d, stratum_fields_filtered=%d, defaults_added=%d', resource_path, removed, changed, empty_removed, enum_coerced, stratum_filtered, added2) try: return parse(resource_path, sanitized, self._yang_handler) except Exception as e2: Loading Loading @@ -446,24 +519,7 @@ class GnmiSessionHandler: get_request = GetRequest() get_request.type = GetRequest.DataType.ALL get_request.encoding = Encoding.Value(self._encoding) if self._encoding else Encoding.JSON_IETF #get_request.use_models.add() # kept empty: return for all models supported def _expand_query_paths(canonical_path: str) -> List[str]: # Remove namespace on first segment for initial try segs = canonical_path.strip('/').split('/') if not segs: return [canonical_path] first = segs[0] if ':' in first: first = first.split(':', 1)[1] base = first # Generic narrow queries to avoid unsupported subtrees if base == 'components': return ['/components/component/state', '/components/component/transceiver/state'] if base == 'interfaces': return ['/interfaces/interface/state', '/interfaces/interface/config'] # Default: non-namespaced canonical return ['/' + '/'.join([first] + segs[1:])] supported_models = self._capabilities.get('supported_models', set()) for i,resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) Loading @@ -472,10 +528,12 @@ class GnmiSessionHandler: self._logger.debug('[GnmiSessionHandler:get] resource_key = {:s}'.format(str(resource_key))) str_path = get_path(resource_key) self._logger.debug('[GnmiSessionHandler:get] str_path = {:s}'.format(str(str_path))) expanded_paths = _expand_query_paths(str_path) self._logger.debug('Expanded GET paths for %s: %s', str_path, expanded_paths) for p in expanded_paths: get_request.path.append(path_from_string(p)) parent_namespace = str_path.split(':')[0].strip('/') if ':' in str_path else None if parent_namespace is not None and parent_namespace not in supported_models: self._logger.warning('Skipping path %s because model %s is not advertised', str_path, parent_namespace) continue self._logger.info('Processing path %s because model %s is advertised', str_path, parent_namespace) get_request.path.append(path_from_string(str_path)) except Exception as e: # pylint: disable=broad-except MSG = 'Exception parsing {:s}: {:s}' self._logger.exception(MSG.format(str_resource_name, str(resource_key))) Loading Loading @@ -507,29 +565,9 @@ class GnmiSessionHandler: fallback_request.type = GetRequest.DataType.ALL fallback_request.encoding = get_request.encoding # Re-add namespace of canonical path on first segment if present for rk in resource_keys: try: canonical = get_path(rk) segs = canonical.strip('/').split('/') if not segs: continue first = segs[0] ns = first.split(':', 1)[0] if ':' in first else None if not ns: continue base = first.split(':', 1)[1] if ':' in first else first # Rebuild namespaced narrow paths if base == 'components': ns_paths = [f'/{ns}:components/component/state', f'/{ns}:components/component/transceiver/state'] elif base == 'interfaces': ns_paths = [f'/{ns}:interfaces/interface/state', f'/{ns}:interfaces/interface/config'] else: ns_paths = [f'/{ns}:{"/".join([base]+segs[1:])}'] for p in ns_paths: fallback_request.path.append(path_from_string(p)) except Exception: continue for path in get_request.path: path_str = path_to_string(path) fallback_request.path.append(path_from_string(path_str.split(':', 1)[1] if ':' in path_str else path_str)) get_reply = self._stub.Get(fallback_request, metadata=metadata, timeout=timeout) Loading