Is your feature request related to a problem or challenge?
When executing query plans like:
01)ProjectionExec
02)--SortPreservingMergeExec: [depname@0 ASC NULLS LAST, empno@2 ASC NULLS LAST], fetch=3
03)----ProjectionExec
04)------BoundedWindowAggExec: wdw=[sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ...
05)--------SortExec: TopK(fetch=3), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST],
06)----------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=1
07)------------DataSourceExec: file_groups=lots
We observed:
- lots of data was being shuffled (in Ballista)
- lots of data was being copied in RepartitionExec
- a giant sort took place (OOMing), only for the result to then be limited to a small amount of data
- the rest of the query ran (well, it didn't actually, since # 3 failed)
Describe the solution you'd like
If the partition key is a subset of the sort key, sort/limit before the re-partition, ensuring the minimum amount of data is shuffled (copied, in DataFusion), and avoiding an OOM/spill on the big sort. This can be done by copying the SortExec below the RepartitionExec.
This should be safe because the hash partition key being a prefix of the sort key guarantees that rows surviving the final TopK also survive the pre-repartition TopK.
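That invariant can be sanity-checked with a small standalone simulation (plain Python, not DataFusion code; the data, partition counts, and helper names are made up): applying TopK in each input partition before the hash repartition, then applying the final TopK to the survivors, yields the same rows as one big TopK over everything, because any row in the global top k has at most k-1 rows ahead of it in its own input partition.

```python
import random

def top_k(rows, k):
    # Rows compare as (depname, empno) tuples; keep the k smallest.
    return sorted(rows)[:k]

# Synthetic (depname, empno) rows spread over 8 source partitions.
random.seed(0)
rows = [(random.choice("abcd"), random.randrange(1000)) for _ in range(10_000)]
input_partitions = [rows[i::8] for i in range(8)]

K = 3

# Plan today: shuffle everything, then one big TopK.
baseline = top_k([r for p in input_partitions for r in p], K)

# Proposed: TopK(K) in each input partition *before* the repartition,
# then hash-repartition only the survivors and apply the final TopK.
survivors = [r for p in input_partitions for r in top_k(p, K)]
hash_parts = {}
for r in survivors:
    hash_parts.setdefault(hash(r[0]) % 4, []).append(r)
pushed_down = top_k([r for p in hash_parts.values() for r in p], K)

assert pushed_down == baseline
```

Note how few rows cross the repartition boundary in the second plan: 8 partitions x K survivors instead of all 10,000 rows, which is where the shuffle/copy savings come from.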
I think it's possible to even avoid the SortExec after the RepartitionExec by doing a (local to the partition) SPM-like thing, but that's possibly limited to Ballista and certainly not part of this PR; just noting it for later.
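On that follow-up idea: since the pre-repartition TopK emits each stream already sorted, a hash partition could k-way merge its incoming streams (the SPM-style operation) instead of re-sorting them. A minimal sketch of merge-instead-of-sort, in plain Python with made-up data:

```python
import heapq

# Each stream arriving at a hash partition is already sorted by the
# pre-repartition TopK, so a streaming k-way merge replaces the full
# re-sort; rows compare as (depname, empno) tuples.
streams = [
    [("a", 1), ("a", 7)],
    [("a", 3), ("b", 2)],
    [("a", 5)],
]
merged = list(heapq.merge(*streams))
assert merged == sorted(r for s in streams for r in s)
```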
Describe alternatives you've considered
The juice might not be worth the squeeze, but cargo bench showed:
┌──────────────────────┬─────────┬──────────┬─────────┐
│ Scenario │ Enabled │ Disabled │ Speedup │
├──────────────────────┼─────────┼──────────┼─────────┤
│ Fan-in (32 -> 4) │ 2.03 ms │ 3.93 ms │ 48% │
├──────────────────────┼─────────┼──────────┼─────────┤
│ No fan-in (32 -> 32) │ 2.91 ms │ 4.37 ms │ 33% │
└──────────────────────┴─────────┴──────────┴─────────┘
and
┌─────────┬──────────┬──────────┬─────────┐
│ LIMIT │ Enabled │ Disabled │ Speedup │
├─────────┼──────────┼──────────┼─────────┤
│ 10 │ 2.03 ms │ 3.93 ms │ 48% │
├─────────┼──────────┼──────────┼─────────┤
│ 1,000 │ 3.43 ms │ 6.38 ms │ 46% │
├─────────┼──────────┼──────────┼─────────┤
│ 10,000 │ 15.39 ms │ 16.43 ms │ 6% │
├─────────┼──────────┼──────────┼─────────┤
│ 100,000 │ 85.87 ms │ 97.50 ms │ 12% │
└─────────┴──────────┴──────────┴─────────┘
When translated into a distributed plan, these gains would be much more significant, but I thought I'd offer this upstream first and see if there is interest; otherwise we'll just keep it on our fork.
Additional context
Draft PR incoming.