# Introduce SubPool: per-operator sub-pool for blocking spillable operators #22035

Draft · JanKaul wants to merge 1 commit into `apache:main`
## Summary

This is an exploratory PR for the discussion in #22036: "`FairSpillPool` penalises the active operator in a pipeline of blocking spillable operators".

It introduces `SubPool`, a `MemoryPool` implementation that wraps a parent `MemoryPool` and aggregates a single operator's reservations into one outer slot on the parent. Within the sub-pool, `FairSpillPool`-style fair-share semantics are applied across the operator's own internal reservations.

`ExternalSorter` is wired up as the first user: every `SortExec` node lazily builds one sub-pool on first `execute()` and shares it across all of its partitions. From the parent pool's point of view there is now one spillable consumer per `SortExec` node, not one per partition and not one per internal reservation (`ExternalSorter[N]` / `ExternalSorterMerge[N]`).

This is the smallest concrete step toward the Velox-style hierarchical memory pool direction sketched in the issue. It does not yet solve the core "blocking operator penalised for inactive consumers" problem on its own, but it does give us the structure (per-operator sub-pools, one slot per operator at the parent) on which a richer arbitration policy could be built. See Open questions below.
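To make the aggregation concrete, here is a toy model of what the parent pool sees: without a sub-pool, each internal reservation registers itself; with one, they collapse into a single outer slot whose size is the sum of the internals. The function name `parent_view` is illustrative, not DataFusion API.

```rust
use std::collections::HashMap;

/// Toy model: given an operator's internal reservations (name, bytes),
/// return what the parent pool sees once a sub-pool aggregates them:
/// one consumer (`outer_name`) whose size is the sum of the internals.
fn parent_view(internal: &[(&str, usize)], outer_name: &str) -> HashMap<String, usize> {
    let total: usize = internal.iter().map(|(_, bytes)| bytes).sum();
    HashMap::from([(outer_name.to_string(), total)])
}
```

With internals `ExternalSorter[0]` at 100 bytes and `ExternalSorterMerge[0]` at 50 bytes, the parent sees a single `SortExec[#0]:subpool` consumer holding 150 bytes.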
## What's in the change

**`datafusion/execution/src/memory_pool/pool.rs`**

- New `SubPool` type. Holds one outer `MemoryReservation` (`can_spill = true`) against the parent, plus a local `num_spill` / `spillable` / `unspillable` triple.
- `try_grow` applies a `FairSpillPool`-style per-reservation cap computed from a dynamic capacity: `parent.memory_limit() - parent.reserved() + self.outer.size()`. The `+ outer.size()` term re-includes what we already hold so the sub-pool doesn't punish itself twice.
- `sub_pool(parent, outer_name) -> Arc<dyn MemoryPool>` convenience constructor.
- `usize::MAX` capacity when the parent reports `Infinite` / `Unknown`.

**`datafusion/execution/src/memory_pool/mod.rs`**

- Docs on `MemoryPool` describing `SubPool` alongside `TrackConsumersPool`, including a recommendation: use `GreedyMemoryPool` as the parent for plans that chain multiple blocking spillable operators back-to-back, so each sub-pool can grow into the full parent pool when it is the only one currently using memory.
**`datafusion/physical-plan/src/sorts/sort.rs`**

- `SortExec` gains a `shared_pool: OnceLock<Arc<dyn MemoryPool>>`, lazily initialised on the first `execute()` call (the only point at which the `RuntimeEnv` is in scope) and reused across all partitions of that `SortExec` instance.
- `Clone for SortExec` is custom: the clone gets a fresh `OnceLock`, so a cloned `SortExec` is treated as a separate operator instance and gets its own sub-pool registration. Same treatment in `SortExec::with_fetch`.
- `ExternalSorter::new` takes the per-`SortExec` sub-pool by parameter and registers `ExternalSorter[N]` / `ExternalSorterMerge[N]` against it instead of against `runtime.memory_pool`.
- A `SORT_EXEC_INSTANCE_ID` counter names each sub-pool's outer consumer (`SortExec[#k]:subpool`) so `TrackConsumersPool` reports remain disambiguable.

## Behaviour change
For a plan with N `SortExec` nodes against the runtime's pool:

| | Before | After |
|---|---|---|
| Spillable consumers per `SortExec` node | one per partition and per internal reservation (`ExternalSorter[N]` / `ExternalSorterMerge[N]`) | one |
| `num_spill` at the parent | grows with partitions and with internal reservations | grows only with operator count |

The most interesting effect is when the runtime pool is a `GreedyMemoryPool`: each `SortExec`'s sub-pool can grow to the full parent pool when it is the only one actively reserving, and shrinks back as siblings start reserving. This matches the "operator currently doing work gets the headroom it needs" intuition that the original issue argues for, as long as the parent is greedy. With a `FairSpillPool` parent the sub-pool still counts as one consumer there, so M coexisting sub-pools cap their outer at `parent / M`; the sub-pool's only added value in that case is the internal fair-share across partitions.
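The lazy, per-instance wiring described in the `sort.rs` changes can be modelled with `std::sync::OnceLock`. This is a simplified sketch (`ToySortExec` / `ToyPool` are illustrative names, not the real types): `get_or_init` builds the pool once on the first `execute()`, and the custom `Clone` hands out a fresh `OnceLock` so a cloned node registers its own sub-pool.

```rust
use std::sync::{Arc, OnceLock};

/// Stand-in for the per-operator sub-pool.
struct ToyPool;

/// Stand-in for `SortExec` with its lazily created shared pool.
struct ToySortExec {
    shared_pool: OnceLock<Arc<ToyPool>>,
}

impl ToySortExec {
    fn new() -> Self {
        Self { shared_pool: OnceLock::new() }
    }

    /// Called once per partition. All partitions of this instance get the
    /// same pool because `get_or_init` runs the initialiser only once.
    fn execute(&self) -> Arc<ToyPool> {
        Arc::clone(self.shared_pool.get_or_init(|| Arc::new(ToyPool)))
    }
}

impl Clone for ToySortExec {
    fn clone(&self) -> Self {
        // Fresh OnceLock: the clone is a separate operator instance and
        // lazily builds its own sub-pool on its first execute().
        Self { shared_pool: OnceLock::new() }
    }
}
```

Two `execute()` calls on the same instance return the same `Arc`, while a clone's `execute()` returns a distinct one.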
## Open questions for reviewers

- **Hierarchy depth.** The Velox-style hierarchy nests further (query → task → pipeline → operator). Should we expose sub-pools at the `ExecutionPlan` level rather than wiring each spillable operator individually?
- **Parent-pool choice.** The headline benefit assumes `GreedyMemoryPool` as the parent. For multi-tenant deployments that currently use `FairSpillPool` as their root, is this acceptable, or do we need a hybrid that's fair across queries but greedy within a query?
- **Scope.** Only `ExternalSorter` is migrated here. Hash aggregation, sort-merge join (which has its own `with_can_spill(false)` quirk noted in the issue), repartition, and nested-loop join are all candidates. Should those land in this PR or as follow-ups?
- **Naming.** `SubPool` is descriptive but generic. Other options: `OperatorMemoryPool`, `AggregatingPool`, `NestedPool`. Preferences?
- **Capacity formula.** `parent.memory_limit() - parent.reserved() + outer.size()` works for `GreedyMemoryPool` parents but interacts oddly with `FairSpillPool` parents (the sub-pool sees the full parent capacity even though its outer is capped at `parent / M`). Should `MemoryPool` grow a `headroom_for(consumer)` method instead?

## Test plan
- `cargo test -p datafusion-execution memory_pool`
- `cargo test -p datafusion --test core_integration concurrent_sort_unfairness`: chain-of-`SortExec` reproducer from the linked issue. The bottom-most operator's spill count should drop substantially when the parent is `GreedyMemoryPool`.
- `cargo test -p datafusion-physical-plan sorts::sort`: sanity-check that single-`SortExec` behaviour is unchanged when the parent pool has only this one operator.
- Run a multi-`SortExec` plan with `TrackConsumersPool(GreedyMemoryPool)` as the runtime pool: the top-consumers list should now show `SortExec[#k]:subpool` entries rather than per-partition `ExternalSorter[N]` entries.
- Check that errors from `SubPool::try_grow` still carry the inner consumer name (the `inner_context()` wrapping), important so failures like "ExternalSorter[3] couldn't grow" don't get reported as "SortExec[#0]:subpool couldn't grow".