Replace manual memory tracking with Arrow claim()/with_pool() integration #22898

@wirybeaver

Is your feature request related to a problem or challenge?

DataFusion's memory accounting relies on operators manually computing get_array_memory_size() after allocation and calling try_resize() on a MemoryReservation. This has two correctness problems:

  1. Under-accounting: Every missed try_grow call site is a silent bug. Operators allocate Arrow buffers (via builders, compute kernels like take()/interleave(), concat_batches()) without reserving memory, causing actual usage to exceed the pool budget and leading to OOM kills.
  2. Over-accounting: Shared Arc<Buffer> ownership causes get_array_memory_size() to count the same physical buffer multiple times when multiple operators or slices hold references. This triggers premature spilling.

These are documented in #20714, #16841, and #22526.
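The over-accounting problem can be reproduced with a small std-only model. This is only a sketch: `SharedArray`, `naive_total`, and `deduplicated_total` are hypothetical names, a plain `Arc<Vec<u8>>` stands in for Arrow's `Arc<Buffer>`, and `reported_size()` imitates a per-array size query that reports the whole backing buffer.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Toy stand-in for an Arrow array that holds a reference to a shared buffer.
struct SharedArray {
    buffer: Arc<Vec<u8>>,
}

impl SharedArray {
    /// Imitates a per-array size query: reports the whole backing buffer,
    /// no matter how many other arrays reference the same allocation.
    fn reported_size(&self) -> usize {
        self.buffer.len()
    }
}

/// Naive accounting: sum what every holder reports.
fn naive_total(arrays: &[SharedArray]) -> usize {
    arrays.iter().map(|a| a.reported_size()).sum()
}

/// Deduplicated accounting: count each physical allocation once,
/// keyed by the address of the shared buffer.
fn deduplicated_total(arrays: &[SharedArray]) -> usize {
    let mut seen = HashSet::new();
    arrays
        .iter()
        .filter(|a| seen.insert(Arc::as_ptr(&a.buffer)))
        .map(|a| a.buffer.len())
        .sum()
}

/// Two holders of one 1 KiB buffer: returns (naive, deduplicated).
fn over_counting_demo() -> (usize, usize) {
    let buffer = Arc::new(vec![0u8; 1024]);
    let arrays = vec![
        SharedArray { buffer: Arc::clone(&buffer) },
        SharedArray { buffer: Arc::clone(&buffer) },
    ];
    (naive_total(&arrays), deduplicated_total(&arrays))
}
```

In this model the naive sum charges the pool twice for a single allocation, which is the kind of inflation that can trigger a spill earlier than the real footprint warrants.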

Describe the solution you'd like

I propose two approaches to replace the manual tracking model with automatic, pool-integrated memory accounting. Each approach is independently useful.

Approach 1: Post-construction claim() via ArrowMemoryPool

Use Arrow's buffer.claim(pool) / ArrayData::claim(pool) API to register buffer ownership at output boundaries. Claims are idempotent — the same buffer claimed twice is counted once. This eliminates both under-accounting (can't miss sites if you claim at the output boundary) and over-accounting (shared buffers counted once).

DataFusion already has the adapter: ArrowMemoryPool in datafusion/execution/src/memory_pool/arrow.rs wraps DataFusion's MemoryPool and implements arrow_buffer::MemoryPool. The arrow-buffer/pool feature is enabled.

// BEFORE (manual, error-prone):
fn emit(&mut self, ...) -> Result<Option<RecordBatch>> {
    let batch = RecordBatch::try_new(schema, output)?;
    let batch_size = batch.get_array_memory_size();  // over-counts shared buffers
    self.transient_reservation.grow(batch_size);
    Ok(Some(batch))
}

// AFTER (post-construction claim, idempotent):
fn emit(&mut self, ...) -> Result<Option<RecordBatch>> {
    let batch = RecordBatch::try_new(schema, output)?;
    claim_batch(&batch, &self.arrow_pool);  // shared buffers counted once
    Ok(Some(batch))
}

Limitation: claim() is post-construction. A burst allocation can temporarily exceed pool limits before the claim catches it.

Prototype: wirybeaver#3
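Why repeated claims do not double-count can be sketched with a std-only toy model. Everything here (`ToyPool`, `ToyReservation`, `ToyBuffer`) is hypothetical and is not the arrow-buffer API; the assumption being illustrated is that the reservation slot lives with the allocation itself, so a second claim replaces the first instead of adding to it.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Toy pool: tracks the number of bytes currently reserved.
#[derive(Default)]
struct ToyPool {
    used: AtomicUsize,
}

/// RAII reservation: returns its bytes to the pool when dropped.
struct ToyReservation {
    pool: Arc<ToyPool>,
    size: usize,
}

impl Drop for ToyReservation {
    fn drop(&mut self) {
        self.pool.used.fetch_sub(self.size, Ordering::SeqCst);
    }
}

impl ToyPool {
    fn reserve(self: &Arc<Self>, size: usize) -> ToyReservation {
        self.used.fetch_add(size, Ordering::SeqCst);
        ToyReservation { pool: Arc::clone(self), size }
    }

    fn used(&self) -> usize {
        self.used.load(Ordering::SeqCst)
    }
}

/// Toy buffer: one reservation slot per allocation, shared by all holders.
struct ToyBuffer {
    data: Vec<u8>,
    reservation: Mutex<Option<ToyReservation>>,
}

impl ToyBuffer {
    fn new(len: usize) -> Arc<Self> {
        Arc::new(Self { data: vec![0; len], reservation: Mutex::new(None) })
    }

    /// Claiming replaces any previous reservation; the old one is dropped
    /// (and its bytes released) as soon as the new one is stored.
    fn claim(&self, pool: &Arc<ToyPool>) {
        let new = pool.reserve(self.data.len());
        *self.reservation.lock().unwrap() = Some(new);
    }
}

/// Returns pool usage after the first claim, after the second claim,
/// and after every holder of the buffer has been dropped.
fn claim_demo() -> (usize, usize, usize) {
    let pool = Arc::new(ToyPool::default());
    let buffer = ToyBuffer::new(1024);
    let shared = Arc::clone(&buffer); // second holder of the same allocation

    buffer.claim(&pool);
    let after_first = pool.used();
    shared.claim(&pool); // same allocation, claimed again
    let after_second = pool.used();

    drop(buffer);
    drop(shared); // last holder gone, reservation released
    (after_first, after_second, pool.used())
}
```

In this toy the new reservation is taken before the old one is released, so usage briefly doubles inside `claim()`; that is a smaller version of the burst limitation noted above.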

Approach 2: Builder with_pool() for real-time enforcement

Add with_pool() to Arrow builders so every internal buffer growth goes through the pool's reservation system. If the pool is exhausted, the growth fails before the allocation happens.

Key finding: Arrow builders (PrimitiveBuilder, GenericByteBuilder, GenericListBuilder) store values in Vec<T::Native>, not MutableBuffer. The Vec → MutableBuffer conversion only happens in finish(). This means MutableBuffer's existing #[cfg(feature = "pool")] reservation field is bypassed during the entire build phase. A TrackedVec<T> wrapper is needed to bridge this gap.

// Real-time enforcement during building:
let mut builder = Int32Builder::new()
    .with_pool(&arrow_pool)?;

for row in rows {
    match builder.try_append_value(row) {
        Ok(()) => {}
        Err(_oom) => {
            // Pool exhausted — spill what we have
            break;
        }
    }
}
let array = builder.finish();
// Reservation transfers into the array's buffer; freed on drop.

Prototype (upstream arrow-rs): wirybeaver/arrow-rs#1
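The reserve-before-grow idea behind a TrackedVec<T> can be sketched without Arrow. This is an illustrative model only, not the upstream prototype: `BoundedPool`, `PoolExhausted`, `try_push`, and the doubling policy are all assumptions made for the example.

```rust
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Debug)]
struct PoolExhausted;

/// Toy pool with a hard byte limit.
struct BoundedPool {
    limit: usize,
    used: AtomicUsize,
}

impl BoundedPool {
    fn new(limit: usize) -> Arc<Self> {
        Arc::new(Self { limit, used: AtomicUsize::new(0) })
    }

    /// Reserve `bytes` more, or fail and leave the pool unchanged.
    fn try_grow(&self, bytes: usize) -> Result<(), PoolExhausted> {
        self.used
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
                cur.checked_add(bytes).filter(|next| *next <= self.limit)
            })
            .map(|_| ())
            .map_err(|_| PoolExhausted)
    }

    fn shrink(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::SeqCst);
    }

    fn used(&self) -> usize {
        self.used.load(Ordering::SeqCst)
    }
}

/// Vec wrapper that asks the pool before every capacity increase.
struct TrackedVec<T> {
    values: Vec<T>,
    pool: Arc<BoundedPool>,
    tracked_cap: usize, // element capacity the pool has agreed to
    reserved: usize,    // bytes currently reserved for `values`
}

impl<T> TrackedVec<T> {
    fn new(pool: Arc<BoundedPool>) -> Self {
        Self { values: Vec::new(), pool, tracked_cap: 0, reserved: 0 }
    }

    fn try_push(&mut self, value: T) -> Result<(), PoolExhausted> {
        if self.values.len() == self.tracked_cap {
            // Growth needed: reserve the extra bytes first, allocate second.
            let new_cap = (self.tracked_cap * 2).max(4);
            let new_bytes = new_cap * size_of::<T>();
            self.pool.try_grow(new_bytes - self.reserved)?;
            self.values.reserve_exact(new_cap - self.values.len());
            self.tracked_cap = new_cap;
            self.reserved = new_bytes;
        }
        self.values.push(value);
        Ok(())
    }
}

impl<T> Drop for TrackedVec<T> {
    fn drop(&mut self) {
        self.pool.shrink(self.reserved);
    }
}

/// Appends i32 rows against a 64-byte budget until the pool refuses;
/// returns (rows appended, bytes reserved at that point).
fn tracked_vec_demo() -> (usize, usize) {
    let pool = BoundedPool::new(64);
    let mut values = TrackedVec::<i32>::new(Arc::clone(&pool));
    let mut appended = 0;
    for row in 0..100 {
        if values.try_push(row).is_err() {
            break; // pool exhausted: a real operator would spill here
        }
        appended += 1;
    }
    (appended, pool.used())
}
```

With a 64-byte budget the sketch accepts 16 `i32` values and refuses the growth to 128 bytes before any allocation happens. A real implementation would also need to account for the old and new allocations coexisting during a reallocation, which this toy ignores.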

Comparison

| | Manual try_grow | Approach 1: claim() | Approach 2: with_pool() |
|---|---|---|---|
| Can miss sites | Yes | No (claim at output boundary) | No (tracked at alloc) |
| Over-counts shared buffers | Yes | No (idempotent) | No (idempotent) |
| Enforces limits before alloc | Yes (laggy) | No (brief burst possible) | Yes (immediate) |
| Exists in Arrow today | N/A | Yes (arrow-buffer 58.3) | No (needs upstream PR) |
| Change surface per operator | ~20 size-compute points | ~5 claim points | ~0 (automatic) |

Describe alternatives you've considered

  • Manual try_grow patches: Fix the under-accounting by adding MemoryReservation tracking at each untracked allocation site. This works but is inherently fragile: every new allocation site must remember to call try_grow, and over-accounting from shared buffers remains unsolved. Prototype: wirybeaver/datafusion#1 ("fix: track untracked memory allocations in spill-capable operators").
  • get_slice_memory_size() instead of get_array_memory_size(): Reduces over-counting for sliced arrays but doesn't solve under-accounting and is still manual.
  • compact() retained slices: Eliminates shared buffer references by copying. Correct but adds CPU cost and memory churn.
  • Session-level default pool: All builders pick up a thread-local pool automatically. Eliminates per-builder with_pool() calls but adds implicit global state.
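The implicit-state concern with a thread-local default pool can be illustrated with a std-only sketch. All names here (`DEFAULT_POOL`, `set_default_pool`, `tracked_alloc`, `ToyPool`) are hypothetical, and the toy never releases bytes; it only shows that whether an allocation is tracked depends on which thread performs it.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy pool that only counts reserved bytes.
struct ToyPool {
    used: AtomicUsize,
}

thread_local! {
    /// Hypothetical per-thread default pool; empty unless explicitly set.
    static DEFAULT_POOL: RefCell<Option<Arc<ToyPool>>> = RefCell::new(None);
}

fn set_default_pool(pool: Option<Arc<ToyPool>>) {
    DEFAULT_POOL.with(|slot| *slot.borrow_mut() = pool);
}

/// Allocates a buffer and charges it to the current thread's default pool,
/// if that thread has one.
fn tracked_alloc(len: usize) -> Vec<u8> {
    DEFAULT_POOL.with(|slot| {
        let slot = slot.borrow();
        if let Some(pool) = slot.as_ref() {
            pool.used.fetch_add(len, Ordering::SeqCst);
        }
    });
    vec![0; len]
}

/// Allocates 100 bytes on the current thread and 100 bytes on a new thread;
/// returns how many bytes the pool saw.
fn default_pool_demo() -> usize {
    let pool = Arc::new(ToyPool { used: AtomicUsize::new(0) });
    set_default_pool(Some(Arc::clone(&pool)));

    // Counted: this thread has a default pool installed.
    let _tracked = tracked_alloc(100);

    // Not counted: the spawned thread starts with an empty slot.
    let _untracked = std::thread::spawn(|| tracked_alloc(100))
        .join()
        .unwrap();

    set_default_pool(None);
    pool.used.load(Ordering::SeqCst)
}
```

In this model only the allocation made on the configuring thread is charged, so tracking silently depends on where the code runs rather than on anything visible at the call site.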

Additional context
