
perf: Replace Arrow IPC with more efficient shuffle format [experimental]#3733

Draft
andygrove wants to merge 29 commits into apache:main from andygrove:shuffle-format

Conversation


@andygrove commented on Mar 19, 2026

Which issue does this PR close?

Performance improvements for native shuffle.

Rationale for this change

This PR contains two related performance improvements for native shuffle:

1. Shuffle Direct Read Simplification

Simplifies the shuffle direct read code path, removing unnecessary complexity (separate PR #3731 exists for this change).

2. Replace Arrow IPC with Raw Buffer Format

The shuffle write/read path previously used Arrow IPC to serialize RecordBatches. Arrow IPC embeds a full schema flatbuffer in every block (~200-800 bytes), plus message headers and alignment padding. Since the reader already knows the schema from protobuf, this overhead is unnecessary.

Benchmarking against Gluten (which uses raw columnar buffers for shuffle) showed this as a key performance differentiator. The new raw buffer format writes Arrow ArrayData buffers directly with minimal headers (~16 bytes per column vs ~200+ bytes for IPC).

What changes are included in this PR?

Shuffle direct read simplification:

  • Simplified shuffle direct read code path
  • Removed unused imports and doc files

New shuffle serialization format that replaces Arrow IPC with raw Arrow buffer writes:

  • write_array_data / read_array_data: Recursively serialize/deserialize Arrow ArrayData (validity bitmap + value buffers + child data for nested types)
  • write_raw_batch / read_shuffle_block: Top-level batch serialization with schema parameter
  • Dictionary arrays are cast to their value type before writing (Arrow IPC handled this transparently; the raw format requires explicit handling)
  • The outer block header (16 bytes: compressed_length + field_count) is unchanged for JVM compatibility
  • JNI decodeShuffleBlock now accepts protobuf schema bytes so the native side can reconstruct the Arrow Schema for deserialization
  • CometBlockStoreShuffleReader serializes the output schema as a protobuf ShuffleScan message and passes it to the native decoder

Format spec:

Outer header (unchanged):
  [8 bytes: compressed_length] [8 bytes: field_count]
  [4 bytes: codec tag] [compressed body...]

Inner body (new):
  [4 bytes: num_rows]
  For each column: array_data(...)

array_data (recursive):
  [4 bytes: null_count] [4 bytes: validity_len] [validity bytes]
  [4 bytes: num_buffers] (for each: [4 bytes: len] [bytes])
  [4 bytes: num_children] (for each: [4 bytes: child_rows] array_data(...))
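
The recursive array_data layout above can be sketched in plain Rust. This is a hand-written illustration, not the PR's code: the `RawArray` struct stands in for arrow's ArrayData, and the little-endian u32 length prefixes are an assumption for the sketch.

```rust
// Minimal sketch of the recursive array_data wire layout:
//   [null_count][validity_len][validity bytes]
//   [num_buffers]([len][bytes])*  [num_children]([child_rows] array_data)*
// All integers are assumed little-endian u32 for this illustration.

#[derive(Debug, PartialEq)]
struct RawArray {
    null_count: u32,
    validity: Vec<u8>,              // validity bitmap bytes (may be empty)
    buffers: Vec<Vec<u8>>,          // value buffers, written verbatim
    children: Vec<(u32, RawArray)>, // (child_rows, child array_data)
}

fn write_u32(out: &mut Vec<u8>, v: u32) {
    out.extend_from_slice(&v.to_le_bytes());
}

fn write_array_data(a: &RawArray, out: &mut Vec<u8>) {
    write_u32(out, a.null_count);
    write_u32(out, a.validity.len() as u32);
    out.extend_from_slice(&a.validity);
    write_u32(out, a.buffers.len() as u32);
    for b in &a.buffers {
        write_u32(out, b.len() as u32);
        out.extend_from_slice(b);
    }
    write_u32(out, a.children.len() as u32);
    for (rows, child) in &a.children {
        write_u32(out, *rows);
        write_array_data(child, out); // recurse for nested types
    }
}

fn read_u32(buf: &[u8], pos: &mut usize) -> u32 {
    let v = u32::from_le_bytes(buf[*pos..*pos + 4].try_into().unwrap());
    *pos += 4;
    v
}

fn read_bytes(buf: &[u8], pos: &mut usize, len: usize) -> Vec<u8> {
    let v = buf[*pos..*pos + len].to_vec();
    *pos += len;
    v
}

fn read_array_data(buf: &[u8], pos: &mut usize) -> RawArray {
    let null_count = read_u32(buf, pos);
    let vlen = read_u32(buf, pos) as usize;
    let validity = read_bytes(buf, pos, vlen);
    let nbuf = read_u32(buf, pos) as usize;
    let buffers = (0..nbuf)
        .map(|_| {
            let l = read_u32(buf, pos) as usize;
            read_bytes(buf, pos, l)
        })
        .collect();
    let nchild = read_u32(buf, pos) as usize;
    let children = (0..nchild)
        .map(|_| {
            let rows = read_u32(buf, pos);
            (rows, read_array_data(buf, pos))
        })
        .collect();
    RawArray { null_count, validity, buffers, children }
}

fn main() {
    // Round-trip a nested array: a list-like parent with one child column.
    let child = RawArray {
        null_count: 0,
        validity: vec![],
        buffers: vec![vec![1, 2, 3, 4]],
        children: vec![],
    };
    let parent = RawArray {
        null_count: 1,
        validity: vec![0b0000_0111],
        buffers: vec![vec![0, 1, 2, 4]], // e.g. offsets for a list
        children: vec![(4, child)],
    };
    let mut bytes = Vec::new();
    write_array_data(&parent, &mut bytes);
    let mut pos = 0;
    let decoded = read_array_data(&bytes, &mut pos);
    assert_eq!(decoded, parent);
    assert_eq!(pos, bytes.len());
    println!("roundtrip ok, {} bytes", bytes.len());
}
```

Note the fixed per-array header (null_count, validity_len, num_buffers, num_children) is four u32s, which matches the "~16 bytes per column" figure quoted above.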

How are these changes tested?

  • 10 new Rust roundtrip tests covering all data types: primitives, strings, binary, boolean, null, decimal, date, timestamp, nested types (List, Struct, Map), dictionary cast to plain arrays, and empty batches. All 4 compression codecs tested (None, LZ4, Zstd, Snappy).
  • All existing shuffle writer end-to-end tests updated and passing (156 Rust tests total)
  • Full CometNativeShuffleSuite JVM integration tests pass (20 tests), exercising both the traditional JNI/FFI read path and the direct read path

Adds a design document for bypassing Arrow FFI in the shuffle read
path when both the shuffle writer and downstream operator are native.

Add a new ShuffleScanExec operator that pulls compressed shuffle blocks
from the JVM via CometShuffleBlockIterator and decodes them natively using
read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch called
externally before poll_next) to avoid JNI calls on tokio threads.

Fix two bugs discovered during testing:
- ClassCastException: the factory closure incorrectly cast Partition to
  CometExecPartition before extracting ShuffledRowRDDPartition; the
  partition passed to the factory is already the unwrapped partition
  from the input RDD
- NoSuchElementException in SQLShuffleReadMetricsReporter: the metrics
  field in CometShuffledBatchRDD was not exposed as a val, causing
  Map.empty to be used instead of the real shuffle metrics map

Add a Scala integration test that runs a repartition+aggregate query
with direct read enabled and disabled to verify result parity.

Add a Rust unit test for the read_ipc_compressed codec round-trip.

- Remove redundant getCurrentBlockLength() JNI call (reuse hasNext() return value)
- Make readAsRawStream() lazy instead of materializing all streams to a List
- Remove pointless DirectByteBuffer re-allocation in close()
- Remove dead sparkPlanToInputIdx map

Skip test_read_compressed_ipc_block under Miri since it calls
foreign zstd functions that Miri cannot execute.

Replace Arrow IPC StreamWriter/StreamReader with a lightweight raw buffer
format that writes Arrow ArrayData buffers directly. The new format has
minimal per-block overhead (~16 bytes per column vs ~200-800 bytes for IPC
schema flatbuffers). The outer block header (compressed_length + field_count)
is unchanged for JVM compatibility.

Key changes:
- write_array_data: recursively serializes ArrayData (validity + buffers + children)
- read_array_data: reconstructs ArrayData from raw buffers using known schema
- Dictionary arrays are cast to value type before writing
- read_shuffle_block replaces read_ipc_compressed (takes schema parameter)
- read_ipc_compressed is retained temporarily for callers not yet migrated

@andygrove changed the title from "Shuffle format" to "perf: Replace Arrow IPC with more efficient shuffle format" on Mar 19, 2026

Use RecordBatch::try_new_with_options with an explicit row_count instead
of try_new so that zero-column batches (produced by Spark when query
results are unused) do not fail with "must either specify a row count
or at least one column".

CometColumnarShuffle was not setting outputAttributes on the
CometShuffleDependency, leaving it as Seq.empty. This caused
the shuffle reader to pass an empty schema to the native
decodeShuffleBlock, resulting in "Output column count mismatch:
expected N, got 0" errors.

@andygrove changed the title from "perf: Replace Arrow IPC with more efficient shuffle format" to "perf: Replace Arrow IPC with more efficient shuffle format [WIP]" on Mar 19, 2026

The null bitmap in Arrow arrays can have a non-zero bit offset even
when ArrayData.offset() is 0 (e.g. after RecordBatch::slice). The
raw shuffle writer was copying the bitmap bytes verbatim, but the
reader assumes bits start at offset 0. This caused shifted null
bitmaps, corrupting data during shuffle and producing wrong query
results (e.g. TPC-DS q6 counts off by 1).

Fix by detecting non-zero bitmap offsets and emitting a re-aligned
copy. Add a roundtrip test with sliced batches to cover this case.
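
The re-alignment described in this commit can be illustrated in plain Rust. This is a self-contained sketch, not the PR's code: `realign_bitmap` is a hypothetical helper showing how bits at a non-zero offset are copied into a buffer whose bits start at offset 0, following Arrow's LSB-first bitmap convention.

```rust
// Sketch: re-align a validity bitmap whose bits start at a non-zero
// bit offset (e.g. after RecordBatch::slice). The writer must emit a
// bitmap starting at bit 0, because the reader assumes offset 0.

/// Copy `len` bits starting at `bit_offset` in `src` into a fresh
/// buffer whose bits start at offset 0 (Arrow bitmaps are LSB-first).
fn realign_bitmap(src: &[u8], bit_offset: usize, len: usize) -> Vec<u8> {
    let mut out = vec![0u8; (len + 7) / 8];
    for i in 0..len {
        let bit = bit_offset + i;
        if src[bit / 8] & (1 << (bit % 8)) != 0 {
            out[i / 8] |= 1 << (i % 8);
        }
    }
    out
}

fn main() {
    // Bits 0..8 of src, LSB-first: 1,1,0,1,0,0,1,0.
    let src = [0b0100_1011u8];
    // A slice starting at bit 2 with length 4 covers bits 0,1,0,0,
    // which re-aligned to offset 0 is the byte 0b0000_0010.
    let aligned = realign_bitmap(&src, 2, 4);
    assert_eq!(aligned, vec![0b0000_0010]);
    println!("aligned: {:#010b}", aligned[0]);
}
```

A production version would copy whole bytes when the offset is already 0 and use word-at-a-time shifts rather than a per-bit loop; the per-bit loop here just makes the semantics obvious.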

Arrays from RecordBatch::slice can have non-zero offsets in both
the ArrayData and the null bitmap. The raw shuffle format writes
buffers verbatim assuming offset 0, causing data corruption when
offsets are present.

Use take() to produce zero-offset copies when needed, similar to
prepare_output in jni_api.rs. This fixes TPC-DS q64 failures
where the debug_assert fired and data mismatch errors from
shifted null bitmaps.

@andygrove changed the title from "perf: Replace Arrow IPC with more efficient shuffle format [WIP]" to "perf: Replace Arrow IPC with more efficient shuffle format [experimental]" on Mar 19, 2026