Commit 60a1c852 authored by Pedro Duarte's avatar Pedro Duarte
Browse files

add gnmi notification aggregation

parent e2b0f293
Loading
Loading
Loading
Loading
+54 −1
Original line number Diff line number Diff line
@@ -77,6 +77,54 @@ class GnmiSessionHandler:
            self._monit_thread.start()
            self._connected.set()

    def _aggregate_field_responses(self, notifications):
        """
        Aggregate field-by-field responses (like Stratum) into complete object notifications.
        
        Stratum sends one notification per field, but the parsing logic expects
        complete objects. This function groups related fields together at the resource level.
        """
        if not notifications:
            return notifications

        # Group updates by their top-level resource path
        grouped_updates = {}
        
        for notification in notifications:
            for update in notification.update:
                path_str = path_to_string(update.path)
                # Extract top-level resource path (e.g., "/interfaces" from "/interfaces/interface[name=veth0]/config/enabled")
                path_parts = path_str.strip('/').split('/')
                if len(path_parts) >= 1:
                    # Group by the first path segment (e.g., "interfaces", "components", "network-instances")
                    resource_type = path_parts[0]
                    if resource_type not in grouped_updates:
                        grouped_updates[resource_type] = []
                    grouped_updates[resource_type].append(update)
        
        # Create aggregated notifications
        aggregated_notifications = []
        for resource_type, updates in grouped_updates.items():
            if len(updates) > 1:
                # Create a new notification with all updates for this resource type
                aggregated_notification = type(notifications[0])()
                aggregated_notification.CopyFrom(notifications[0])
                # Clear the updates and add our aggregated ones
                aggregated_notification.update.Clear()
                aggregated_notification.update.extend(updates)
                # Use the latest timestamp
                if hasattr(updates[0], 'timestamp') and updates[0].timestamp:
                    aggregated_notification.timestamp = max(u.timestamp for u in updates if u.timestamp)
                aggregated_notifications.append(aggregated_notification)
                self._logger.debug('Aggregated %d updates for resource type: %s', len(updates), resource_type)
            else:
                aggregated_notifications.append(notifications[0]) 
                
        self._logger.debug('Aggregated %d field responses into %d resource-level notifications', 
                         len(notifications), len(aggregated_notifications))

        return aggregated_notifications

    def disconnect(self):
        if not self._connected.is_set(): return
        with self._lock:
@@ -157,7 +205,12 @@ class GnmiSessionHandler:
        results = []
        #results[str_filter] = [i, None, False]  # (index, value, processed?)

        for notification in get_reply.notification:
        aggregated_updates = self._aggregate_field_responses(get_reply.notification)
        
        self._logger.debug('Original notifications: %d, Aggregated notifications: %d', 
                         len(get_reply.notification), len(aggregated_updates))

        for notification in aggregated_updates:
            #for delete_path in notification.delete:
            #    self._logger.info('delete_path={:s}'.format(grpc_message_to_json_string(delete_path)))
            #    str_path = path_to_string(delete_path)