FairSpillPool penalises the active operator in a pipeline of blocking spillable operators #22036

@JanKaul

Description

Describe the bug

FairSpillPool divides spillable memory by the number of currently-registered
spillable MemoryConsumers. Combined with two facts about how DataFusion
plans execute, this means the operator actually doing work can be capped far
below its fair share:

  1. SortExec (like other operators that build an ExternalSorter)
    registers its MemoryConsumer synchronously inside execute(),
    before the returned stream is ever polled
    (sort.rs:1248-1259).
    Parent operators call their children's execute() synchronously too, so
    all spillable consumers in a plan typically register before any data flows.
  2. SortExec is a blocking operator: the first poll on its output stream
    runs the entire input-consumption phase before yielding. So in a chain of
    blocking spillable operators, only the bottom-most operator is doing work
    at any given moment, even though every operator above it is already
    registered with the pool.
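
The two facts above can be modelled with a small, std-only sketch. Everything here (`ToyPool`, `ToySort`, `ToyStream`, `chain`) is a hypothetical stand-in, not DataFusion's `MemoryPool`, `SortExec`, or stream types; it only assumes that `execute()` registers eagerly and that each operator drains its child completely before doing its own work and unregistering.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy pool that only counts registered spillable consumers (hypothetical).
#[derive(Default)]
struct ToyPool {
    num_spill: Cell<usize>,
}

/// Toy blocking operator with an optional child (hypothetical).
struct ToySort {
    child: Option<Box<ToySort>>,
}

/// Lazy output of `ToySort::execute`; nothing runs until `poll` is called.
struct ToyStream {
    pool: Rc<ToyPool>,
    child: Option<Box<ToyStream>>,
}

impl ToySort {
    /// Registers with the pool right away, then calls the child's
    /// `execute()` synchronously, so the whole chain registers up-front.
    fn execute(&self, pool: &Rc<ToyPool>) -> ToyStream {
        pool.num_spill.set(pool.num_spill.get() + 1);
        let child = self.child.as_ref().map(|c| Box::new(c.execute(pool)));
        ToyStream { pool: Rc::clone(pool), child }
    }
}

impl ToyStream {
    /// Blocking poll: drain the child first, record how many consumers are
    /// registered while this level does its own work, then unregister.
    fn poll(self, seen: &mut Vec<usize>) {
        if let Some(child) = self.child {
            child.poll(seen);
        }
        seen.push(self.pool.num_spill.get());
        self.pool.num_spill.set(self.pool.num_spill.get() - 1);
    }
}

/// Builds a chain of `depth` stacked toy sorts (`depth` is assumed >= 1).
fn chain(depth: usize) -> ToySort {
    let mut op = ToySort { child: None };
    for _ in 1..depth {
        op = ToySort { child: Some(Box::new(op)) };
    }
    op
}
```

Calling `chain(5).execute(&pool)` leaves `num_spill` at 5 before anything is polled; polling the returned stream then records the counts each level sees, bottom to top, as 5, 4, 3, 2, 1.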

The result is that the operator currently doing the work is artificially
capped at pool_size / num_spill, where num_spill includes operators
that have not yet started.

Example

Consider a plan with five stacked SortExecs sharing a FairSpillPool sized
so that a single sort fits comfortably at cap == pool_size, but not at
cap == pool_size / 5:

SortExec (top, level 4)
  └── SortExec (level 3)
        └── SortExec (level 2)
              └── SortExec (level 1)
                    └── SortExec (bottom, level 0)
                          └── <source>

When the root stream is polled:

  • All five execute() calls run recursively and synchronously, so all five
    ExternalSorter consumers register up-front. num_spill == 5.
  • The first poll drives the bottom sort's input phase. It is the only
    operator doing real work, but its per-reservation cap is pool_size / 5,
    so it spills repeatedly.
  • As the bottom sort finishes its input phase and unregisters, the next
    level up starts its input phase — now under num_spill == 4, with a
    larger fair share.
  • This continues up the chain. The top sort runs last under num_spill == 1
    and gets the entire pool, never spilling at all.

A representative spill-count distribution from bottom to top looks like:

[3, 3, 2, 2, 0]

Even though every level processes the same data shape against the same pool,
the operator that runs first is forced to spill the most, and the operator
that runs last does not spill at all.
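
The arithmetic behind this example can be sketched as follows. This is a toy model under stated assumptions, not DataFusion code: `fair_cap` ignores unspillable reservations, and `toy_spills` assumes an operator buffers input up to its cap, spills the whole buffer, and repeats. It is not expected to reproduce the exact counts reported above, only the trend that earlier levels get a smaller cap and spill more.

```rust
/// Per-reservation cap under the flat rule described above:
/// `pool_size / num_spill` (unspillable reservations are ignored here).
fn fair_cap(pool_size: usize, num_spill: usize) -> usize {
    pool_size / num_spill.max(1)
}

/// Toy spill model (assumption): buffer input up to `cap`, spill the whole
/// buffer, repeat; the final partial buffer stays in memory.
/// Equivalent to ceil(input_bytes / cap) - 1 for non-empty input.
fn toy_spills(input_bytes: usize, cap: usize) -> usize {
    input_bytes.saturating_sub(1) / cap.max(1)
}

/// For `levels` stacked blocking operators, returns `(cap, spills)` per
/// level from bottom to top, assuming each level unregisters once done.
fn stacked_sorts(pool_size: usize, input_bytes: usize, levels: usize) -> Vec<(usize, usize)> {
    (0..levels)
        .map(|level| {
            // Operators at this level and above are still registered.
            let num_spill = levels - level;
            let cap = fair_cap(pool_size, num_spill);
            (cap, toy_spills(input_bytes, cap))
        })
        .collect()
}
```

With a hypothetical pool of 1000 bytes and 700 bytes of input per level, `stacked_sorts(1000, 700, 5)` yields caps of 200, 250, 333, 500, and 1000, with toy spill counts of 3, 2, 2, 1, and 0.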

Expected behavior

For operators in a pipeline of blocking spillable operators processing
identically-shaped inputs against the same pool, the spill behaviour should
not depend on execution order. The per-reservation fair share should
reflect operators that are currently competing for memory, not all
operators that have been registered but are still waiting their turn.
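
To make the difference concrete, here is a minimal sketch that contrasts the two ways of counting. `ToyFairPool` and its `start`/`finish` notion of an "active" consumer are hypothetical; how a real pool would learn that a consumer is active is exactly the open question, and this sketch does not answer it.

```rust
/// Toy pool tracking registered and active spillable consumers separately.
/// Hypothetical: this is not DataFusion's `FairSpillPool`.
struct ToyFairPool {
    pool_size: usize,
    registered: usize,
    active: usize,
}

impl ToyFairPool {
    fn new(pool_size: usize) -> Self {
        ToyFairPool { pool_size, registered: 0, active: 0 }
    }

    /// Called from `execute()`: the consumer exists but is not yet working.
    fn register(&mut self) {
        self.registered += 1;
    }

    /// Called when the consumer starts consuming input.
    fn start(&mut self) {
        self.active += 1;
    }

    /// Called when the consumer is done and releases its registration.
    fn finish(&mut self) {
        self.active = self.active.saturating_sub(1);
        self.registered = self.registered.saturating_sub(1);
    }

    /// Current behaviour as described above: divide by all registered consumers.
    fn cap_by_registered(&self) -> usize {
        self.pool_size / self.registered.max(1)
    }

    /// Expected behaviour: divide only by consumers that are competing now.
    fn cap_by_active(&self) -> usize {
        self.pool_size / self.active.max(1)
    }
}

/// Runs `levels` stacked blocking operators one after another and returns
/// `(cap_by_registered, cap_by_active)` for each, bottom to top.
fn caps_per_level(pool_size: usize, levels: usize) -> Vec<(usize, usize)> {
    let mut pool = ToyFairPool::new(pool_size);
    for _ in 0..levels {
        pool.register();
    }
    let mut caps = Vec::with_capacity(levels);
    for _ in 0..levels {
        pool.start();
        caps.push((pool.cap_by_registered(), pool.cap_by_active()));
        pool.finish();
    }
    caps
}
```

In this model the active-based cap is the same at every level, so spill behaviour no longer depends on execution order, while the registered-based cap grows as operators below finish.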

Possible directions for discussion

(Not a proposal — just to start the conversation.)

One reference point worth considering is Velox's hierarchical memory pool
structure. Instead of a single flat pool with a counter of registered
consumers, Velox organises memory pools as a tree: a root pool per query,
with child pools per task / pipeline / operator. Each pool tracks its own
reservation and propagates usage up to its parent, and memory arbitration
is performed by walking the tree rather than by dividing a global capacity
by a global consumer count.
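
A minimal sketch of the tree idea, assuming nothing about Velox's actual API or about the SubPool in the PR: `PoolTree` and `Node` are hypothetical names, each node has its own capacity, and a reservation succeeds only if every ancestor up to the root still has room. Arbitration (reclaiming memory from other nodes) is left out entirely.

```rust
/// One pool in the tree (hypothetical structure, not Velox's).
struct Node {
    parent: Option<usize>,
    capacity: usize,
    used: usize,
}

/// Arena-backed tree of pools; index 0 is the per-query root.
struct PoolTree {
    nodes: Vec<Node>,
}

impl PoolTree {
    fn new(root_capacity: usize) -> Self {
        PoolTree {
            nodes: vec![Node { parent: None, capacity: root_capacity, used: 0 }],
        }
    }

    /// Adds a child pool under `parent` and returns its index.
    fn add_child(&mut self, parent: usize, capacity: usize) -> usize {
        assert!(parent < self.nodes.len(), "unknown parent pool");
        self.nodes.push(Node { parent: Some(parent), capacity, used: 0 });
        self.nodes.len() - 1
    }

    /// Reserves `bytes` for pool `id` only if it fits in `id` and in every
    /// ancestor; usage is then propagated up to the root.
    fn try_grow(&mut self, id: usize, bytes: usize) -> bool {
        // First pass: check the whole path without modifying anything.
        let mut cur = Some(id);
        while let Some(i) = cur {
            let n = &self.nodes[i];
            if n.capacity - n.used < bytes {
                return false;
            }
            cur = n.parent;
        }
        // Second pass: commit the reservation along the same path.
        let mut cur = Some(id);
        while let Some(i) = cur {
            self.nodes[i].used += bytes;
            cur = self.nodes[i].parent;
        }
        true
    }

    /// Releases `bytes` from pool `id` and from every ancestor.
    fn shrink(&mut self, id: usize, bytes: usize) {
        let mut cur = Some(id);
        while let Some(i) = cur {
            self.nodes[i].used = self.nodes[i].used.saturating_sub(bytes);
            cur = self.nodes[i].parent;
        }
    }

    fn used(&self, id: usize) -> usize {
        self.nodes[id].used
    }
}
```

If two operator pools are created under a 1000-byte root with a child capacity equal to the root's, a lone operator can reserve up to the full 1000 bytes, and a second operator is only refused when the root itself is out of room, regardless of how many sibling pools merely exist.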

I've created a PR that explores a solution using a SubPool for blocking operators. It shows how a MemoryPool hierarchy can support more complex scenarios than the current MemoryPools.
