[RFC]feat: post-construction claim() for hash aggregation memory tracking #3

Draft
wirybeaver wants to merge 1 commit into main from spill-arrow-claim

@wirybeaver wirybeaver commented Jun 11, 2026

Rationale for this change

DataFusion operators manually track Arrow buffer memory by computing get_array_memory_size() after allocation and calling try_resize() on a MemoryReservation. This causes two correctness problems:

  1. Under-accounting: a missed try_grow()/try_resize() call site is a silent bug; the allocation is simply never reported to the pool.
  2. Over-accounting: when arrays share an Arc<Buffer>, get_array_memory_size() counts the same physical buffer once per slice or reference.

Arrow's ArrayData::claim(pool) API (arrow-rs apache#8918, merged 2026-03-11, available since arrow-buffer 58.3.0) solves both: claims are idempotent — the same physical buffer claimed twice is counted once. This PR wires claim() into GroupedHashAggregateStream::emit() as a proof-of-concept.
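The difference between summing per-reference sizes and an idempotent claim can be sketched with the standard library alone. This is a toy model, not Arrow's API: `Buffer`, `ToyPool`, and `naive_size` are hypothetical names, and tracking allocations by address is an assumption made for illustration, not a description of how arrow-rs implements claim().

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Toy stand-in for an Arrow buffer: a shared, immutable byte allocation.
type Buffer = Arc<Vec<u8>>;

/// Naive accounting: add up the bytes behind every reference,
/// even when several references point at the same allocation.
fn naive_size(buffers: &[Buffer]) -> usize {
    buffers.iter().map(|b| b.len()).sum()
}

/// Toy pool that remembers which allocations it has already counted.
#[derive(Default)]
struct ToyPool {
    claimed: HashSet<usize>, // addresses of allocations already counted
    used: usize,             // bytes currently attributed to the pool
}

impl ToyPool {
    /// Idempotent claim: an allocation is counted the first time only.
    fn claim(&mut self, buffer: &Buffer) {
        let addr = Arc::as_ptr(buffer) as usize;
        if self.claimed.insert(addr) {
            self.used += buffer.len();
        }
    }
}

fn main() {
    // One physical 1024-byte allocation, referenced three times.
    let physical: Buffer = Arc::new(vec![0u8; 1024]);
    let refs = [Arc::clone(&physical), Arc::clone(&physical), physical];

    let mut pool = ToyPool::default();
    for b in &refs {
        pool.claim(b);
    }

    println!("naive: {} bytes", naive_size(&refs));
    println!("claimed: {} bytes", pool.used);
}
```

In this model the naive sum reports three times the physical size, while the claimed total matches the single allocation.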

Why claim() is post-construction, not during build

Arrow builders (PrimitiveBuilder, GenericByteBuilder, GenericListBuilder) store values in Vec<T::Native>, not MutableBuffer. The Vec → MutableBuffer conversion only happens in finish(). Although MutableBuffer has a #[cfg(feature = "pool")] reservation field that tracks memory on reallocate() and truncate(), builders bypass it entirely: all growth goes through Vec::push/extend/reserve.

There is no way to enforce pool limits during builder allocation today, so claim() is inherently post-construction: let Arrow allocate freely, then register the output with the pool. Actual usage can briefly exceed pool capacity between allocation and claim; that is acceptable here because the goal of this PR is accurate accounting, not limit enforcement. A companion upstream proposal (spill-3-arrow-builder-with-pool.md) introduces Builder::with_pool() to enforce limits during allocation, but that is future work.

Comparison

| | Manual try_grow | Post-construction claim() | Builder with_pool() |
|---|---|---|---|
| Can miss sites | Yes | No (claim at output boundary) | No (tracked at alloc) |
| Over-counts shared buffers | Yes | No (idempotent) | No (idempotent) |
| Enforces limits before alloc | Yes (laggy) | No (brief burst possible) | Yes (immediate) |
| Exists in Arrow today | N/A | Yes (arrow-buffer 58.3) | No (needs arrow-rs PR) |

What changes are included in this PR?

Before (manual tracking — current code)

struct GroupedHashAggregateStream {
    reservation: MemoryReservation,
    transient_reservation: MemoryReservation,
    // ...
}

fn update_memory_reservation(&mut self) -> Result<()> {
    let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>();
    let new_size = acc
        + self.group_values.size()
        + self.group_ordering.size()
        + self.current_group_indices.allocated_size()
        + sort_headroom;
    self.reservation.try_resize(new_size)?;  // error-prone, can miss sites
    Ok(())
}

fn emit(&mut self, ...) -> Result<Option<RecordBatch>> {
    let batch = RecordBatch::try_new(schema, output)?;
    let batch_size = batch.get_array_memory_size();  // over-counts shared buffers
    self.transient_reservation.grow(batch_size);
    Ok(Some(batch))
}

After (post-construction claim())

struct GroupedHashAggregateStream {
    arrow_pool: Arc<ArrowMemoryPool>,  // wraps the same MemoryPool, one consumer
    reservation: MemoryReservation,    // only for non-Arrow state (Rust Vecs)
    // transient_reservation: REMOVED
}

fn update_memory_reservation(&mut self) -> Result<()> {
    let non_arrow = self.accumulators.iter().map(|x| x.vec_size()).sum::<usize>()
        + self.current_group_indices.allocated_size();
    self.reservation.try_resize(non_arrow)?;
    Ok(())
}

fn emit(&mut self, ...) -> Result<Option<RecordBatch>> {
    let batch = RecordBatch::try_new(schema, output)?;
    claim_batch(&batch, &self.arrow_pool);  // shared buffers counted once, idempotent
    Ok(Some(batch))
}

Key design: from_reservation avoids breaking FairSpillPool fairness

The naive wiring of ArrowMemoryPool — creating one can_spill=false sub-consumer per buffer on every reserve() call — breaks FairSpillPool's fairness formula:

fair_share = (pool_size - unspillable) / num_spill

Each can_spill=false registration increments unspillable, shrinking every spillable consumer's budget. With pool_size=4200 and 4 GroupedHashAggregateStream (GHA) partitions (num_spill=4), the initial fair share is 4200 / 4 = 1050 bytes. After several group_ordering early emits across partitions, sub-consumers accumulate and unspillable grows:

// After early emits: unspillable has grown from 0 to 1200
fair_share = (4200 - 1200) / 4 = 750

// Spill sort needs 809 bytes of headroom:
try_grow(reservation, 809) → ResourcesExhausted: 809 > 750  ✗

Spill itself becomes impossible — and increasing max_memory doesn't help, because unspillable grows proportionally with data.
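The arithmetic above can be checked with a small standalone sketch. The function names `fair_share` and `can_grow` are hypothetical helpers written for this example; they mirror the formula as stated in this description and are not FairSpillPool's actual code.

```rust
/// Fair share per spillable consumer, following the formula in this description:
/// (pool_size - unspillable) / num_spill.
/// Returns None when there are no spillable consumers.
fn fair_share(pool_size: usize, unspillable: usize, num_spill: usize) -> Option<usize> {
    if num_spill == 0 {
        return None;
    }
    Some(pool_size.saturating_sub(unspillable) / num_spill)
}

/// Whether a spillable consumer holding `held` bytes may grow by `additional`
/// without exceeding its share.
fn can_grow(held: usize, additional: usize, share: usize) -> bool {
    held + additional <= share
}

fn main() {
    // Numbers taken from the example in this description.
    let before = fair_share(4200, 0, 4).unwrap();
    let after = fair_share(4200, 1200, 4).unwrap();

    println!("fair share before: {before}, after: {after}");
    println!("809 bytes fit before: {}", can_grow(0, 809, before));
    println!("809 bytes fit after: {}", can_grow(0, 809, after));
}
```

With unspillable at 0 the 809-byte sort headroom fits in the 1050-byte share; once unspillable reaches 1200 the share drops to 750 and the same request is rejected.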

ArrowMemoryPool::from_reservation() fixes this by calling MemoryReservation::new_empty(), which creates a sibling reservation sharing the same Arc<MemoryRegistration> (same pool consumer, independent size tracking). No new consumer is registered, so num_spill and unspillable are unaffected:

// With from_reservation: unspillable stays at 0
fair_share = (4200 - 0) / 4 = 1050

// Spill sort succeeds:
try_grow(reservation, 809) → ok: 809 ≤ 1050
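The sibling-reservation idea can be modeled without DataFusion. The sketch below is a simplified toy under stated assumptions: `ToyPool`, `Registration`, and `Reservation` are hypothetical stand-ins, and only the behavior described above (shared registration, independent size) is modeled.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy pool: counts registered consumers and total reserved bytes.
#[derive(Default)]
struct ToyPool {
    consumers: AtomicUsize,
    reserved: AtomicUsize,
}

/// One registration per consumer; dropping the last reference unregisters it.
struct Registration {
    pool: Arc<ToyPool>,
}

impl Drop for Registration {
    fn drop(&mut self) {
        self.pool.consumers.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Reservation with its own size counter and a possibly shared registration.
struct Reservation {
    registration: Arc<Registration>,
    size: usize,
}

impl Reservation {
    /// Registers a new consumer with the pool.
    fn new(pool: &Arc<ToyPool>) -> Self {
        pool.consumers.fetch_add(1, Ordering::SeqCst);
        let registration = Arc::new(Registration { pool: Arc::clone(pool) });
        Reservation { registration, size: 0 }
    }

    /// Zero-size sibling: same registration, independent size tracking.
    fn new_empty(&self) -> Self {
        Reservation { registration: Arc::clone(&self.registration), size: 0 }
    }

    fn grow(&mut self, bytes: usize) {
        self.registration.pool.reserved.fetch_add(bytes, Ordering::SeqCst);
        self.size += bytes;
    }
}

impl Drop for Reservation {
    /// Returns this reservation's bytes to the pool.
    fn drop(&mut self) {
        self.registration.pool.reserved.fetch_sub(self.size, Ordering::SeqCst);
    }
}

fn main() {
    let pool = Arc::new(ToyPool::default());
    let mut operator = Reservation::new(&pool);
    operator.grow(100);

    // Sibling for Arrow claims: no new consumer is registered.
    let mut arrow_claims = operator.new_empty();
    arrow_claims.grow(50);

    println!("consumers: {}", pool.consumers.load(Ordering::SeqCst));
    println!("reserved: {}", pool.reserved.load(Ordering::SeqCst));
    println!("operator: {}, sibling: {}", operator.size, arrow_claims.size);
}
```

Because the sibling reuses the registration, the consumer count stays at one while both reservations contribute to the pool's reserved total.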

Wires Arrow's `ArrayData::claim(pool)` into `GroupedHashAggregateStream::emit()`
as a proof-of-concept for accurate output-batch memory accounting.

Key changes:
- `ArrowMemoryPool`: add `from_reservation()` constructor that creates a
  zero-size sibling via `MemoryReservation::new_empty()`, sharing the same
  `Arc<MemoryRegistration>` (same pool consumer, no new `num_spill` entry, no
  change to `unspillable`). All buffer claims aggregate into one shared
  reservation instead of spawning a new `can_spill=false` sub-consumer per
  buffer, which previously inflated `FairSpillPool::unspillable` and shrunk
  every spillable consumer's fair share.
- `SharedClaimHandle`: per-buffer handle backed by `Arc<MemoryReservation>`;
  no external Mutex needed since `MemoryReservation` uses `AtomicUsize`
  internally. Drops release their bytes from the shared reservation.
- `GroupedHashAggregateStream`: wires `from_reservation` to share the
  operator's existing reservation, then calls `claim_batch` in `emit()` for
  non-spill output batches.
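The per-buffer handle pattern described in the list above can be sketched as follows. This is a minimal illustration, not the PR's code: `SharedReservation` and `ClaimHandle` are hypothetical stand-ins for the shared `MemoryReservation` and `SharedClaimHandle`, and only the atomic add-on-claim, subtract-on-drop behavior is modeled.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Toy stand-in for the shared reservation: a lock-free byte counter.
#[derive(Default)]
struct SharedReservation {
    size: AtomicUsize,
}

/// Hypothetical per-buffer handle: adds its bytes on creation
/// and releases them when dropped.
struct ClaimHandle {
    shared: Arc<SharedReservation>,
    bytes: usize,
}

impl ClaimHandle {
    fn new(shared: &Arc<SharedReservation>, bytes: usize) -> Self {
        shared.size.fetch_add(bytes, Ordering::SeqCst);
        ClaimHandle { shared: Arc::clone(shared), bytes }
    }
}

impl Drop for ClaimHandle {
    fn drop(&mut self) {
        self.shared.size.fetch_sub(self.bytes, Ordering::SeqCst);
    }
}

fn main() {
    let shared = Arc::new(SharedReservation::default());
    let a = ClaimHandle::new(&shared, 64);
    let b = ClaimHandle::new(&shared, 32);

    // Another thread can claim and release without an external Mutex.
    let worker = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let _h = ClaimHandle::new(&shared, 16); // released when the thread ends
        })
    };
    worker.join().unwrap();

    println!("after claims: {}", shared.size.load(Ordering::SeqCst));
    drop(a);
    println!("after dropping a: {}", shared.size.load(Ordering::SeqCst));
    drop(b);
    println!("after dropping b: {}", shared.size.load(Ordering::SeqCst));
}
```

The counter is updated with atomic operations only, so handles can be created and dropped from any thread while all claims aggregate into one total.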
@wirybeaver wirybeaver marked this pull request as draft June 11, 2026 03:41
@wirybeaver wirybeaver changed the title feat: post-construction claim() for hash aggregation memory tracking [RFC]feat: post-construction claim() for hash aggregation memory tracking Jun 11, 2026