Grace Hash Join (GHJ) is the hash join implementation in Apache DataFusion Comet. When spark.comet.exec.replaceSortMergeJoin is enabled, Comet's RewriteJoin rule converts SortMergeJoinExec to ShuffledHashJoinExec (removing the input sorts), and all ShuffledHashJoinExec operators are then executed natively as GraceHashJoinExec.
GHJ partitions both build and probe sides into N buckets by hashing join keys, then joins each bucket independently. When memory is tight, partitions spill to disk using Arrow IPC format. A fast path skips partitioning entirely when the build side is small enough.
GHJ supports all join types: Inner, Left, Right, Full, LeftSemi, LeftAnti, LeftMark, RightSemi, RightAnti, RightMark.
| Config Key | Type | Default | Description |
|---|---|---|---|
| `spark.comet.exec.replaceSortMergeJoin` | boolean | false | Replace SortMergeJoin with ShuffledHashJoin (enables GHJ) |
| `spark.comet.exec.replaceSortMergeJoin.maxBuildSize` | long | 104857600 | Max build-side bytes for SMJ replacement. -1 = no limit |
| `spark.comet.exec.graceHashJoin.numPartitions` | int | 16 | Number of hash partitions (buckets) |
| `spark.comet.exec.graceHashJoin.fastPathThreshold` | int | 67108864 | Executor-wide fast-path budget in bytes, divided by executor cores per task |
The RewriteJoin rule checks maxBuildSize against Spark's logical plan statistics before replacing a SortMergeJoinExec. When both sides are large (e.g., TPC-DS q72's catalog_sales JOIN inventory), sort-merge join's streaming merge on pre-sorted data outperforms hash join's per-task hash table construction. Setting maxBuildSize (e.g., 104857600 for 100 MB) keeps SMJ for these cases.
The configured threshold is the total budget across all concurrent tasks on the executor. The planner divides it by spark.executor.cores so each task's fast-path hash table stays within its fair share. For example, with a 32 GB threshold and 8 cores, each task gets a 4 GB per-task limit.
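The arithmetic can be sketched as follows; the helper name is hypothetical, not Comet's actual API:

```rust
/// Hypothetical helper mirroring the per-task split described above:
/// the executor-wide threshold is divided evenly across executor cores.
fn per_task_fast_path_budget(executor_threshold_bytes: u64, executor_cores: u64) -> u64 {
    executor_threshold_bytes / executor_cores.max(1)
}

fn main() {
    let gib = 1u64 << 30;
    // 32 GB threshold on an 8-core executor -> 4 GB per task.
    assert_eq!(per_task_fast_path_budget(32 * gib, 8), 4 * gib);
}
```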
```
SortMergeJoinExec
  -> RewriteJoin converts to ShuffledHashJoinExec (removes input sorts)
  -> CometExecRule wraps as CometHashJoinExec
  -> CometHashJoinExec.createExec() creates CometHashJoinExec
  -> Serialized to protobuf via JNI
  -> PhysicalPlanner (Rust) creates GraceHashJoinExec
```
```
GraceHashJoinExec            ExecutionPlan implementation
+-- left/right               Child input plans
+-- on                       Join key pairs [(left_key, right_key)]
+-- filter                   Optional post-join filter
+-- join_type                Inner/Left/Right/Full/Semi/Anti/Mark
+-- num_partitions           Number of hash buckets (default 16)
+-- build_left               Whether left input is the build side
+-- fast_path_threshold      Per-task threshold for fast path (0 = disabled)
+-- schema                   Output schema

HashPartition                Per-bucket state during partitioning
+-- build_batches            In-memory build-side RecordBatches
+-- probe_batches            In-memory probe-side RecordBatches
+-- build_spill_writer       Optional SpillWriter for build data
+-- probe_spill_writer       Optional SpillWriter for probe data
+-- build_mem_size           Tracked memory for build side
+-- probe_mem_size           Tracked memory for probe side

FinishedPartition            State after spill writers are closed
+-- build_batches            In-memory build batches (if not spilled)
+-- probe_batches            In-memory probe batches (if not spilled)
+-- build_spill_file         Temp file for spilled build data
+-- probe_spill_file         Temp file for spilled probe data
```
```
execute()
|
+- Phase 1: Partition build side
|    Hash-partition all build input into N buckets.
|    Spill the largest bucket on memory pressure.
|
+- Phase 2: Partition probe side
|    Hash-partition probe input into N buckets.
|    Spill ALL non-spilled buckets on first memory pressure.
|
+- Decision: fast path or slow path?
|    If no spilling occurred and total build size <= per-task threshold:
|      -> Fast path: single HashJoinExec, stream probe directly
|    Otherwise:
|      -> Slow path: merge partitions, join sequentially
|
+- Phase 3 (slow path): Join each partition sequentially
     Merge adjacent partitions to ~32 MB build-side groups.
     For each group, create a per-partition HashJoinExec.
     Spilled probes use streaming SpillReaderExec.
     Oversized builds trigger recursive repartitioning.
```
After partitioning both sides, GHJ checks whether the build side is small enough to join in a single HashJoinExec:
- No partitions were spilled during Phases 1 or 2
- The fast path threshold is non-zero
- The actual build-side memory (measured via `get_array_memory_size()`) is within the per-task threshold
When all conditions are met, GHJ concatenates all build-side batches, wraps the probe stream in a StreamSourceExec, and creates a single HashJoinExec with CollectLeft mode. The probe side streams directly through without buffering. This avoids the overhead of partition merging and sequential per-partition joins.
The fast path threshold is intentionally conservative because HashJoinExec creates non-spillable hash tables (can_spill: false). The per-task division ensures that concurrent tasks don't collectively exceed memory.
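The three conditions above can be sketched as a single predicate; names and signature are illustrative, not Comet's actual code:

```rust
/// Illustrative predicate combining the fast-path conditions described
/// above; names and signature are hypothetical, not Comet's actual code.
fn use_fast_path(
    any_partition_spilled: bool, // did Phase 1 or Phase 2 spill anything?
    fast_path_threshold: usize,  // per-task budget (0 disables the fast path)
    build_mem_size: usize,       // measured build-side bytes
) -> bool {
    !any_partition_spilled
        && fast_path_threshold > 0
        && build_mem_size <= fast_path_threshold
}

fn main() {
    assert!(use_fast_path(false, 1 << 20, 512 << 10)); // fits: fast path
    assert!(!use_fast_path(true, 1 << 20, 512 << 10)); // spilled: slow path
    assert!(!use_fast_path(false, 0, 0));              // threshold 0: disabled
}
```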
For each incoming batch from the build input:
- Evaluate join key expressions and compute hash values
- Assign each row to a partition: `partition_id = hash % num_partitions`
- Use the prefix-sum algorithm to efficiently extract contiguous row groups per partition via `arrow::compute::take()`
- For each partition's sub-batch:
  - If the partition is already spilled, append to its `SpillWriter`
  - Otherwise, call `reservation.try_grow(batch_size)`
    - On failure: spill the largest non-spilled partition, then retry
    - If it still fails: spill this partition and write to disk
All in-memory build data is tracked in a shared MemoryReservation registered as can_spill: true, making GHJ a cooperative citizen in DataFusion's memory pool.
Same hash-partitioning algorithm as Phase 1, with key differences:

- **Spilled build implies spilled probe**: If a partition's build side was spilled, the probe side is also spilled. Both sides must be on disk (or both in memory) for the join phase.
- **Aggressive spilling**: On the first memory pressure event, all non-spilled partitions are spilled (both build and probe sides). This prevents thrashing between spilling and accumulating when multiple concurrent GHJ instances share a memory pool.
- **Shared reservation**: The same `MemoryReservation` from Phase 1 continues to track probe-side memory.
Before joining, adjacent FinishedPartitions are merged so each group has roughly TARGET_PARTITION_BUILD_SIZE (32 MB) of build data. This reduces the number of HashJoinExec invocations while keeping each hash table small.
Merged groups are joined sequentially — one at a time — so only one HashJoinInput consumer exists at any moment. The GHJ reservation is freed before Phase 3 begins; each per-partition HashJoinExec tracks its own memory.
In-memory partitions are joined via `join_partition_recursive()`:
- Concatenate build and probe sub-batches
- Create `HashJoinExec` with both sides as `MemorySourceConfig`
- If the build side is too large for a hash table: recursively repartition (up to `MAX_RECURSION_DEPTH = 3`, yielding up to 16^3 = 4096 effective partitions)
Spilled partitions are joined via `join_with_spilled_probe()`:
- Build side loaded from memory or disk via `spawn_blocking`
- Probe side streamed via `SpillReaderExec` (never fully loaded into memory)
- If the build side is too large: fall back to an eager probe read plus recursive repartitioning
`SpillWriter` wraps Arrow IPC `StreamWriter` for incremental appends:
- Uses `BufWriter` with a 1 MB buffer (vs the 8 KB default) for sequential throughput
- Batches are appended one at a time — no need to rewrite the file
- `finish()` flushes the writer and returns the `RefCountedTempFile`
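The append-only buffered-writer pattern can be sketched with the standard library alone, with a `Vec<u8>` standing in for the temp file and raw bytes standing in for Arrow IPC batches; this is illustrative, not Comet's actual `SpillWriter`:

```rust
use std::io::{BufWriter, Write};

/// Sketch of the append-only buffered writer pattern described above.
/// A Vec<u8> stands in for the temp file; raw bytes stand in for
/// Arrow IPC batches. Not Comet's actual SpillWriter.
struct SpillWriterSketch {
    writer: BufWriter<Vec<u8>>,
}

impl SpillWriterSketch {
    fn new() -> Self {
        // 1 MB buffer instead of BufWriter's 8 KB default: spill writes
        // are sequential, so larger flushes amortize syscall overhead.
        Self {
            writer: BufWriter::with_capacity(1 << 20, Vec::new()),
        }
    }

    /// Batches append incrementally; nothing written earlier is rewritten.
    fn append(&mut self, batch: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(batch)
    }

    /// Flush buffered data and hand back the underlying "file".
    fn finish(self) -> std::io::Result<Vec<u8>> {
        self.writer.into_inner().map_err(|e| e.into_error())
    }
}

fn main() -> std::io::Result<()> {
    let mut w = SpillWriterSketch::new();
    w.append(b"batch-1")?;
    w.append(b"batch-2")?;
    assert_eq!(w.finish()?, b"batch-1batch-2".to_vec());
    Ok(())
}
```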
Temp files are created via DataFusion's DiskManager, which handles allocation and cleanup.
Two read paths depending on context:
Eager read (`read_spilled_batches`): Opens the file and reads all batches into a `Vec<RecordBatch>`. Used for build-side spill files bounded by `TARGET_PARTITION_BUILD_SIZE`.
Streaming read (`SpillReaderExec`): an `ExecutionPlan` that reads batches on demand:
- Spawns a `tokio::task::spawn_blocking` task to read from the file on a blocking thread pool
- Uses an `mpsc` channel (capacity 4) to feed batches to the async executor
- Coalesces small sub-batches into ~8192-row chunks before sending, reducing per-batch overhead in the downstream hash join kernel
- The `RefCountedTempFile` handle is moved into the blocking closure to keep the file alive until reading completes
Hash-partitioning creates N sub-batches per input batch. With N=16 partitions and 1000-row input batches, spill files contain ~62-row sub-batches. SpillReaderExec coalesces these into ~8192-row batches on read, reducing channel send/recv overhead, hash join kernel invocations, and per-batch RecordBatch construction costs.
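The coalescing policy can be sketched by modeling each sub-batch as a row count; this is illustrative, since the real code concatenates `RecordBatch`es before sending them over the channel:

```rust
/// Sketch of the read-side coalescing described above: sub-batches
/// (modeled as row counts) accumulate until the ~8192-row target is
/// reached. Illustrative, not Comet's actual code.
fn coalesce(sub_batch_rows: &[usize], target_rows: usize) -> Vec<usize> {
    let mut chunks = Vec::new();
    let mut pending = 0;
    for &rows in sub_batch_rows {
        pending += rows;
        if pending >= target_rows {
            chunks.push(pending);
            pending = 0;
        }
    }
    if pending > 0 {
        chunks.push(pending); // final partial chunk
    }
    chunks
}

fn main() {
    // Four 3000-row sub-batches -> one ~9000-row chunk plus a remainder.
    assert_eq!(coalesce(&[3000; 4], 8192), vec![9000, 3000]);
    // ~62-row sub-batches (16 partitions, 1000-row inputs) coalesce
    // into ~8246-row chunks (133 x 62).
    assert_eq!(coalesce(&[62; 266], 8192), vec![8246, 8246]);
}
```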
GHJ uses a single `MemoryReservation` registered as a spillable consumer (`with_can_spill(true)`). This reservation:
- Tracks all in-memory build and probe data across all partitions during Phases 1 and 2
- Grows via `try_grow()` before each batch is added to memory
- Shrinks via `shrink()` when partitions are spilled to disk
- Is freed before Phase 3, where each per-partition `HashJoinExec` tracks its own memory via `HashJoinInput`
In a typical Spark executor, multiple tasks run concurrently, each potentially executing a GHJ. All instances share the same DataFusion memory pool. The "spill ALL non-spilled partitions" strategy in Phase 2 makes each instance's spill decision atomic — once triggered, the instance moves all its data to disk in one operation, preventing interleaving with other instances that would otherwise claim freed memory immediately.
DataFusion's memory pool (typically FairSpillPool) divides memory between spillable and non-spillable consumers. GHJ registers as spillable so the pool can account for its memory when computing fair shares. The per-partition HashJoinExec instances in Phase 3 use non-spillable HashJoinInput reservations, but since partitions are joined sequentially, only one hash table exists at a time, keeping peak memory at roughly build_size / num_partitions.
Instead of N separate take() kernel calls (one per partition), GHJ uses a prefix-sum algorithm:
- **Hash**: Compute hash values for all rows
- **Assign**: Map each row to a partition: `partition_id = hash % N`
- **Count**: Count rows per partition
- **Prefix-sum**: Accumulate counts into start offsets
- **Scatter**: Place row indices into contiguous regions per partition
- **Take**: A single `arrow::compute::take()` per partition using the precomputed indices
This is O(rows) with good cache locality, compared to O(rows x partitions) for the naive approach.
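A minimal sketch of the six steps, using std's `DefaultHasher` in place of Comet's hash function and plain index vectors in place of the actual `arrow::compute::take()` calls:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of the prefix-sum partitioning described above. Uses std's
/// DefaultHasher and index vectors for illustration; not Comet's code.
fn partition_indices<T: Hash>(rows: &[T], num_partitions: usize) -> Vec<Vec<usize>> {
    // 1-2. Hash + assign: map each row to a partition id.
    let part_ids: Vec<usize> = rows
        .iter()
        .map(|row| {
            let mut h = DefaultHasher::new();
            row.hash(&mut h);
            (h.finish() as usize) % num_partitions
        })
        .collect();

    // 3. Count rows per partition.
    let mut counts = vec![0usize; num_partitions];
    for &p in &part_ids {
        counts[p] += 1;
    }

    // 4. Prefix-sum counts into per-partition start offsets.
    let mut offsets = vec![0usize; num_partitions];
    for p in 1..num_partitions {
        offsets[p] = offsets[p - 1] + counts[p - 1];
    }

    // 5. Scatter: place row indices into one contiguous region per partition.
    let mut indices = vec![0usize; rows.len()];
    let mut cursor = offsets.clone();
    for (row_idx, &p) in part_ids.iter().enumerate() {
        indices[cursor[p]] = row_idx;
        cursor[p] += 1;
    }

    // 6. Each contiguous slice would feed a single take() call.
    (0..num_partitions)
        .map(|p| indices[offsets[p]..offsets[p] + counts[p]].to_vec())
        .collect()
}

fn main() {
    let rows: Vec<u64> = (0..100).collect();
    let parts = partition_indices(&rows, 16);
    // Every row index lands in exactly one partition.
    let mut all: Vec<usize> = parts.iter().flatten().copied().collect();
    all.sort();
    assert_eq!(all, (0..100).collect::<Vec<_>>());
}
```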
GHJ hashes on the same join keys that Spark already used for its shuffle exchange, but with a different hash function (ahash via RandomState with fixed seeds). Spark's shuffle uses Murmur3, so all rows arriving at a given Spark partition share the same murmur3(key) % num_spark_partitions value but have diverse actual key values. GHJ's ahash produces a completely different distribution.
At each recursion level, a different random seed is used:

```rust
fn partition_random_state(recursion_level: usize) -> RandomState {
    RandomState::with_seeds(
        0x517cc1b727220a95 ^ (recursion_level as u64),
        0x3a8b7c9d1e2f4056,
        0,
        0,
    )
}
```

This ensures rows that hash to the same partition at level 0 are distributed across different sub-partitions at level 1. The only case where repartitioning cannot help is true data skew — many rows with the same key value. No amount of rehashing can separate identical keys, which is why there is a `MAX_RECURSION_DEPTH = 3` limit.
When a partition's build side is too large for a hash table (tested via try_grow(build_size * 3), where the 3x accounts for hash table overhead), GHJ recursively repartitions:
- Sub-partition both build and probe into 16 new buckets using a different hash seed
- Recursively join each sub-partition
- Maximum depth: 3 (yielding up to 16^3 = 4096 effective partitions)
- If still too large at max depth: return a `ResourcesExhausted` error
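A depth-limited recursion of this shape can be sketched as follows; the hash function, error type, and names are illustrative, not Comet's actual implementation:

```rust
/// Sketch of depth-limited recursive repartitioning as described above.
/// Hash function, error type, and names are illustrative, not Comet's.
const MAX_RECURSION_DEPTH: usize = 3;
const FANOUT: u64 = 16;

/// Level-dependent hash so each recursion level re-splits differently.
fn level_hash(key: u64, level: usize) -> u64 {
    let x = key ^ (level as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    x.wrapping_mul(0xFF51_AFD7_ED55_8CCD)
}

/// Returns the number of rows joined, or an error if a partition is
/// still oversized at the maximum recursion depth.
fn join_partition(keys: &[u64], max_rows: usize, depth: usize) -> Result<usize, String> {
    if keys.len() <= max_rows {
        // Small enough: build the hash table and join (elided here).
        return Ok(keys.len());
    }
    if depth >= MAX_RECURSION_DEPTH {
        // Identical keys cannot be separated by rehashing: true skew.
        return Err("ResourcesExhausted: partition too large at max depth".into());
    }
    // Sub-partition into FANOUT buckets with a level-specific hash,
    // then join each bucket recursively.
    let mut buckets: Vec<Vec<u64>> = vec![Vec::new(); FANOUT as usize];
    for &k in keys {
        buckets[(level_hash(k, depth) % FANOUT) as usize].push(k);
    }
    let mut total = 0;
    for bucket in &buckets {
        total += join_partition(bucket, max_rows, depth + 1)?;
    }
    Ok(total)
}

fn main() {
    // 1000 distinct keys re-split cleanly below the 100-row limit.
    let distinct: Vec<u64> = (0..1000).collect();
    assert_eq!(join_partition(&distinct, 100, 0), Ok(1000));
    // 1000 copies of one key: no seed can split them, so this errors.
    assert!(join_partition(&vec![7u64; 1000], 100, 0).is_err());
}
```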
After Phase 2, GHJ merges adjacent FinishedPartitions to reduce the number of per-partition HashJoinExec invocations. The target is TARGET_PARTITION_BUILD_SIZE (32 MB) per merged group. For example, with 16 partitions and 200 MB total build data, partitions are merged into ~6 groups of ~32 MB each instead of 16 groups of ~12 MB.
Merging only combines adjacent partitions (preserving hash locality) and never merges spilled with non-spilled partitions. The merge is a metadata-only operation — it combines batch lists and spill file handles without copying data.
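A greedy adjacent merge honoring both rules (hit the 32 MB target, never mix spilled with non-spilled) can be sketched as follows; this is illustrative, not Comet's actual code:

```rust
/// Sketch of the adjacent-partition merge described above. Partitions
/// are modeled as (build_bytes, spilled) pairs; the function returns
/// groups of partition indices. Illustrative, not Comet's actual code.
const TARGET_PARTITION_BUILD_SIZE: usize = 32 * 1024 * 1024; // 32 MB

fn merge_groups(parts: &[(usize, bool)]) -> Vec<Vec<usize>> {
    let mut groups: Vec<Vec<usize>> = Vec::new();
    let mut cur: Vec<usize> = Vec::new();
    let mut cur_bytes = 0;
    let mut cur_spilled: Option<bool> = None;
    for (i, &(bytes, spilled)) in parts.iter().enumerate() {
        // Close the current group when the 32 MB target is reached or
        // when spilled and non-spilled partitions would be mixed.
        let close = cur_bytes >= TARGET_PARTITION_BUILD_SIZE
            || cur_spilled.map_or(false, |s| s != spilled);
        if close && !cur.is_empty() {
            groups.push(std::mem::take(&mut cur));
            cur_bytes = 0;
        }
        cur.push(i); // metadata-only: indices, no data copied
        cur_bytes += bytes;
        cur_spilled = Some(spilled);
    }
    if !cur.is_empty() {
        groups.push(cur);
    }
    groups
}

fn main() {
    // 16 unspilled partitions of ~12.5 MB (200 MB total) merge into
    // 6 groups, matching the example in the text.
    let parts = vec![(13_107_200usize, false); 16];
    assert_eq!(merge_groups(&parts).len(), 6);
    // Spilled and non-spilled partitions never share a group.
    assert_eq!(merge_groups(&[(1, false), (1, true)]).len(), 2);
}
```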
GHJ respects Spark's build side selection (BuildLeft or BuildRight). The build_left flag determines:
- Which input is consumed in Phase 1 (build) vs Phase 2 (probe)
- How join key expressions are mapped
- How `HashJoinExec` is constructed (the build side is always left in `CollectLeft` mode)
When build_left = false, the HashJoinExec is created with swapped inputs and then swap_inputs() is called to produce correct output column ordering.
| Metric | Description |
|---|---|
| `build_time` | Time spent partitioning the build side |
| `probe_time` | Time spent partitioning the probe side |
| `spill_count` | Number of partition spill events |
| `spilled_bytes` | Total bytes written to spill files |
| `build_input_rows` | Total rows from build input |
| `build_input_batches` | Total batches from build input |
| `input_rows` | Total rows from probe input |
| `input_batches` | Total batches from probe input |
| `output_rows` | Total output rows |
| `elapsed_compute` | Total compute time |
- Adaptive partition count: Dynamically choose the number of partitions based on input size rather than a fixed default
- Spill file compression: Compress Arrow IPC data on disk to reduce I/O volume at the cost of CPU
- Upstream DataFusion spill support: Contribute spill capability to DataFusion's `HashJoinExec` to eliminate the need for a separate GHJ operator