perf: Replace Arrow IPC with more efficient shuffle format [experimental] #3733
Draft
andygrove wants to merge 29 commits into apache:main from
Conversation
Adds a design document for bypassing Arrow FFI in the shuffle read path when both the shuffle writer and downstream operator are native.
Add a new ShuffleScanExec operator that pulls compressed shuffle blocks from the JVM via CometShuffleBlockIterator and decodes them natively using read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch is called externally before poll_next) to avoid JNI calls on tokio threads.
Fix two bugs discovered during testing:
- ClassCastException: the factory closure incorrectly cast Partition to CometExecPartition before extracting ShuffledRowRDDPartition; the partition passed to the factory is already the unwrapped partition from the input RDD.
- NoSuchElementException in SQLShuffleReadMetricsReporter: the metrics field in CometShuffledBatchRDD was not exposed as a val, causing Map.empty to be used instead of the real shuffle metrics map.

Add a Scala integration test that runs a repartition+aggregate query with direct read enabled and disabled to verify result parity. Add a Rust unit test for the read_ipc_compressed codec round-trip.
- Remove the redundant getCurrentBlockLength() JNI call (reuse the hasNext() return value)
- Make readAsRawStream() lazy instead of materializing all streams into a List
- Remove the pointless DirectByteBuffer re-allocation in close()
- Remove the dead sparkPlanToInputIdx map
Skip test_read_compressed_ipc_block under Miri since it calls foreign zstd functions that Miri cannot execute.
Replace the Arrow IPC StreamWriter/StreamReader with a lightweight raw buffer format that writes Arrow ArrayData buffers directly. The new format has minimal per-block overhead (~16 bytes per column vs ~200-800 bytes for IPC schema flatbuffers). The outer block header (compressed_length + field_count) is unchanged for JVM compatibility.

Key changes:
- write_array_data: recursively serializes ArrayData (validity + buffers + children)
- read_array_data: reconstructs ArrayData from raw buffers using the known schema
- Dictionary arrays are cast to their value type before writing
- read_shuffle_block replaces read_ipc_compressed (takes a schema parameter)
- read_ipc_compressed is retained temporarily for callers not yet migrated
Use RecordBatch::try_new_with_options with explicit row_count instead of try_new so that zero-column batches (produced by Spark when query results are unused) do not fail with "must either specify a row count or at least one column".
CometColumnarShuffle was not setting outputAttributes on the CometShuffleDependency, leaving it as Seq.empty. This caused the shuffle reader to pass an empty schema to the native decodeShuffleBlock, resulting in "Output column count mismatch: expected N, got 0" errors.
The null bitmap in Arrow arrays can have a non-zero bit offset even when ArrayData.offset() is 0 (e.g. after RecordBatch::slice). The raw shuffle writer was copying the bitmap bytes verbatim, but the reader assumes bits start at offset 0. This caused shifted null bitmaps, corrupting data during shuffle and producing wrong query results (e.g. TPC-DS q6 counts off by 1). Fix by detecting non-zero bitmap offsets and emitting a re-aligned copy. Add a roundtrip test with sliced batches to cover this case.
Arrays from RecordBatch::slice can have non-zero offsets in both the ArrayData and the null bitmap. The raw shuffle format writes buffers verbatim assuming offset 0, causing data corruption when offsets are present. Use take() to produce zero-offset copies when needed, similar to prepare_output in jni_api.rs. This fixes TPC-DS q64 failures where the debug_assert fired and data mismatch errors from shifted null bitmaps.
Which issue does this PR close?
Performance improvements for native shuffle.
Rationale for this change
This PR contains two related performance improvements for native shuffle:
1. Shuffle Direct Read Simplification
Simplifies the shuffle direct read code path, removing unnecessary complexity (separate PR #3731 exists for this change).
2. Replace Arrow IPC with Raw Buffer Format
The shuffle write/read path previously used Arrow IPC to serialize RecordBatches. Arrow IPC embeds a full schema flatbuffer in every block (~200-800 bytes), plus message headers and alignment padding. Since the reader already knows the schema from protobuf, this overhead is unnecessary.
Benchmarking against Gluten (which uses raw columnar buffers for shuffle) showed this as a key performance differentiator. The new raw buffer format writes Arrow ArrayData buffers directly with minimal headers (~16 bytes per column vs ~200+ bytes for IPC).

What changes are included in this PR?
Shuffle direct read simplification:
New shuffle serialization format that replaces Arrow IPC with raw Arrow buffer writes:
- write_array_data / read_array_data: recursively serialize/deserialize Arrow ArrayData (validity bitmap + value buffers + child data for nested types)
- write_raw_batch / read_shuffle_block: top-level batch serialization with a schema parameter
- decodeShuffleBlock now accepts protobuf schema bytes so the native side can reconstruct the Arrow Schema for deserialization
- CometBlockStoreShuffleReader serializes the output schema as a protobuf ShuffleScan message and passes it to the native decoder

Format spec:
How are these changes tested?
CometNativeShuffleSuite JVM integration tests pass (20 tests), exercising both the traditional JNI/FFI read path and the direct read path