Skip to content

feat: stop using FFI in native shuffle read path#3731

Open
andygrove wants to merge 16 commits intoapache:mainfrom
andygrove:shuffle-direct-read
Open

feat: stop using FFI in native shuffle read path#3731
andygrove wants to merge 16 commits intoapache:mainfrom
andygrove:shuffle-direct-read

Conversation

@andygrove
Copy link
Member

@andygrove andygrove commented Mar 18, 2026

Which issue does this PR close?

Performance improvements for native shuffle read. Shows 13% improvement in TPC-H @ 1TB.

Rationale for this change

Simplifies the shuffle direct read code path, removing unnecessary FFI transfers.

What changes are included in this PR?

How are these changes tested?

Adds a design document for bypassing Arrow FFI in the shuffle read
path when both the shuffle writer and downstream operator are native.
Add a new ShuffleScanExec operator that pulls compressed shuffle blocks
from JVM via CometShuffleBlockIterator and decodes them natively using
read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch called
externally before poll_next) to avoid JNI calls on tokio threads.
Fix two bugs discovered during testing:
- ClassCastException: factory closure incorrectly cast Partition to
  CometExecPartition before extracting ShuffledRowRDDPartition; the
  partition passed to the factory is already the unwrapped partition
  from the input RDD
- NoSuchElementException in SQLShuffleReadMetricsReporter: metrics
  field in CometShuffledBatchRDD was not exposed as a val, causing
  Map.empty to be used instead of the real shuffle metrics map

Add Scala integration test that runs a repartition+aggregate query
with direct read enabled and disabled to verify result parity.
Add Rust unit test for read_ipc_compressed codec round-trip.
- Remove redundant getCurrentBlockLength() JNI call (reuse hasNext() return value)
- Make readAsRawStream() lazy instead of materializing all streams to a List
- Remove pointless DirectByteBuffer re-allocation in close()
- Remove dead sparkPlanToInputIdx map
Skip test_read_compressed_ipc_block under Miri since it calls
foreign zstd functions that Miri cannot execute.
@andygrove andygrove marked this pull request as ready for review March 19, 2026 13:59
@andygrove andygrove changed the title feat: bypass Arrow FFI for native shuffle read path feat: replace Arrow IPC with raw buffer format in shuffle Mar 19, 2026
@andygrove andygrove changed the title feat: replace Arrow IPC with raw buffer format in shuffle feat: shuffle direct read and raw buffer shuffle format Mar 19, 2026
@andygrove andygrove marked this pull request as draft March 19, 2026 16:33
@andygrove andygrove force-pushed the shuffle-direct-read branch from a7e9659 to 19cb04b Compare March 19, 2026 16:36
@andygrove andygrove changed the title feat: shuffle direct read and raw buffer shuffle format feat: stop using FFI in native shuffle read path Mar 19, 2026
@andygrove andygrove marked this pull request as ready for review March 19, 2026 16:38
@andygrove andygrove requested review from wForget March 19, 2026 17:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant