feat: Add support for round-robin partitioning in native shuffle #3076
Changes from 21 commits
```diff
@@ -41,9 +41,13 @@ def run_benchmark(spark: SparkSession, data_path: str, mode: str) -> int:
     start_time = time.time()

-    # Repartition by a different key to force full shuffle of all columns
-    # This shuffles all 50 columns including nested structs, arrays, maps
-    repartitioned = df.repartition(200, "group_key")
+    # Repartition to force full shuffle of all columns
+
+    # repartition using round-robin partitioning
+    repartitioned = df.repartition(200)
+
+    # repartition using hash partitioning
+    # repartitioned = df.repartition(200, "group_key")

     # Write to parquet to force materialization
     output_path = f"/tmp/shuffle-benchmark-output-{mode}"
```
```diff
@@ -74,6 +78,9 @@ def main():
     spark = SparkSession.builder \
         .appName(f"ShuffleBenchmark-{args.mode.upper()}") \
+        .config("spark.comet.explain.format", "verbose") \
+        .config("spark.comet.explainFallback.enabled", "true") \
+        .config("spark.comet.logFallbackReasons.enabled", "true") \
         .getOrCreate()

     print("\n" + "=" * 80)
```

> **Contributor** (on the `spark.comet.explain.format` line): do we need this here as it is already specified in
```diff
@@ -358,6 +358,34 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(true)

+  val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.native.shuffle.partitioning.roundrobin.enabled")
+      .category(CATEGORY_SHUFFLE)
+      .doc(
+        "Whether to enable round robin partitioning for Comet native shuffle. " +
+          "This is disabled by default because Comet's round-robin produces different " +
+          "partition assignments than Spark. Spark sorts rows by their binary UnsafeRow " +
+          "representation before assigning partitions, but Comet uses Arrow format which " +
+          "has a different binary layout. Instead, Comet implements round-robin as hash " +
+          "partitioning on all columns, which achieves the same goals: even distribution, " +
+          "deterministic output (for fault tolerance), and no semantic grouping. " +
+          "This is functionally correct but may cause test failures when comparing " +
+          "results with Spark.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_MAX_HASH_COLUMNS: ConfigEntry[Int] =
+    conf("spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns")
+      .category(CATEGORY_SHUFFLE)
+      .doc(
+        "The maximum number of columns to hash for round robin partitioning. " +
+          "When set to 0 (the default), all columns are hashed. " +
+          "When set to a positive value, only the first N columns are used for hashing, " +
+          "which can improve performance for wide tables while still providing " +
+          "reasonable distribution.")
+      .intConf
+      .createWithDefault(0)
```

> **Member:** add checkValue:
>
> **Author:** Added

```diff

   val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
       .category(CATEGORY_SHUFFLE)
```
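To make the two new settings concrete, here is a minimal pure-Python model of the behavior the `.doc()` strings describe: round-robin implemented as hash partitioning over row columns, with `maxHashColumns` capping how many leading columns are hashed. This is an illustrative sketch only, not Comet's actual Rust implementation; the function name and the use of Python's built-in `hash` are assumptions for the example.

```python
# Illustrative model (NOT Comet's actual Rust code) of round-robin
# implemented as hash partitioning on all columns, with an optional
# cap on how many leading columns are hashed (maxHashColumns).

def assign_partition(row, num_partitions, max_hash_columns=0):
    """Assign a row (a tuple of column values) to a partition by hashing
    its columns. max_hash_columns=0 means hash all columns; a positive
    value hashes only the first N columns (cheaper for wide tables)."""
    cols = row if max_hash_columns == 0 else row[:max_hash_columns]
    # Python's built-in hash stands in for the real hash function.
    return hash(cols) % num_partitions

rows = [(1, "a", 3.5), (2, "b", 7.0), (1, "a", 3.5)]

# Deterministic: identical rows always land in the same partition,
# so task retries reproduce the same assignment.
assert assign_partition(rows[0], 200) == assign_partition(rows[2], 200)

# With max_hash_columns=1, only the first column decides the partition.
p_capped = assign_partition(rows[0], 200, max_hash_columns=1)
assert 0 <= p_capped < 200
```

Note the trade-off the `maxHashColumns` doc string hints at: hashing fewer columns is cheaper, but rows that share their leading column values all collapse into the same partition, so distribution can degrade.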
```diff
@@ -69,6 +69,31 @@ this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
 Comet's support for window functions is incomplete and known to be incorrect. It is disabled by default and
 should not be used in production. The feature will be enabled in a future release. Tracking issue: [#2721](https://github.com/apache/datafusion-comet/issues/2721).

+## Round-Robin Partitioning
+
+Comet's native shuffle implementation of round-robin partitioning (`df.repartition(n)`) is not compatible with
+Spark's implementation and is disabled by default. It can be enabled by setting
+`spark.comet.native.shuffle.partitioning.roundrobin.enabled=true`.
+
+**Why the incompatibility exists:**
+
+Spark's round-robin partitioning sorts rows by their binary `UnsafeRow` representation before assigning them to
+partitions. This ensures deterministic output for fault tolerance (task retries produce identical results).
+Comet uses Arrow format internally, which has a completely different binary layout than `UnsafeRow`, making it
+impossible to match Spark's exact partition assignments.
+
+**Comet's approach:**
+
+Instead of true round-robin assignment, Comet implements round-robin as hash partitioning on ALL columns. This
+achieves the same semantic goals:
+
+- **Even distribution**: Rows are distributed evenly across partitions
```

> **Member:** How do we ensure this? Are row hashes always uniformly distributed?
>
> **Author:** This documentation needs updating. In some cases, there would be good distribution, but this is not guaranteed.
>
> **Author:** I updated this

```diff
+- **Deterministic**: Same input always produces the same partition assignments (important for fault tolerance)
+- **No semantic grouping**: Unlike hash partitioning on specific columns, this doesn't group related rows together
+
+The only difference is that Comet's partition assignments will differ from Spark's. This is functionally correct
+but may cause issues in tests that compare exact output with Spark.
```

> **Contributor:** I think the exact SORTED output should be the same?
>
> **Author:** That's correct. I will update this comment.

```diff

 ## Cast

 Cast operations in Comet fall into three levels of support:
```
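The review exchange above can be sketched in a few lines of plain Python: two shuffles that assign rows to different partitions still contain exactly the same rows overall, so the sorted output is identical even though per-partition contents differ. Both key functions below are stand-ins invented for the illustration, not Spark's or Comet's real assignment logic.

```python
# Sketch: differing partition assignments, identical sorted output.

def partition_rows(rows, num_partitions, key_fn):
    """Bucket rows into num_partitions lists using key_fn for assignment."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[key_fn(row) % num_partitions].append(row)
    return parts

rows = [(i, f"val{i}") for i in range(100)]

# "Spark-like" index-based assignment vs "Comet-like" hash of the whole
# row (both key functions are made up for this example).
spark_like = partition_rows(rows, 8, key_fn=lambda r: r[0])
comet_like = partition_rows(rows, 8, key_fn=lambda r: hash(r))

# Per-partition contents generally differ between the two strategies,
# but flattening and sorting yields the same result either way.
flat_a = sorted(r for p in spark_like for r in p)
flat_b = sorted(r for p in comet_like for r in p)
assert flat_a == flat_b == sorted(rows)
```

This is why only tests that compare unsorted, per-partition output against Spark are affected: any query that sorts (or is order-insensitive) sees identical results.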
```diff
@@ -110,6 +110,8 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576b |
 | `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true |
 | `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true |
+| `spark.comet.native.shuffle.partitioning.roundrobin.enabled` | Whether to enable round robin partitioning for Comet native shuffle. This is disabled by default because Comet's round-robin produces different partition assignments than Spark. Spark sorts rows by their binary UnsafeRow representation before assigning partitions, but Comet uses Arrow format which has a different binary layout. Instead, Comet implements round-robin as hash partitioning on all columns, which achieves the same goals: even distribution, deterministic output (for fault tolerance), and no semantic grouping. This is functionally correct but may cause test failures when comparing results with Spark. | false |
```

> **Contributor:** "This is functionally correct but may cause test failures when comparing results with Spark." - this phrase is confusing and ambiguous IMO, we should prob just mention that without sorting the output row order might be not the same as Spark

```diff
+| `spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns` | The maximum number of columns to hash for round robin partitioning. When set to 0 (the default), all columns are hashed. When set to a positive value, only the first N columns are used for hashing, which can improve performance for wide tables while still providing reasonable distribution. | 0 |
 | `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
 | `spark.comet.shuffle.sizeInBytesMultiplier` | Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format and this can result in Spark choosing a different join strategy due to the estimated size of the exchange being smaller. Comet will multiple sizeInBytes by this amount to avoid regressions in join strategy. | 1.0 |
 <!-- prettier-ignore-end -->
```
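For reference, the two new settings from the table above could be toggled at submit time. This is an illustrative config fragment: the config keys and defaults come from the diff, while the job script name and other flags are placeholders.

```shell
# Hypothetical invocation: enable Comet's round-robin shuffle and cap
# hashing at the first 8 columns (0 = hash all columns, the default).
spark-submit \
  --conf spark.comet.native.shuffle.partitioning.roundrobin.enabled=true \
  --conf spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns=8 \
  your_job.py
```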
> are you planning to keep it for future when benchmarking hash partitioning?

> Yes, PR #3080 refactors this to support multiple named benchmarks

> I reverted the benchmark changes from this PR and will handle them in #3080