
Minimize data copy when repartitioning if repartition key is a subset of the sort key #20735

@avantgardnerio

Is your feature request related to a problem or challenge?

When executing query plans like:

01)ProjectionExec
02)--SortPreservingMergeExec: [depname@0 ASC NULLS LAST, empno@2 ASC NULLS LAST], fetch=3
03)----ProjectionExec
04)------BoundedWindowAggExec: wdw=[sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ...
05)--------SortExec: TopK(fetch=3), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], 
06)----------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=1
07)------------DataSourceExec: file_groups=lots

We observed:

  1. lots of data was being shuffled (in Ballista)
  2. lots of data was being copied in RepartitionExec
  3. a giant sort took place (OOMing), only for its output to be limited to a reasonable amount of data
  4. the rest of the query ran (well, it didn't actually, since step 3 failed)

Describe the solution you'd like

If the partition key is a subset of the sort key, sort/limit before the repartition so that the minimum amount of data is shuffled (copied, in DataFusion) and the big sort no longer OOMs or spills. This can be done by copying the SortExec below the RepartitionExec.
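The rewrite can be sketched as a rule over a toy plan tree. This is not DataFusion's optimizer API: Plan, is_prefix, keys and push_topk_below_repartition are hypothetical names, and the sketch uses the stricter prefix check (hash keys are the leading sort keys) rather than a general subset check. It only copies a SortExec that has a fetch, and keeps the original sort above the repartition.

```rust
/// Hypothetical, heavily simplified physical plan (not DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { name: String },
    Repartition { hash_keys: Vec<String>, partitions: usize, input: Box<Plan> },
    Sort { keys: Vec<String>, fetch: Option<usize>, input: Box<Plan> },
}

/// Helper to build a list of column names.
fn keys(names: &[&str]) -> Vec<String> {
    names.iter().map(|n| n.to_string()).collect()
}

/// True if the hash keys are a non-empty prefix of the sort keys.
fn is_prefix(hash_keys: &[String], sort_keys: &[String]) -> bool {
    !hash_keys.is_empty() && sort_keys.starts_with(hash_keys)
}

/// Rewrites  Sort(fetch) -> Repartition(hash) -> X
/// into      Sort(fetch) -> Repartition(hash) -> Sort(fetch) -> X
/// when the hash keys are a prefix of the sort keys; otherwise returns the plan unchanged.
fn push_topk_below_repartition(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { keys, fetch: Some(k), input } => match *input {
            Plan::Repartition { hash_keys, partitions, input: rep_input }
                if is_prefix(&hash_keys, &keys) =>
            {
                // Copy of the TopK: each input partition now sends at most k rows.
                let lower = Plan::Sort { keys: keys.clone(), fetch: Some(k), input: rep_input };
                Plan::Sort {
                    keys,
                    fetch: Some(k),
                    input: Box::new(Plan::Repartition {
                        hash_keys,
                        partitions,
                        input: Box::new(lower),
                    }),
                }
            }
            other => Plan::Sort { keys, fetch: Some(k), input: Box::new(other) },
        },
        other => other,
    }
}

fn main() {
    // Mirrors lines 05)-07) of the plan above, with the scan reduced to a placeholder.
    let plan = Plan::Sort {
        keys: keys(&["depname", "empno"]),
        fetch: Some(3),
        input: Box::new(Plan::Repartition {
            hash_keys: keys(&["depname"]),
            partitions: 4,
            input: Box::new(Plan::Scan { name: "employees".to_string() }),
        }),
    };
    println!("{:#?}", push_topk_below_repartition(plan));
}
```

A real rule would also need to avoid re-applying itself to its own output and to handle sort options such as ASC/NULLS LAST, which this sketch ignores.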

This should be safe: because the hash partition key is a prefix of the sort key, every row that survives the final limit (fetch=3) also survives the pre-repartition TopK.
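A toy simulation can sanity-check the TopK part of this argument; it illustrates, it does not prove. The underlying fact: at most k-1 rows sort ahead of a row in the global top k, so at most k-1 rows of its own input partition do either, and it is therefore kept by that partition's TopK. The sketch assumes hypothetical helpers (r, top_k, hash_partition, run), distinct (depname, empno) rows, hash partitioning on depname only, and it leaves out the window aggregate entirely.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A toy row: (depname, empno). Sorting the tuple gives depname ASC, empno ASC.
type Row = (String, i64);

fn r(dep: &str, emp: i64) -> Row {
    (dep.to_string(), emp)
}

/// Sorts rows by the full sort key and keeps the first k (a TopK).
fn top_k(mut rows: Vec<Row>, k: usize) -> Vec<Row> {
    rows.sort();
    rows.truncate(k);
    rows
}

/// Hash-partitions rows into n buckets on depname only (the partition key).
fn hash_partition(rows: Vec<Row>, n: usize) -> Vec<Vec<Row>> {
    let mut out: Vec<Vec<Row>> = vec![Vec::new(); n];
    for row in rows {
        let mut h = DefaultHasher::new();
        row.0.hash(&mut h);
        let bucket = (h.finish() % n as u64) as usize;
        out[bucket].push(row);
    }
    out
}

/// Repartition -> per-output-partition TopK -> merge with fetch = k.
/// With `pushdown`, a copy of the TopK first runs on each input partition.
/// Returns the final rows and how many rows crossed the repartition.
fn run(inputs: &[Vec<Row>], n: usize, k: usize, pushdown: bool) -> (Vec<Row>, usize) {
    let mut shuffled: Vec<Vec<Row>> = vec![Vec::new(); n];
    let mut moved = 0;
    for part in inputs {
        let part = if pushdown { top_k(part.clone(), k) } else { part.clone() };
        moved += part.len();
        for (i, rows) in hash_partition(part, n).into_iter().enumerate() {
            shuffled[i].extend(rows);
        }
    }
    // TopK inside each output partition, then the final merge keeps the global top k.
    let merged: Vec<Row> = shuffled.into_iter().flat_map(|p| top_k(p, k)).collect();
    (top_k(merged, k), moved)
}

fn main() {
    // Hypothetical data: 2 input partitions, 4 output partitions, k = 3.
    let inputs = vec![
        vec![r("sales", 7), r("eng", 4), r("ops", 9), r("eng", 1)],
        vec![r("hr", 2), r("eng", 3), r("sales", 5), r("ops", 6)],
    ];
    let (without, moved_without) = run(&inputs, 4, 3, false);
    let (with, moved_with) = run(&inputs, 4, 3, true);
    assert_eq!(without, with);
    println!("{with:?}: {moved_without} rows shuffled without pushdown, {moved_with} with");
}
```

On this toy data both runs return the three eng rows, while the number of rows crossing the repartition drops from 8 to 6.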

I think it may even be possible to avoid the SortExec after the RepartitionExec by doing a partition-local, SPM-like merge, but that is possibly limited to Ballista and is certainly not part of this PR; just noting it for later.
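The SPM-like idea rests on a k-way merge: if each input partition had already sorted and limited its rows, and those rows reached an output partition still in that order (an assumption), the output partition could merge its incoming streams instead of sorting them again. Below is a generic merge with an optional fetch, using std's BinaryHeap wrapped in Reverse as a min-heap; merge_sorted is a hypothetical name, and this is not DataFusion's SortPreservingMergeExec.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges already-sorted streams into one sorted output, stopping after `fetch` rows.
/// Each stream stands in for the sorted rows one input partition sent to this output partition.
fn merge_sorted<T: Ord + Clone>(streams: &[Vec<T>], fetch: Option<usize>) -> Vec<T> {
    let limit = fetch.unwrap_or(usize::MAX);
    // Heap entries: (next value, stream index, position in stream); Reverse turns the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (i, s) in streams.iter().enumerate() {
        if let Some(v) = s.first() {
            heap.push(Reverse((v.clone(), i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while out.len() < limit {
        let Some(Reverse((v, i, pos))) = heap.pop() else { break };
        out.push(v);
        // Refill from the stream we just consumed, if it has more rows.
        if let Some(next) = streams[i].get(pos + 1) {
            heap.push(Reverse((next.clone(), i, pos + 1)));
        }
    }
    out
}

fn main() {
    // Hypothetical: two input partitions each sent a sorted slice of (depname, empno) rows.
    let from_p0 = vec![("eng", 1), ("eng", 4)];
    let from_p1 = vec![("eng", 3), ("hr", 2)];
    let merged = merge_sorted(&[from_p0, from_p1], Some(3));
    println!("{merged:?}"); // [("eng", 1), ("eng", 3), ("eng", 4)]
}
```

The merge touches each row at most once and keeps only one pending row per stream in the heap, which is why it can replace a full sort when the inputs are already ordered.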

Describe alternatives you've considered

The juice might not be worth the squeeze, but cargo bench showed:

┌──────────────────────┬─────────┬──────────┬────────────┐
│       Scenario       │ Enabled │ Disabled │ Time saved │
├──────────────────────┼─────────┼──────────┼────────────┤
│ Fan-in (32 -> 4)     │ 2.03 ms │ 3.93 ms  │ 48%        │
├──────────────────────┼─────────┼──────────┼────────────┤
│ No fan-in (32 -> 32) │ 2.91 ms │ 4.37 ms  │ 33%        │
└──────────────────────┴─────────┴──────────┴────────────┘

and

┌─────────┬──────────┬──────────┬────────────┐
│  LIMIT  │ Enabled  │ Disabled │ Time saved │
├─────────┼──────────┼──────────┼────────────┤
│ 10      │ 2.03 ms  │ 3.93 ms  │ 48%        │
├─────────┼──────────┼──────────┼────────────┤
│ 1,000   │ 3.43 ms  │ 6.38 ms  │ 46%        │
├─────────┼──────────┼──────────┼────────────┤
│ 10,000  │ 15.39 ms │ 16.43 ms │ 6%         │
├─────────┼──────────┼──────────┼────────────┤
│ 100,000 │ 85.87 ms │ 97.50 ms │ 12%        │
└─────────┴──────────┴──────────┴────────────┘
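The percentage column in both tables works out to the fraction of run time saved, (Disabled - Enabled) / Disabled, rather than a speedup ratio. The short check below recomputes it from the table values and also prints the conventional Disabled / Enabled ratio; the function names are illustrative only.

```rust
/// Percentage of run time saved by enabling the optimization.
fn time_saved_pct(enabled_ms: f64, disabled_ms: f64) -> f64 {
    (disabled_ms - enabled_ms) / disabled_ms * 100.0
}

/// Conventional speedup ratio: how many times faster the enabled run is.
fn speedup_factor(enabled_ms: f64, disabled_ms: f64) -> f64 {
    disabled_ms / enabled_ms
}

fn main() {
    // (scenario, enabled ms, disabled ms), copied from the two tables.
    let rows = [
        ("Fan-in (32 -> 4)", 2.03, 3.93),
        ("No fan-in (32 -> 32)", 2.91, 4.37),
        ("LIMIT 10", 2.03, 3.93),
        ("LIMIT 1,000", 3.43, 6.38),
        ("LIMIT 10,000", 15.39, 16.43),
        ("LIMIT 100,000", 85.87, 97.50),
    ];
    for (name, en, dis) in rows {
        println!("{name}: {:.0}% saved, {:.2}x", time_saved_pct(en, dis), speedup_factor(en, dis));
    }
}
```

The rounded percentages match the tables; as ratios, the enabled runs are roughly 1.07x to 1.94x faster.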

Once translated into a distributed plan these gains should be much more significant, but I thought I'd offer this upstream first to see if there is interest; otherwise we'll just keep it on our fork.

Additional context

Draft PR incoming.

Labels: enhancement (New feature or request)
