feat: stop using FFI in native shuffle read path#3731
Open
andygrove wants to merge 16 commits intoapache:mainfrom
Open
feat: stop using FFI in native shuffle read path#3731andygrove wants to merge 16 commits intoapache:mainfrom
andygrove wants to merge 16 commits intoapache:mainfrom
Conversation
Adds a design document for bypassing Arrow FFI in the shuffle read path when both the shuffle writer and downstream operator are native.
Add a new ShuffleScanExec operator that pulls compressed shuffle blocks from JVM via CometShuffleBlockIterator and decodes them natively using read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch called externally before poll_next) to avoid JNI calls on tokio threads.
Fix two bugs discovered during testing: - ClassCastException: factory closure incorrectly cast Partition to CometExecPartition before extracting ShuffledRowRDDPartition; the partition passed to the factory is already the unwrapped partition from the input RDD - NoSuchElementException in SQLShuffleReadMetricsReporter: metrics field in CometShuffledBatchRDD was not exposed as a val, causing Map.empty to be used instead of the real shuffle metrics map Add Scala integration test that runs a repartition+aggregate query with direct read enabled and disabled to verify result parity. Add Rust unit test for read_ipc_compressed codec round-trip.
- Remove redundant getCurrentBlockLength() JNI call (reuse hasNext() return value) - Make readAsRawStream() lazy instead of materializing all streams to a List - Remove pointless DirectByteBuffer re-allocation in close() - Remove dead sparkPlanToInputIdx map
Skip test_read_compressed_ipc_block under Miri since it calls foreign zstd functions that Miri cannot execute.
a7e9659 to
19cb04b
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Performance improvements for native shuffle read. Shows 13% improvement in TPC-H @ 1TB.
Rationale for this change
Simplifies the shuffle direct read code path, removing unnecessary FFI transfers.
What changes are included in this PR?
How are these changes tested?