Commit 2440159e authored by Pedro Duarte's avatar Pedro Duarte
Browse files

fix on the aggregation logic

parent ed93fdb2
Loading
Loading
Loading
Loading
+28 −32
Original line number Diff line number Diff line
@@ -138,11 +138,15 @@ class GnmiSessionHandler:
        first_path_str = path_to_string(updates[0].path)
        first_path_parts = first_path_str.strip('/').split('/')
        
        self._logger.debug('Finding common path. First path: %s, parts: %s', first_path_str, first_path_parts)

        # Iterate through the rest of the updates to find common segments
        for update in updates[1:]:
            update_path_str = path_to_string(update.path)
            update_path_parts = update_path_str.strip('/').split('/')
            
            self._logger.debug('Comparing with path: %s, parts: %s', update_path_str, update_path_parts)

            # Find the minimum length of the two paths
            min_len = min(len(first_path_parts), len(update_path_parts))

@@ -150,9 +154,12 @@ class GnmiSessionHandler:
            for i in range(min_len):
                if first_path_parts[i] != update_path_parts[i]:
                    # Return the common path up to the point of divergence
                    return '/'.join(first_path_parts[:i])
                    common_path = '/'.join(first_path_parts[:i])
                    self._logger.debug('Found common path: %s (diverged at index %d)', common_path, i)
                    return common_path

        # If all updates have the same path, return the full path
        self._logger.debug('All paths are identical, returning: %s', first_path_str)
        return first_path_str

    def _reconstruct_object_from_updates(self, updates):
@@ -167,9 +174,9 @@ class GnmiSessionHandler:
        # Find the common path among all updates
        common_path = self._find_common_resource_path(updates)
        if not common_path:
            self._logger.warning('Could not find common path for reconstruction. Falling back to individual parsing.')
            # Fallback to individual parsing if common path cannot be determined
            return {path_to_string(update.path): decode_value(update.val) for update in updates}
            self._logger.error('Could not find common path for reconstruction. This should not happen.')
            # Return empty object instead of falling back to individual parsing
            return {}

        # Initialize the reconstructed data with the common path
        reconstructed_data = {}
@@ -190,19 +197,6 @@ class GnmiSessionHandler:

        return reconstructed_data

    def _process_single_update(self, update, results):
        """
        Processes a single update and adds it to the results.
        """
        str_path = path_to_string(update.path)
        try:
            value = decode_value(update.val)
            results.extend(parse(str_path, value, self._yang_handler))
        except Exception as e:
            MSG = 'Exception processing update {:s}'
            self._logger.exception(MSG.format(grpc_message_to_json_string(update)))
            results.append((str_path, e))

    def disconnect(self):
        if not self._connected.is_set(): return
        with self._lock:
@@ -300,30 +294,32 @@ class GnmiSessionHandler:
            #        raise Exception(MSG.format(str(str_path), str(resource_keys)))
            #    resource_key_tuple[2] = True

            # For aggregated notifications, we need to reconstruct the complete object
            # and parse it as a single resource instead of individual fields
            if len(notification.update) > 1:
                # This is an aggregated notification - find the common resource path
            # All notifications should be processed through aggregation
            # Find the common resource path for all updates in this notification
            if notification.update:
                self._logger.debug('Processing notification with %d updates', len(notification.update))
                for i, update in enumerate(notification.update):
                    self._logger.debug('Update %d: %s', i, path_to_string(update.path))
                
                common_path = self._find_common_resource_path(notification.update)
                if common_path:
                    self._logger.debug('Processing aggregated notification for resource: %s', common_path)
                    self._logger.debug('Processing notification for resource: %s with %d updates', 
                                     common_path, len(notification.update))
                    try:
                        # Reconstruct the complete object from all updates
                        reconstructed_data = self._reconstruct_object_from_updates(notification.update)
                        # Parse the reconstructed object
                        self._logger.debug('Reconstructed data keys: %s', list(reconstructed_data.keys()))
                        # Parse the reconstructed object at the resource level
                        results.extend(parse(common_path, reconstructed_data, self._yang_handler))
                    except Exception as e:
                        self._logger.exception('Exception processing aggregated notification for path %s', common_path)
                        self._logger.exception('Exception processing notification for path %s', common_path)
                        results.append((common_path, e))
                else:
                    self._logger.warning('Could not determine common resource path, falling back to individual parsing')
                    # Fall back to individual parsing
                    for update in notification.update:
                        self._process_single_update(update, results)
            else:
                # Single update, process normally
                for update in notification.update:
                    self._process_single_update(update, results)
                    self._logger.error('Could not determine common resource path for notification with %d updates', 
                                     len(notification.update))
                    # If we can't determine the path, log the error but don't try individual parsing
                    # as that would defeat the purpose of aggregation
                    results.append(('unknown', Exception(f'Could not determine common path for {len(notification.update)} updates')))

        #_results = sorted(results.items(), key=lambda x: x[1][0])
        #results = list()