Commit ed93fdb2 authored by Pedro Duarte's avatar Pedro Duarte
Browse files

fix on the aggregation logic

parent 7e61a3a3
Loading
Loading
Loading
Loading
+106 −21
Original line number Diff line number Diff line
@@ -126,6 +126,83 @@ class GnmiSessionHandler:

        return aggregated_notifications

    def _find_common_resource_path(self, updates):
        """
        Finds the common resource path among a list of updates.
        This is a heuristic and might not be accurate for all models.
        """
        if not updates:
            return None

        # Get the path of the first update
        first_path_str = path_to_string(updates[0].path)
        first_path_parts = first_path_str.strip('/').split('/')

        # Iterate through the rest of the updates to find common segments
        for update in updates[1:]:
            update_path_str = path_to_string(update.path)
            update_path_parts = update_path_str.strip('/').split('/')

            # Find the minimum length of the two paths
            min_len = min(len(first_path_parts), len(update_path_parts))

            # Compare segments up to the minimum length
            for i in range(min_len):
                if first_path_parts[i] != update_path_parts[i]:
                    # Return the common path up to the point of divergence
                    return '/'.join(first_path_parts[:i])

        # If all updates have the same path, return the full path
        return first_path_str

    def _reconstruct_object_from_updates(self, updates):
        """
        Reconstructs a complete object from a list of updates.
        This is necessary because GNMI might send updates for different fields
        of the same object in a single notification.
        """
        if not updates:
            return {}

        # Find the common path among all updates
        common_path = self._find_common_resource_path(updates)
        if not common_path:
            self._logger.warning('Could not find common path for reconstruction. Falling back to individual parsing.')
            # Fallback to individual parsing if common path cannot be determined
            return {path_to_string(update.path): decode_value(update.val) for update in updates}

        # Initialize the reconstructed data with the common path
        reconstructed_data = {}
        for update in updates:
            path_str = path_to_string(update.path)
            # Remove the common path prefix to get the relative path
            relative_path = path_str[len(common_path):]
            # Handle cases where the relative path is empty (e.g., if common_path is the full path)
            if relative_path == '':
                relative_path = '/' # GNMI paths are always absolute

            # Decode the value and add to the reconstructed data
            try:
                reconstructed_data[relative_path] = decode_value(update.val)
            except Exception as e:
                self._logger.warning('Could not decode value for update %s: %s', path_str, e)
                reconstructed_data[relative_path] = None # Represent as None if decoding fails

        return reconstructed_data

    def _process_single_update(self, update, results):
        """
        Processes a single update and adds it to the results.
        """
        str_path = path_to_string(update.path)
        try:
            value = decode_value(update.val)
            results.extend(parse(str_path, value, self._yang_handler))
        except Exception as e:
            MSG = 'Exception processing update {:s}'
            self._logger.exception(MSG.format(grpc_message_to_json_string(update)))
            results.append((str_path, e))

    def disconnect(self):
        if not self._connected.is_set(): return
        with self._lock:
@@ -203,14 +280,15 @@ class GnmiSessionHandler:
        
        self._logger.debug('get_reply={:s}'.format(grpc_message_to_json_string(get_reply)))

        results = []
        #results[str_filter] = [i, None, False]  # (index, value, processed?)

        # Aggregate field-by-field responses into complete objects
        aggregated_updates = self._aggregate_field_responses(get_reply.notification)
        
        self._logger.debug('Original notifications: %d, Aggregated notifications: %d', 
                         len(get_reply.notification), len(aggregated_updates))

        results = []
        #results[str_filter] = [i, None, False]  # (index, value, processed?)

        for notification in aggregated_updates:
            #for delete_path in notification.delete:
            #    self._logger.info('delete_path={:s}'.format(grpc_message_to_json_string(delete_path)))
@@ -218,27 +296,34 @@ class GnmiSessionHandler:
            #    resource_key_tuple = results.get(str_path)
            #    if resource_key_tuple is None:
            #        # pylint: disable=broad-exception-raised
            #        MSG = 'Unexpected Delete Path({:s}); requested resource_keys({:s})'
            #        MSG = 'Unexpected Update Path({:s}); requested resource_keys({:s})'
            #        raise Exception(MSG.format(str(str_path), str(resource_keys)))
            #    resource_key_tuple[2] = True

            for update in notification.update:
                self._logger.debug('update={:s}'.format(grpc_message_to_json_string(update)))
                str_path = path_to_string(update.path)
                #resource_key_tuple = results.get(str_path)
                #if resource_key_tuple is None:
                #    # pylint: disable=broad-exception-raised
                #    MSG = 'Unexpected Update Path({:s}); requested resource_keys({:s})'
                #    raise Exception(MSG.format(str(str_path), str(resource_keys)))
            # For aggregated notifications, we need to reconstruct the complete object
            # and parse it as a single resource instead of individual fields
            if len(notification.update) > 1:
                # This is an aggregated notification - find the common resource path
                common_path = self._find_common_resource_path(notification.update)
                if common_path:
                    self._logger.debug('Processing aggregated notification for resource: %s', common_path)
                    try:
                    value = decode_value(update.val)
                    #resource_key_tuple[1] = value
                    #resource_key_tuple[2] = True
                    results.extend(parse(str_path, value, self._yang_handler))
                except Exception as e: # pylint: disable=broad-except
                    MSG = 'Exception processing update {:s}'
                    self._logger.exception(MSG.format(grpc_message_to_json_string(update)))
                    results.append((str_path, e)) # if validation fails, store the exception
                        # Reconstruct the complete object from all updates
                        reconstructed_data = self._reconstruct_object_from_updates(notification.update)
                        # Parse the reconstructed object
                        results.extend(parse(common_path, reconstructed_data, self._yang_handler))
                    except Exception as e:
                        self._logger.exception('Exception processing aggregated notification for path %s', common_path)
                        results.append((common_path, e))
                else:
                    self._logger.warning('Could not determine common resource path, falling back to individual parsing')
                    # Fall back to individual parsing
                    for update in notification.update:
                        self._process_single_update(update, results)
            else:
                # Single update, process normally
                for update in notification.update:
                    self._process_single_update(update, results)

        #_results = sorted(results.items(), key=lambda x: x[1][0])
        #results = list()