Merged
28 commits
5 changes: 3 additions & 2 deletions benchmarks/pyspark/run_all_benchmarks.sh
@@ -72,7 +72,6 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.memory.offHeap.size=16g \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.shuffle.mode=jvm \
--conf spark.comet.exec.shuffle.mode=jvm \
@@ -99,10 +98,12 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.memory.offHeap.size=16g \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=native \
--conf spark.comet.exec.replaceSortMergeJoin=true \
--conf spark.comet.native.shuffle.partitioning.roundrobin.enabled=true \
--conf spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns=0 \
--conf spark.comet.parquet.write.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.cast.allowIncompatible=true \
13 changes: 10 additions & 3 deletions benchmarks/pyspark/run_benchmark.py
@@ -41,9 +41,13 @@ def run_benchmark(spark: SparkSession, data_path: str, mode: str) -> int:

start_time = time.time()

# Repartition by a different key to force full shuffle of all columns
# This shuffles all 50 columns including nested structs, arrays, maps
repartitioned = df.repartition(200, "group_key")
# Repartition to force full shuffle of all columns

# repartition using round-robin partitioning
repartitioned = df.repartition(200)

# repartition using hash partitioning
# repartitioned = df.repartition(200, "group_key")
**Contributor:** are you planning to keep it for future when benchmarking hash partitioning?

**Member Author:** Yes, PR #3080 refactors this to support multiple named benchmarks

**Member Author:** I reverted the benchmark changes from this PR and will handle them in #3080

# Write to parquet to force materialization
output_path = f"/tmp/shuffle-benchmark-output-{mode}"
@@ -74,6 +78,9 @@ def main():

spark = SparkSession.builder \
.appName(f"ShuffleBenchmark-{args.mode.upper()}") \
.config("spark.comet.explain.format", "verbose") \
**Contributor:** do we need this here as it is already specified in run_all_benchmarks ?

.config("spark.comet.explainFallback.enabled", "true") \
.config("spark.comet.logFallbackReasons.enabled", "true") \
.getOrCreate()

print("\n" + "=" * 80)
28 changes: 28 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -358,6 +358,34 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.native.shuffle.partitioning.roundrobin.enabled")
.category(CATEGORY_SHUFFLE)
.doc(
"Whether to enable round robin partitioning for Comet native shuffle. " +
"This is disabled by default because Comet's round-robin produces different " +
"partition assignments than Spark. Spark sorts rows by their binary UnsafeRow " +
"representation before assigning partitions, but Comet uses Arrow format which " +
"has a different binary layout. Instead, Comet implements round-robin as hash " +
"partitioning on all columns, which achieves the same goals: even distribution, " +
"deterministic output (for fault tolerance), and no semantic grouping. " +
"This is functionally correct but may cause test failures when comparing " +
"results with Spark.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_MAX_HASH_COLUMNS: ConfigEntry[Int] =
conf("spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns")
.category(CATEGORY_SHUFFLE)
.doc(
"The maximum number of columns to hash for round robin partitioning. " +
"When set to 0 (the default), all columns are hashed. " +
"When set to a positive value, only the first N columns are used for hashing, " +
"which can improve performance for wide tables while still providing " +
"reasonable distribution.")
.intConf
.createWithDefault(0)
**Member:** add checkValue:

`.checkValue(v => v >= 0, "The maximum number of columns to hash for round robin partitioning must be non-negative.")`

**Member Author:** Added


val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.category(CATEGORY_SHUFFLE)
8 changes: 3 additions & 5 deletions docs/source/contributor-guide/jvm_shuffle.md
@@ -46,12 +46,10 @@ JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometE
(not a `CometPlan`), JVM shuffle is the only option since native shuffle requires columnar input
from Comet operators.

3. **Unsupported partitioning type**: Native shuffle only supports `HashPartitioning`, `RangePartitioning`,
and `SinglePartition`. JVM shuffle additionally supports `RoundRobinPartitioning`.

4. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle
3. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle
only supports primitive types as partition keys. Complex types (struct, array, map) cannot be used
as partition keys in native shuffle, though they are fully supported as data columns in both implementations.
as partition keys in native shuffle and will fall back to JVM shuffle. Note that complex types are
fully supported as data columns in both implementations.

## Input Handling

21 changes: 16 additions & 5 deletions docs/source/contributor-guide/native_shuffle.md
@@ -52,8 +52,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
- `HashPartitioning`
- `RangePartitioning`
- `SinglePartition`

`RoundRobinPartitioning` requires JVM shuffle.
- `RoundRobinPartitioning`

4. **Supported partition key types**: For `HashPartitioning` and `RangePartitioning`, partition
keys must be primitive types. Complex types (struct, array, map) as partition keys require
@@ -131,7 +130,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.

3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
- `MultiPartitionShuffleRepartitioner`: For hash/range partitioning
- `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
- `SinglePartitionShufflePartitioner`: For single partition (simpler path)

4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure
@@ -187,6 +186,18 @@ For range partitioning:
The simplest case: all rows go to partition 0. Uses `SinglePartitionShufflePartitioner` which
simply concatenates batches to reach the configured batch size.

### Round Robin Partitioning

Round robin partitioning distributes rows evenly across partitions in a deterministic way:

1. Computes a Murmur3 hash of **all columns** in each row (using seed 42)
2. Sorts rows by their hash values to ensure deterministic ordering
3. Assigns rows to partitions sequentially: `partition_id = sorted_index % num_partitions`

This approach ensures that repeated execution of the same query produces identical results,
which is critical for fault tolerance and retry logic. Unlike Spark's round robin implementation
which uses random seeding, Comet's hash-based approach guarantees determinism across retries.
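
The three steps above can be sketched as a small stand-alone model. This is illustrative only: the real logic lives in Rust and uses Murmur3 with seed 42, FNV-1a is substituted here as a deterministic stand-in, and the function names are invented for the sketch. The `max_hash_columns` parameter mirrors the `maxHashColumns` config introduced in this PR.

```python
def fnv1a(values):
    """Deterministic hash over a row's column values (stand-in for Murmur3)."""
    h = 0xCBF29CE484222325
    for v in values:
        for b in repr(v).encode():
            h = ((h ^ b) * 0x100000001B3) & 0xFFFFFFFFFFFFFFFF
    return h

def round_robin_assign(rows, num_partitions, max_hash_columns=0):
    """Return one partition id per input row, in input-row order.

    max_hash_columns=0 hashes all columns; a positive value hashes only
    the first N columns of each row.
    """
    n = len(rows[0]) if max_hash_columns == 0 else max_hash_columns
    # Steps 1+2: hash the selected columns, then sort rows by hash value.
    # The row index breaks ties so hash collisions stay deterministic.
    order = sorted(range(len(rows)), key=lambda i: (fnv1a(rows[i][:n]), i))
    # Step 3: assign partitions sequentially over the sorted order.
    pids = [0] * len(rows)
    for sorted_index, row_index in enumerate(order):
        pids[row_index] = sorted_index % num_partitions
    return pids

rows = [(i % 7, f"user{i}", i * 1.5) for i in range(100)]
pids = round_robin_assign(rows, 4)
```

Because partition ids are assigned sequentially over a permutation of the rows, partition sizes differ by at most one row, and re-running on the same input reproduces the same assignment.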

## Memory Management

Native shuffle uses DataFusion's memory management with spilling support:
@@ -235,8 +246,8 @@ independently compressed, allowing parallel decompression during reads.
| ------------------- | -------------------------------------- | --------------------------------- |
| Input format | Columnar (direct from Comet operators) | Row-based (via ColumnarToRowExec) |
| Partitioning logic | Rust implementation | Spark's partitioner |
| Supported schemes | Hash, Range, Single | Hash, Range, Single, RoundRobin |
| Partition key types | Primitives only | Any type |
| Supported schemes | Hash, Range, Single, RoundRobin | Hash, Range, Single, RoundRobin |
| Partition key types | Primitives only (Hash, Range) | Any type |
| Performance | Higher (no format conversion) | Lower (columnar→row→columnar) |
| Writer variants | Single path | Bypass (hash) and sort-based |

25 changes: 25 additions & 0 deletions docs/source/user-guide/latest/compatibility.md
@@ -69,6 +69,31 @@ this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
Comet's support for window functions is incomplete and known to be incorrect. It is disabled by default and
should not be used in production. The feature will be enabled in a future release. Tracking issue: [#2721](https://github.com/apache/datafusion-comet/issues/2721).

## Round-Robin Partitioning

Comet's native shuffle implementation of round-robin partitioning (`df.repartition(n)`) is not compatible with
Spark's implementation and is disabled by default. It can be enabled by setting
`spark.comet.native.shuffle.partitioning.roundrobin.enabled=true`.

**Why the incompatibility exists:**

Spark's round-robin partitioning sorts rows by their binary `UnsafeRow` representation before assigning them to
partitions. This ensures deterministic output for fault tolerance (task retries produce identical results).
Comet uses Arrow format internally, which has a completely different binary layout than `UnsafeRow`, making it
impossible to match Spark's exact partition assignments.

**Comet's approach:**

Instead of true round-robin assignment, Comet implements round-robin as hash partitioning on ALL columns. This
achieves the same semantic goals:

- **Even distribution**: Rows are distributed evenly across partitions
**Member:** How do we ensure this? Are row hashes always uniformly distributed?

**Member Author:** This documentation needs updating. In some cases, there would be good distribution, but this is not guaranteed.

**Member Author:** I updated this

- **Deterministic**: Same input always produces the same partition assignments (important for fault tolerance)
- **No semantic grouping**: Unlike hash partitioning on specific columns, this doesn't group related rows together

The only difference is that Comet's partition assignments will differ from Spark's. This is functionally correct
but may cause issues in tests that compare exact output with Spark.
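
Opting in looks like the spark-submit flags this PR adds to `run_all_benchmarks.sh` (a fragment only; the rest of the submit command and the other Comet flags are elided):

```
$SPARK_HOME/bin/spark-submit \
  --conf spark.comet.native.shuffle.partitioning.roundrobin.enabled=true \
  --conf spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns=0 \
  ...
```

Here `maxHashColumns=0` (the default) hashes all columns; a positive value restricts hashing to the first N columns.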
**Contributor:** I think the exact SORTED output should be the same?

**Member Author:** That's correct. I will update this comment.


## Cast

Cast operations in Comet fall into three levels of support:
2 changes: 2 additions & 0 deletions docs/source/user-guide/latest/configs.md
@@ -110,6 +110,8 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576b |
| `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true |
| `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true |
| `spark.comet.native.shuffle.partitioning.roundrobin.enabled` | Whether to enable round robin partitioning for Comet native shuffle. This is disabled by default because Comet's round-robin produces different partition assignments than Spark. Spark sorts rows by their binary UnsafeRow representation before assigning partitions, but Comet uses Arrow format which has a different binary layout. Instead, Comet implements round-robin as hash partitioning on all columns, which achieves the same goals: even distribution, deterministic output (for fault tolerance), and no semantic grouping. This is functionally correct but may cause test failures when comparing results with Spark. | false |
**Contributor:** This is functionally correct but may cause test failures when comparing results with Spark. - this phrase is confusing and ambiguous IMO, we should prob just mention that without sorting the output row order might be not the same as Spark

| `spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns` | The maximum number of columns to hash for round robin partitioning. When set to 0 (the default), all columns are hashed. When set to a positive value, only the first N columns are used for hashing, which can improve performance for wide tables while still providing reasonable distribution. | 0 |
| `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
| `spark.comet.shuffle.sizeInBytesMultiplier` | Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format and this can result in Spark choosing a different join strategy due to the estimated size of the exchange being smaller. Comet will multiple sizeInBytes by this amount to avoid regressions in join strategy. | 1.0 |
<!-- prettier-ignore-end -->
6 changes: 6 additions & 0 deletions native/core/src/execution/planner.rs
@@ -2356,6 +2356,12 @@ impl PhysicalPlanner {
))
}
PartitioningStruct::SinglePartition(_) => Ok(CometPartitioning::SinglePartition),
PartitioningStruct::RoundRobinPartition(rr_partition) => {
Ok(CometPartitioning::RoundRobin(
rr_partition.num_partitions as usize,
rr_partition.max_hash_columns as usize,
))
}
}
}

6 changes: 5 additions & 1 deletion native/core/src/execution/shuffle/comet_partitioning.rs
@@ -31,14 +31,18 @@ pub enum CometPartitioning {
/// Rows for comparing to 4) OwnedRows that represent the boundaries of each partition, used with
/// LexOrdering to bin each value in the RecordBatch to a partition.
RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
/// Round robin partitioning. Distributes rows across partitions by sorting them by hash
/// (computed from columns) and then assigning partitions sequentially. Args are:
/// 1) number of partitions, 2) max columns to hash (0 means no limit).
RoundRobin(usize, usize),
}

impl CometPartitioning {
pub fn partition_count(&self) -> usize {
use CometPartitioning::*;
match self {
SinglePartition => 1,
Hash(_, n) | RangePartitioning(_, n, _, _) => *n,
Hash(_, n) | RangePartitioning(_, n, _, _) | RoundRobin(n, _) => *n,
}
}
}