[RFC]feat: post-construction claim() for hash aggregation memory tracking#3
Draft
wirybeaver wants to merge 1 commit into
Draft
[RFC]feat: post-construction claim() for hash aggregation memory tracking#3wirybeaver wants to merge 1 commit into
wirybeaver wants to merge 1 commit into
Conversation
Wires Arrow's `ArrayData::claim(pool)` into `GroupedHashAggregateStream::emit()` as a proof-of-concept for accurate output-batch memory accounting. Key changes: - `ArrowMemoryPool`: add `from_reservation()` constructor that creates a zero-size sibling via `MemoryReservation::new_empty()`, sharing the same `Arc<MemoryRegistration>` (same pool consumer, no new `num_spill` entry, no change to `unspillable`). All buffer claims aggregate into one shared reservation instead of spawning a new `can_spill=false` sub-consumer per buffer, which previously inflated `FairSpillPool::unspillable` and shrunk every spillable consumer's fair share. - `SharedClaimHandle`: per-buffer handle backed by `Arc<MemoryReservation>`; no external Mutex needed since `MemoryReservation` uses `AtomicUsize` internally. Drops release their bytes from the shared reservation. - `GroupedHashAggregateStream`: wires `from_reservation` to share the operator's existing reservation, then calls `claim_batch` in `emit()` for non-spill output batches.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Rationale for this change
DataFusion operators manually track Arrow buffer memory by computing
get_array_memory_size()after allocation and callingtry_resize()on aMemoryReservation. This causes two correctness problems:try_growcall sites are silent bugs.Arc<Buffer>causesget_array_memory_size()to count the same physical buffer multiple times across slices and references.Arrow's
ArrayData::claim(pool)API (arrow-rs apache#8918, merged 2026-03-11, available sincearrow-buffer 58.3.0) solves both: claims are idempotent — the same physical buffer claimed twice is counted once. This PR wiresclaim()intoGroupedHashAggregateStream::emit()as a proof-of-concept.Why
claim()is post-construction, not during buildArrow builders (
PrimitiveBuilder,GenericByteBuilder,GenericListBuilder) store values inVec<T::Native>, notMutableBuffer. TheVec→MutableBufferconversion only happens infinish(). AlthoughMutableBufferhas a#[cfg(feature = "pool")] reservationfield that tracks memory onreallocate()andtruncate(), builders bypass it entirely — all growth goes throughVec::push/extend/reserve.There is no way to enforce pool limits during builder allocation today.
claim()is inherently post-construction: let Arrow allocate freely, then register the output with the pool. A brief burst above pool capacity is possible between allocation and claim, but this is acceptable for accounting correctness. A companion upstream proposal (spill-3-arrow-builder-with-pool.md) introducesBuilder::with_pool()for limit-enforcement during allocation, but that is future work.Comparison
try_growclaim()with_pool()What changes are included in this PR?
Before (manual tracking — current code)
After (post-construction
claim())Key design:
from_reservationavoids breakingFairSpillPoolfairnessThe naive wiring of
ArrowMemoryPool— creating onecan_spill=falsesub-consumer per buffer on everyreserve()call — breaksFairSpillPool's fairness formula:Each
can_spill=falseregistration incrementsunspillable, shrinking every spillable consumer's budget. Withpool_size=4200and 4 GHA partitions (num_spill=4), the initial fair share is 1050 bytes. After severalgroup_orderingearly emits across partitions, sub-consumers accumulate andunspillablegrows:Spill itself becomes impossible — and increasing
max_memorydoesn't help, becauseunspillablegrows proportionally with data.ArrowMemoryPool::from_reservation()fixes this by callingMemoryReservation::new_empty(), which creates a sibling reservation sharing the sameArc<MemoryRegistration>(same pool consumer, independent size tracking). No new consumer is registered, sonum_spillandunspillableare unaffected: