feat: Add support for round-robin partitioning in native shuffle #3076
base: main
Conversation
Codecov Report

```
@@             Coverage Diff             @@
##               main    #3076     +/-  ##
============================================
+ Coverage     56.12%   59.96%    +3.83%
- Complexity      976     1419      +443
============================================
  Files           119      170       +51
  Lines         11743    15735     +3992
  Branches       2251     2605      +354
============================================
+ Hits           6591     9435     +2844
- Misses         4012     4982      +970
- Partials       1140     1318      +178
============================================
```

View full report in Codecov by Sentry.
```python
repartitioned = df.repartition(200)

# repartition using hash partitioning
# repartitioned = df.repartition(200, "group_key")
```
Are you planning to keep this for future use when benchmarking hash partitioning?
Yes, PR #3080 refactors this to support multiple named benchmarks.
```python
spark = SparkSession.builder \
    .appName(f"ShuffleBenchmark-{args.mode.upper()}") \
    .config("spark.comet.explain.format", "verbose") \
```
Do we need this here, given that it is already specified in `run_all_benchmarks`?
> - **No semantic grouping**: Unlike hash partitioning on specific columns, this doesn't group related rows together
>
> The only difference is that Comet's partition assignments will differ from Spark's. This is functionally correct but may cause issues in tests that compare exact output with Spark.
I think the exact SORTED output should be the same?
That's correct. I will update this comment.
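The point above can be sketched in plain Python (the partition contents below are hypothetical, purely to illustrate that sorting erases placement differences):

```python
# Two different partition assignments of the same four rows (made-up values):
spark_parts = {0: [3, 2], 1: [1, 1]}
comet_parts = {0: [1, 3, 1], 1: [2]}

# Collecting everything and sorting erases the placement difference:
spark_rows = sorted(v for part in spark_parts.values() for v in part)
comet_rows = sorted(v for part in comet_parts.values() for v in part)
assert spark_rows == comet_rows  # both are [1, 1, 2, 3]
```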
| Config | Description | Default |
|--------|-------------|---------|
| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB, which provides a good balance between performance and memory usage. | 1048576b |
| `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true |
| `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true |
| `spark.comet.native.shuffle.partitioning.roundrobin.enabled` | Whether to enable round robin partitioning for Comet native shuffle. This is disabled by default because Comet's round-robin produces different partition assignments than Spark. Spark sorts rows by their binary UnsafeRow representation before assigning partitions, but Comet uses Arrow format, which has a different binary layout. Instead, Comet implements round-robin as hash partitioning on all columns, which achieves the same goals: even distribution, deterministic output (for fault tolerance), and no semantic grouping. This is functionally correct but may cause test failures when comparing results with Spark. | false |
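A rough sketch of the round-robin-as-hash idea described in the last row, using `hashlib` as a stand-in for Comet's actual hash function (this example does not claim to reproduce Comet's assignments):

```python
import hashlib

def assign_partition(row, num_partitions):
    # Hash ALL column values with a stable hash so the assignment is
    # deterministic across runs (the fault-tolerance requirement).
    key = "\x00".join(str(v) for v in row).encode()
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

rows = [(1, "a"), (2, "b"), (1, "a")]
parts = [assign_partition(r, 4) for r in rows]
assert parts[0] == parts[2]            # identical rows always land together
assert all(0 <= p < 4 for p in parts)  # assignments stay in range
```

Because the hash covers every column rather than a chosen key, related-but-distinct rows get no special co-location, matching the "no semantic grouping" property of true round-robin.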
"This is functionally correct but may cause test failures when comparing results with Spark" — this phrase is confusing and ambiguous IMO; we should probably just mention that without sorting, the output row order might not be the same as Spark's.
```rust
let num_columns_to_hash = if *max_hash_columns == 0 {
    input.num_columns()
} else {
    (*max_hash_columns).min(input.num_columns())
};
```
We need to handle negative values properly here.
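A minimal sketch of the fix that was applied (Python for illustration; the actual planner code is Rust): negative values are normalized to 0, and 0 means "hash all columns".

```python
def num_columns_to_hash(max_hash_columns, num_input_columns):
    # Normalize negative values to 0; 0 means "hash all columns".
    normalized = max(max_hash_columns, 0)
    if normalized == 0:
        return num_input_columns
    return min(normalized, num_input_columns)

assert num_columns_to_hash(-1, 5) == 5   # negative treated like 0
assert num_columns_to_hash(0, 5) == 5    # 0 hashes all columns
assert num_columns_to_hash(3, 5) == 3    # positive values respected
assert num_columns_to_hash(10, 5) == 5   # capped at the input width
```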
Co-authored-by: Oleks V <comphead@users.noreply.github.com>
- Handle negative max_hash_columns values in planner (normalize to 0)
- Clarify documentation: sorted output is identical to Spark, unsorted row ordering may differ
- Update config description to be less confusing about test failures

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Which issue does this PR close?
Closes #3067
Builds on #3077
Rationale for this change
When a native plan has round-robin partitioning, we do columnar-to-row, fall back to Spark for partitioning, then row-to-columnar before writing the shuffle file. This is extremely inefficient.
What changes are included in this PR?
Comet's native shuffle now supports `df.repartition(n)` (round-robin partitioning). However, it is disabled by default because it produces different partition assignments than Spark. Spark's round-robin implementation sorts rows by their binary UnsafeRow representation before assigning partitions to ensure deterministic output for fault tolerance. Since Comet uses Arrow format internally (which has a completely different binary layout), we cannot match Spark's exact partition assignments.

Instead of true round-robin, Comet implements it as hash partitioning on ALL columns. This achieves the same semantic goals:

- Even distribution of rows across partitions
- Deterministic output (for fault tolerance)
- No semantic grouping of related rows
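The "even distribution" goal can be illustrated with a quick simulation, again using a stand-in hash rather than Comet's actual implementation:

```python
import hashlib
from collections import Counter

def assign_partition(row, num_partitions):
    # Stable hash over all column values (illustrative stand-in).
    key = repr(row).encode()
    return int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % num_partitions

counts = Counter(assign_partition((i, f"row-{i}"), 8) for i in range(10000))
# With distinct rows, a good hash spreads counts close to 10000 / 8 = 1250.
assert len(counts) == 8
assert all(1000 < c < 1500 for c in counts.values())
```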
To enable, set `spark.comet.native.shuffle.partitioning.roundrobin.enabled=true`.

How are these changes tested?