
Introduce SubPool: per-operator sub-pool for blocking spillable operators#22035

Draft
JanKaul wants to merge 1 commit into apache:main from Embucket:sub-memory-pool

Conversation


@JanKaul JanKaul commented May 6, 2026

Summary

This is an exploratory PR for the discussion in #22036, "FairSpillPool
penalises the active operator in a pipeline of blocking spillable
operators".

It introduces SubPool, a MemoryPool implementation that wraps a parent
MemoryPool and aggregates a single operator's reservations into one
outer slot on the parent. Within the sub-pool, FairSpillPool-style
fair-share semantics are applied across the operator's own internal
reservations.

ExternalSorter is wired up as the first user: every SortExec node
lazily builds one sub-pool on first execute() and shares it across all
of its partitions. From the parent pool's point of view there is now one
spillable consumer per SortExec node, not one per partition and not
one per internal reservation (ExternalSorter[N] /
ExternalSorterMerge[N]).
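
The lazy, shared sub-pool described above can be sketched roughly as follows. This is a simplified stand-in, not the PR's actual code: `MemoryPool`, `sub_pool`, and the `SortExecLike` struct here are illustrative placeholders for the real DataFusion types.

```rust
use std::sync::{Arc, OnceLock};

// Minimal stand-ins for the real DataFusion types.
trait MemoryPool: Send + Sync {}

struct Parent;
impl MemoryPool for Parent {}

struct Sub {
    _name: String,
}
impl MemoryPool for Sub {}

// Stand-in for the PR's `sub_pool(parent, outer_name)` constructor.
fn sub_pool(_parent: Arc<dyn MemoryPool>, name: &str) -> Arc<dyn MemoryPool> {
    Arc::new(Sub { _name: name.to_string() })
}

struct SortExecLike {
    // One slot per operator instance; a clone would get a fresh OnceLock.
    shared_pool: OnceLock<Arc<dyn MemoryPool>>,
}

impl SortExecLike {
    // All partitions of one node call this; only the first call builds the
    // sub-pool, every later call reuses the same Arc.
    fn pool_for_execute(&self, runtime_pool: Arc<dyn MemoryPool>) -> Arc<dyn MemoryPool> {
        self.shared_pool
            .get_or_init(|| sub_pool(runtime_pool, "SortExec[#0]:subpool"))
            .clone()
    }
}
```

Because the `OnceLock` lives on the node rather than in `execute()`, every partition's stream ends up registering its reservations against one shared pool, which is what collapses the per-partition consumers into one outer slot.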

This is the smallest concrete step toward the Velox-style hierarchical
memory pool direction sketched in the issue. It does not yet solve the
core "blocking operator penalised for inactive consumers" problem on its
own — but it does give us the structure (per-operator sub-pools, one
slot per operator at the parent) on which a richer arbitration policy
could be built. See Open questions below.

What's in the change

  • datafusion/execution/src/memory_pool/pool.rs

    • New SubPool type. Holds one outer MemoryReservation
      (can_spill = true) against the parent, plus a local num_spill /
      spillable / unspillable triple.
    • Internal try_grow applies a FairSpillPool-style per-reservation
      cap computed from a dynamic capacity:
      parent.memory_limit() - parent.reserved() + self.outer.size(). The
      + outer.size() term re-includes what we already hold so the
      sub-pool doesn't punish itself twice.
    • Public sub_pool(parent, outer_name) -> Arc<dyn MemoryPool>
      convenience constructor.
    • Falls back to usize::MAX capacity when the parent reports
      Infinite / Unknown.
  • datafusion/execution/src/memory_pool/mod.rs

    • Doc-comment on MemoryPool describing SubPool alongside
      TrackConsumersPool, including a recommendation: use
      GreedyMemoryPool as the parent for plans that chain multiple
      blocking spillable operators back-to-back, so each sub-pool can grow
      into the full parent pool when it is the only one currently using
      memory.
  • datafusion/physical-plan/src/sorts/sort.rs

    • SortExec gains a shared_pool: OnceLock<Arc<dyn MemoryPool>>.
      Lazily initialised on the first execute() call (the only point at
      which the RuntimeEnv is in scope), reused across all partitions
      of that SortExec instance.
    • Clone for SortExec is custom: the clone gets a fresh OnceLock,
      so a cloned SortExec is treated as a separate operator instance
      and gets its own sub-pool registration. Same treatment in
      SortExec::with_fetch.
    • ExternalSorter::new takes the per-SortExec sub-pool by
      parameter and registers ExternalSorter[N] /
      ExternalSorterMerge[N] against it instead of against
      runtime.memory_pool.
    • Process-wide SORT_EXEC_INSTANCE_ID counter names each sub-pool's
      outer consumer (SortExec[#k]:subpool) so
      TrackConsumersPool reports remain disambiguable.
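
The dynamic capacity and per-reservation cap from the pool.rs bullet can be written out as plain arithmetic. This is an illustrative sketch of the formula described above, not the actual SubPool implementation:

```rust
/// Dynamic capacity seen by the sub-pool:
/// parent.memory_limit() - parent.reserved() + self.outer.size().
/// The outer size is added back so the sub-pool does not count its own
/// current holdings against itself twice.
fn sub_pool_capacity(parent_limit: usize, parent_reserved: usize, outer_size: usize) -> usize {
    parent_limit
        .saturating_sub(parent_reserved)
        .saturating_add(outer_size)
}

/// FairSpillPool-style per-reservation cap inside the sub-pool: whatever is
/// left after unspillable reservations, split evenly across the spillable ones.
fn per_spillable_cap(capacity: usize, unspillable: usize, num_spill: usize) -> usize {
    if num_spill == 0 {
        return capacity;
    }
    capacity.saturating_sub(unspillable) / num_spill
}
```

For example, with a 100-unit parent that has 60 units reserved, 40 of which are held by this sub-pool's outer reservation, the sub-pool sees a capacity of 80; with 10 unspillable units and 2 spillable reservations, each spillable reservation may grow to 35.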

Behaviour change

For a plan with N SortExec nodes against the runtime's pool:

| Before | After |
| --- | --- |
| 2 spillable consumers per partition | 1 spillable consumer per SortExec node |
| Parent num_spill grows with partitions and with internal reservations | Parent num_spill grows only with operator count |
| Per-partition reservations compete directly at the parent | Per-partition reservations compete inside the sub-pool; the sub-pool competes at the parent |

The most interesting effect is when the runtime pool is a
GreedyMemoryPool: each SortExec's sub-pool can grow to the full
parent pool when it is the only one actively reserving, and shrinks
back as siblings start reserving. This matches the "operator currently
doing work gets the headroom it needs" intuition that the original
issue argues for, as long as the parent is greedy. With a
FairSpillPool parent the sub-pool still counts as one consumer there,
so M coexisting sub-pools cap their outer at parent / M — the
sub-pool's only added value in that case is the internal fair-share
across partitions.
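
A back-of-envelope sketch of the two parent policies, with illustrative numbers only:

```rust
/// GreedyMemoryPool parent: a sub-pool's outer slot can grow into all space
/// its siblings are not currently holding.
fn greedy_outer_cap(parent_limit: usize, siblings_reserved: usize) -> usize {
    parent_limit.saturating_sub(siblings_reserved)
}

/// FairSpillPool parent: with m coexisting spillable sub-pools, each outer
/// slot is capped at parent / m regardless of who is actively reserving.
fn fair_outer_cap(parent_limit: usize, coexisting_subpools: usize) -> usize {
    parent_limit / coexisting_subpools.max(1)
}
```

With a 1024-unit parent and 4 coexisting sub-pools, a lone active sub-pool under a greedy parent can reach 1024, while under a fair parent each outer slot is held to 256 even when the other three are idle.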

Open questions for reviewers

  1. Is per-operator sub-pool the right granularity? The Velox model
    nests further (query → task → pipeline → operator). Should we expose
    sub-pools at the ExecutionPlan level rather than wiring each
    spillable operator individually?
  2. Recommended parent. The doc-comment recommends
    GreedyMemoryPool as the parent. For multi-tenant deployments that
    currently use FairSpillPool as their root, is this acceptable, or
    do we need a hybrid that's fair across queries but greedy within a
    query?
  3. Other spillable operators. Only ExternalSorter is migrated
    here. Hash aggregation, sort-merge join (which has its own
    with_can_spill(false) quirk noted in the issue), repartition, and
    nested-loop join are all candidates. Should those land in this PR
    or as follow-ups?
  4. Naming. SubPool is descriptive but generic. Other options:
    OperatorMemoryPool, AggregatingPool, NestedPool. Preferences?
  5. Capacity formula. The dynamic
    parent.memory_limit() - parent.reserved() + outer.size() works for
    GreedyMemoryPool parents but interacts oddly with FairSpillPool
    parents (the sub-pool sees the full parent capacity even though its
    outer is capped at parent / M). Should MemoryPool grow a
    headroom_for(consumer) method instead?
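
One possible shape for such a method, purely hypothetical (nothing like `headroom_for` exists in DataFusion today): a default that mirrors this PR's dynamic-capacity formula, which a fair parent overrides with its per-consumer share.

```rust
/// Hypothetical extension trait; names and signatures are illustrative only.
trait MemoryPoolExt {
    fn memory_limit(&self) -> usize;
    fn reserved(&self) -> usize;

    /// Headroom a consumer that currently holds `consumer_held` bytes may
    /// still occupy. Default mirrors limit - reserved + already-held.
    fn headroom_for(&self, consumer_held: usize) -> usize {
        self.memory_limit()
            .saturating_sub(self.reserved())
            .saturating_add(consumer_held)
    }
}

struct Greedy { limit: usize, used: usize }

impl MemoryPoolExt for Greedy {
    fn memory_limit(&self) -> usize { self.limit }
    fn reserved(&self) -> usize { self.used }
}

struct Fair { limit: usize, used: usize, num_spill: usize }

impl MemoryPoolExt for Fair {
    fn memory_limit(&self) -> usize { self.limit }
    fn reserved(&self) -> usize { self.used }

    // A fair parent reports its per-consumer share instead, so a sub-pool no
    // longer sees the full parent capacity while its outer is capped.
    fn headroom_for(&self, _consumer_held: usize) -> usize {
        self.limit / self.num_spill.max(1)
    }
}
```

This would let SubPool ask the parent for its true headroom instead of reimplementing the parent's policy, which is the "interacts oddly" problem the question describes.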

Test plan

  • cargo test -p datafusion-execution memory_pool
  • cargo test -p datafusion --test core_integration concurrent_sort_unfairness
    — chain-of-SortExec reproducer from the linked issue. The
    bottom-most operator's spill count should drop substantially when
    the parent is GreedyMemoryPool.
  • cargo test -p datafusion-physical-plan sorts::sort — sanity-check
    that single-SortExec behaviour is unchanged when the parent
    pool has only this one operator.
  • Manual check on a multi-SortExec plan with
    TrackConsumersPool(GreedyMemoryPool) as the runtime pool: the
    top-consumers list should now show SortExec[#k]:subpool entries
    rather than per-partition ExternalSorter[N] entries.
  • Confirm error messages from SubPool::try_grow still carry the
    inner consumer name (the inner_context() wrapping) — important
    so failures like "ExternalSorter[3] couldn't grow" don't get
    reported as "SortExec[#0]:subpool couldn't grow".

@github-actions github-actions Bot added core Core DataFusion crate execution Related to the execution crate physical-plan Changes to the physical-plan crate labels May 6, 2026