Loading src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py +119 −19 Original line number Diff line number Diff line Loading @@ -154,38 +154,138 @@ class GnmiSessionHandler: def _reconstruct_object_from_updates(self, updates): """ Reconstructs a complete object from a list of updates. This is necessary because GNMI might send updates for different fields of the same object in a single notification. Reconstructs a complete object from a list of updates into OpenConfig JSON expected by libyang parse_to_dict when wrapped under the module root. """ if not updates: return {} # Find the common path among all updates common_path = self._find_common_resource_path(updates) if not common_path: self._logger.error('Could not find common path for reconstruction. This should not happen.') # Return empty object instead of falling back to individual parsing return {} # Initialize the reconstructed data with the common path reconstructed_data = {} result: Dict[str, Any] = {} for update in updates: path_str = path_to_string(update.path) # Remove the common path prefix to get the relative path relative_path = path_str[len(common_path):] # Handle cases where the relative path is empty (e.g., if common_path is the full path) if relative_path == '': relative_path = '/' # GNMI paths are always absolute full_path = path_to_string(update.path) try: value = decode_value(update.val) except Exception as e: self._logger.warning('Could not decode value for update %s: %s', full_path, e) value = None # Decode the value and add to the reconstructed data # Compute relative path under the top-level resource (e.g., '/interfaces') if full_path.startswith(common_path): relative_path = full_path[len(common_path):] else: # Should not happen; fallback to full path relative_path = full_path # Normalize leading slash if relative_path.startswith('/'): relative_path = relative_path[1:] # Insert this leaf value into the JSON structure try: reconstructed_data[relative_path] = decode_value(update.val) self._insert_update_into_json(result, relative_path, value) except Exception as e: self._logger.warning('Could not decode value for update %s: %s', path_str, e) reconstructed_data[relative_path] = None # Represent as None if decoding fails self._logger.exception('Failed inserting update into JSON (path=%s, rel=%s)', full_path, relative_path) return reconstructed_data return result def _prefix_augmented_node(self, node_name: str) -> str: """ Add module prefixes for augmented nodes so libyang can validate. """ augment_prefix_by_node = { # Interfaces augments 'ethernet': 'openconfig-if-ethernet', 'ipv4': 'openconfig-if-ip', 'ipv6': 'openconfig-if-ip', 'vlan': 'openconfig-vlan', # Platform augments 'alarms': 'openconfig-platform-healthz', 'integrated-circuit': 'openconfig-platform-integrated-circuit', 'controller-card': 'openconfig-platform-controller-card', 'cpu': 'openconfig-platform-cpu', 'fabric': 'openconfig-platform-fabric', 'fan': 'openconfig-platform-fan', 'linecard': 'openconfig-platform-linecard', 'pipeline-counters': 'openconfig-platform-pipeline-counters', 'port': 'openconfig-platform-port', 'psu': 'openconfig-platform-psu', 'software': 'openconfig-platform-software', 'transceiver': 'openconfig-platform-transceiver', } prefix = augment_prefix_by_node.get(node_name) return f'{prefix}:{node_name}' if prefix else node_name def _insert_update_into_json(self, root: Dict[str, Any], relative_path: str, value: Any) -> None: """ Insert a single update (relative path like 'interface[name=veth0]/config/enabled') into the root dict as OpenConfig JSON. """ if relative_path == '' or relative_path == '/': return segments = [seg for seg in relative_path.split('/') if seg] cursor = root for i, segment in enumerate(segments): is_last = i == len(segments) - 1 # Detect list segment like 'interface[name=veth0]' or 'address[ip=1.2.3.4]' if '[' in segment and segment.endswith(']'): base_name = segment[:segment.index('[')] keys_str = segment[segment.index('[') + 1:-1] # Support multiple keys separated by ']'? ',' key_pairs = [kv for kv in keys_str.split(',') if kv] key_map: Dict[str, Any] = {} for pair in key_pairs: if '=' in pair: k, v = pair.split('=', 1) # Strip quotes if any if len(v) >= 2 and ((v[0] == '"' and v[-1] == '"') or (v[0] == "'" and v[-1] == "'")): v = v[1:-1] # Convert to int if numeric try: v_num = int(v) v = v_num except Exception: pass key_map[k] = v list_name = base_name list_name = self._prefix_augmented_node(list_name) # Ensure list exists as an array if list_name not in cursor or not isinstance(cursor[list_name], list): cursor[list_name] = [] # Find existing element by matching all keys (keys appear as top-level leaves in OC JSON) found = None for elem in cursor[list_name]: if all(elem.get(k) == v for k, v in key_map.items()): found = elem break if found is None: found = {**key_map} cursor[list_name].append(found) # Advance cursor cursor = found continue # Container or leaf node_name = self._prefix_augmented_node(segment) if is_last: cursor[node_name] = value else: if node_name not in cursor or not isinstance(cursor[node_name], dict): cursor[node_name] = {} cursor = cursor[node_name] def disconnect(self): if not self._connected.is_set(): return Loading Loading
src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py +119 −19 Original line number Diff line number Diff line Loading @@ -154,38 +154,138 @@ class GnmiSessionHandler: def _reconstruct_object_from_updates(self, updates): """ Reconstructs a complete object from a list of updates. This is necessary because GNMI might send updates for different fields of the same object in a single notification. Reconstructs a complete object from a list of updates into OpenConfig JSON expected by libyang parse_to_dict when wrapped under the module root. """ if not updates: return {} # Find the common path among all updates common_path = self._find_common_resource_path(updates) if not common_path: self._logger.error('Could not find common path for reconstruction. This should not happen.') # Return empty object instead of falling back to individual parsing return {} # Initialize the reconstructed data with the common path reconstructed_data = {} result: Dict[str, Any] = {} for update in updates: path_str = path_to_string(update.path) # Remove the common path prefix to get the relative path relative_path = path_str[len(common_path):] # Handle cases where the relative path is empty (e.g., if common_path is the full path) if relative_path == '': relative_path = '/' # GNMI paths are always absolute full_path = path_to_string(update.path) try: value = decode_value(update.val) except Exception as e: self._logger.warning('Could not decode value for update %s: %s', full_path, e) value = None # Decode the value and add to the reconstructed data # Compute relative path under the top-level resource (e.g., '/interfaces') if full_path.startswith(common_path): relative_path = full_path[len(common_path):] else: # Should not happen; fallback to full path relative_path = full_path # Normalize leading slash if relative_path.startswith('/'): relative_path = relative_path[1:] # Insert this leaf value into the JSON structure try: reconstructed_data[relative_path] = decode_value(update.val) self._insert_update_into_json(result, relative_path, value) except Exception as e: self._logger.warning('Could not decode value for update %s: %s', path_str, e) reconstructed_data[relative_path] = None # Represent as None if decoding fails self._logger.exception('Failed inserting update into JSON (path=%s, rel=%s)', full_path, relative_path) return reconstructed_data return result def _prefix_augmented_node(self, node_name: str) -> str: """ Add module prefixes for augmented nodes so libyang can validate. """ augment_prefix_by_node = { # Interfaces augments 'ethernet': 'openconfig-if-ethernet', 'ipv4': 'openconfig-if-ip', 'ipv6': 'openconfig-if-ip', 'vlan': 'openconfig-vlan', # Platform augments 'alarms': 'openconfig-platform-healthz', 'integrated-circuit': 'openconfig-platform-integrated-circuit', 'controller-card': 'openconfig-platform-controller-card', 'cpu': 'openconfig-platform-cpu', 'fabric': 'openconfig-platform-fabric', 'fan': 'openconfig-platform-fan', 'linecard': 'openconfig-platform-linecard', 'pipeline-counters': 'openconfig-platform-pipeline-counters', 'port': 'openconfig-platform-port', 'psu': 'openconfig-platform-psu', 'software': 'openconfig-platform-software', 'transceiver': 'openconfig-platform-transceiver', } prefix = augment_prefix_by_node.get(node_name) return f'{prefix}:{node_name}' if prefix else node_name def _insert_update_into_json(self, root: Dict[str, Any], relative_path: str, value: Any) -> None: """ Insert a single update (relative path like 'interface[name=veth0]/config/enabled') into the root dict as OpenConfig JSON. """ if relative_path == '' or relative_path == '/': return segments = [seg for seg in relative_path.split('/') if seg] cursor = root for i, segment in enumerate(segments): is_last = i == len(segments) - 1 # Detect list segment like 'interface[name=veth0]' or 'address[ip=1.2.3.4]' if '[' in segment and segment.endswith(']'): base_name = segment[:segment.index('[')] keys_str = segment[segment.index('[') + 1:-1] # Support multiple keys separated by ']'? ',' key_pairs = [kv for kv in keys_str.split(',') if kv] key_map: Dict[str, Any] = {} for pair in key_pairs: if '=' in pair: k, v = pair.split('=', 1) # Strip quotes if any if len(v) >= 2 and ((v[0] == '"' and v[-1] == '"') or (v[0] == "'" and v[-1] == "'")): v = v[1:-1] # Convert to int if numeric try: v_num = int(v) v = v_num except Exception: pass key_map[k] = v list_name = base_name list_name = self._prefix_augmented_node(list_name) # Ensure list exists as an array if list_name not in cursor or not isinstance(cursor[list_name], list): cursor[list_name] = [] # Find existing element by matching all keys (keys appear as top-level leaves in OC JSON) found = None for elem in cursor[list_name]: if all(elem.get(k) == v for k, v in key_map.items()): found = elem break if found is None: found = {**key_map} cursor[list_name].append(found) # Advance cursor cursor = found continue # Container or leaf node_name = self._prefix_augmented_node(segment) if is_last: cursor[node_name] = value else: if node_name not in cursor or not isinstance(cursor[node_name], dict): cursor[node_name] = {} cursor = cursor[node_name] def disconnect(self): if not self._connected.is_set(): return Loading