
Conversation

@andygrove (Member) commented Jan 12, 2026

Which issue does this PR close?

Closes #3067

Builds on #3077

Rationale for this change

When a native plan has round-robin partitioning, Comet converts columnar data to rows, falls back to Spark for the partitioning step, and then converts the rows back to columnar format before writing the shuffle file. This round trip is extremely inefficient.

What changes are included in this PR?

Comet's native shuffle now supports `df.repartition(n)` (round-robin partitioning). However, it is disabled by default because it produces different partition assignments than Spark. Spark's round-robin implementation sorts rows by their binary `UnsafeRow` representation before assigning partitions to ensure deterministic output for fault tolerance. Since Comet uses Arrow format internally (which has a completely different binary layout), we cannot match Spark's exact partition assignments.

Instead of true round-robin, Comet implements it as hash partitioning on ALL columns (see the sketch after this list). This achieves the same semantic goals:

  • Even distribution: rows are spread evenly across partitions, provided the hashed values vary sufficiently (skew is possible when many rows share identical values)
  • Deterministic: the same input always produces the same partition assignments, which matters for fault tolerance and task retries
  • No semantic grouping: unlike hash partitioning on specific columns, this does not group related rows together
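
The idea is sketched below in Python rather than Comet's actual Rust/Arrow code; the function name and the use of CRC32 are illustrative only:

```python
import zlib

def assign_partition(row: tuple, num_partitions: int) -> int:
    """Emulated round-robin: hash every column so assignments are
    deterministic across task retries and rows spread evenly."""
    h = 0
    for value in row:
        # Fold a stable hash of each column into the running hash.
        # repr() keeps the sketch self-contained; the real code hashes
        # the raw column data, not a string rendering.
        h = zlib.crc32(repr(value).encode(), h)
    return h % num_partitions
```

Because every column feeds the hash, two rows share a partition only when all of their values collide, which preserves the "no semantic grouping" property above.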

To enable, set `spark.comet.native.shuffle.partitioning.roundrobin.enabled=true`.
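
For example, a minimal PySpark sketch (the config key comes from this PR; `spark` and `df` are assumed to already exist):

```python
spark.conf.set("spark.comet.native.shuffle.partitioning.roundrobin.enabled", "true")

# With the flag on, df.repartition(n) with no key columns stays in
# Comet's native shuffle instead of falling back to Spark.
repartitioned = df.repartition(200)
```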

How are these changes tested?

@codecov-commenter commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 71.42857% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.96%. Comparing base (f09f8af) to head (1202acd).
⚠️ Report is 855 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...t/execution/shuffle/CometShuffleExchangeExec.scala | 43.75% | 7 Missing and 2 partials ⚠️ |
| ...t/execution/shuffle/CometNativeShuffleWriter.scala | 83.33% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3076      +/-   ##
============================================
+ Coverage     56.12%   59.96%   +3.83%     
- Complexity      976     1419     +443     
============================================
  Files           119      170      +51     
  Lines         11743    15735    +3992     
  Branches       2251     2605     +354     
============================================
+ Hits           6591     9435    +2844     
- Misses         4012     4982     +970     
- Partials       1140     1318     +178     


@andygrove changed the title from "feat: experimental support for round-robin partitioning in native shuffle" to "feat: Add support for round-robin partitioning in native shuffle" Jan 13, 2026
@andygrove marked this pull request as ready for review January 14, 2026 20:33
@andygrove requested a review from mbutrovich January 15, 2026 05:11
@mbutrovich self-requested a review January 15, 2026 15:31
@andygrove requested a review from comphead January 15, 2026 18:07
repartitioned = df.repartition(200)

# repartition using hash partitioning
# repartitioned = df.repartition(200, "group_key")
Contributor commented:
Are you planning to keep it for future use when benchmarking hash partitioning?

@andygrove (Author) replied:
Yes, PR #3080 refactors this to support multiple named benchmarks


spark = SparkSession.builder \
    .appName(f"ShuffleBenchmark-{args.mode.upper()}") \
    .config("spark.comet.explain.format", "verbose") \
Contributor commented:
Do we need this here, given that it is already specified in `run_all_benchmarks`?

- **No semantic grouping**: Unlike hash partitioning on specific columns, this doesn't group related rows together

The only difference is that Comet's partition assignments will differ from Spark's. This is functionally correct
but may cause issues in tests that compare exact output with Spark.
Contributor commented:

I think the exact SORTED output should be the same?

@andygrove (Author) replied:

That's correct. I will update this comment.
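
A test would therefore sort before comparing. A minimal sketch, assuming `comet_df` and `spark_df` are DataFrames holding the same logical result with comparable column types:

```python
# Partition assignments (and thus row order) may differ between Comet
# and Spark, but the sorted contents must be identical.
assert sorted(comet_df.collect()) == sorted(spark_df.collect())
```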

| Config | Description | Default Value |
|---|---|---|
| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576b |
| `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true |
| `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true |
| `spark.comet.native.shuffle.partitioning.roundrobin.enabled` | Whether to enable round robin partitioning for Comet native shuffle. This is disabled by default because Comet's round-robin produces different partition assignments than Spark. Spark sorts rows by their binary UnsafeRow representation before assigning partitions, but Comet uses Arrow format which has a different binary layout. Instead, Comet implements round-robin as hash partitioning on all columns, which achieves the same goals: even distribution, deterministic output (for fault tolerance), and no semantic grouping. This is functionally correct but may cause test failures when comparing results with Spark. | false |
Contributor commented:
"This is functionally correct but may cause test failures when comparing results with Spark" - this phrase is confusing and ambiguous IMO; we should probably just mention that, without sorting, the output row order might not be the same as Spark's.

let num_columns_to_hash = if *max_hash_columns == 0 {
    input.num_columns()
} else {
    (*max_hash_columns).min(input.num_columns())
};
Contributor commented:
We need to properly handle negative values.
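
The follow-up commit below addresses this by normalizing negatives to 0. A minimal sketch of that logic in Python (Comet's actual planner is Rust; the names here are illustrative):

```python
def columns_to_hash(max_hash_columns: int, num_columns: int) -> int:
    """Resolve how many leading columns to hash; 0 means all columns."""
    max_hash_columns = max(0, max_hash_columns)  # normalize negatives to 0
    if max_hash_columns == 0:
        return num_columns
    return min(max_hash_columns, num_columns)
```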

andygrove and others added 3 commits January 16, 2026 13:34
Co-authored-by: Oleks V <comphead@users.noreply.github.com>
- Handle negative max_hash_columns values in planner (normalize to 0)
- Clarify documentation: sorted output is identical to Spark, unsorted row ordering may differ
- Update config description to be less confusing about test failures

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>