[VL] Fix false-EOF probe in ColumnarCachedBatchSerializer.read trailing markers#12147
Merged
yaooqinn merged 1 commit intoMay 28, 2026
Merged
Conversation
…ColumnarBatch
ColumnarCachedBatchSerializer.read guarded the trailing hasStats /
hasSchema booleans with `input.available() > 0` to tolerate the V1
wire format that predates those markers. The intent was correct --
the existing ColumnarCachedBatchKryoSuite#"V1 wire ..." test locks
absent-trailing as silent null, and that contract must be preserved
-- but `Input.available()` is the wrong probe.
`Kryo Input.available()` returns
`(limit - position) + underlyingStream.available()`, and the JDK
`InputStream.available()` contract permits any implementation to
return 0 even when more data follows -- BufferedInputStream over
shuffle-spill / network chunk boundaries routinely does so. When the
Kryo buffer is drained AND the underlying stream reports 0 at the
trailing-boolean byte position, the probe falsely concludes EOF,
skips hasStats, and the next `readClassAndObject` interprets the
stats payload (which contains the schema JSON) as a class name --
surfacing as `ClassNotFoundException: {"type":"struct",...}` with
the stack topped by `DefaultClassResolver.readName`.
Replace the probe with a try/readBoolean/catch on the narrow Kryo
"Buffer underflow" surface. This catches the real EOF when the V1
wire has no trailing booleans (preserves the silent-null contract)
without ever consulting `available()`, so a V2 wire under
chunked-fill always reads the trailing markers correctly.
The catch is intentionally narrow (message-prefix match on
"Buffer underflow") so that genuine corruption -- including
ClassNotFoundException wrapped during readClassAndObject -- is
never swallowed.
The length-bound `require(... <= maxLen ...)` guard from commit
491070b (defending against NegativeArraySizeException /
oversized allocation) is preserved -- that part is orthogonal to
the V1 probe and remains useful.
A new test ColumnarCachedBatchKryoBoundaryProbeBugSuite locks the
chunked-fill probe contract: a 1-byte-per-read InputStream that
returns `available() == 0` must still round-trip multi-batch V2
wire correctly. The existing V1-wire silent-null test in
ColumnarCachedBatchKryoSuite continues to pass unchanged.
8c36260 to
da46055
Compare
liuneng1994
reviewed
May 28, 2026
Contributor
There was a problem hiding this comment.
PR description contradicts the code
Cross-referencing the existing locked test in ColumnarCachedBatchKryoSuite.scala:
test("V1 wire (no trailing hasStats/hasSchema booleans) reads as stats=null/schema=null") {
...
assert(read.stats === null, "absent trailing hasStats must read as null, not throw")
assert(read.schema === null, "absent trailing hasSchema must read as null, not throw")
}This contract requires V1 compat to be preserved — and the new try/catch (KryoException if isBufferUnderflow) => false is exactly what satisfies it. But the PR description says the opposite:
| description claims | code / tests actually |
|---|---|
| "V1 wire era … never released … guard protects nothing" | try/catch preserves V1 compat; locked by existing "V1 wire …" test |
| "A truncated stream now surfaces KryoException rather than silently returning null" | truncated stream is caught and returns null stats — same silent behavior |
Cites BoundaryProbeBugSuite#"truncated wire … fails fast, not silently" |
new file only contains 1 test ("multi-batch deserialize survives …") — the cited test doesn't exist |
Suggested fix: keep the code as-is (preserving V1 compat is correct), and trim the description down to honestly what the patch does — "fix the false-EOF probe in Input.available() so the trailing booleans are read across chunk boundaries; V1 compat is now achieved via try/catch on KryoException instead". Drop the "Spark cached blocks live and die within a single application instance" paragraph and the cite to a non-existent test.
liuneng1994
approved these changes
May 28, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
ColumnarCachedBatchSerializer.readusedinput.available() > 0asan EOF probe before reading the trailing
hasStats/hasSchemaboolean markers. This PR replaces the probe with a direct
readBoolean()inside a narrow
try/catchonKryoExceptionwhose message startswith
"Buffer underflow", which is the actual signal Kryo emits attrue end-of-input.
The V1 wire back-compat behavior (absent trailing booleans read as
null, not throw) is preserved — and is locked by the existing testColumnarCachedBatchKryoSuite#"V1 wire (no trailing hasStats/hasSchema booleans) reads as stats=null/schema=null".Why are the changes needed?
Kryo Input.available()returns(limit - position) + underlyingStream.available(). The JDKInputStream.available()contract permits any implementation toreturn
0even when more data follows —BufferedInputStreamovershuffle-spill / network chunk boundaries routinely does so. When the
Kryo buffer is drained AND the underlying stream reports
0at thetrailing-boolean byte position, the probe falsely concludes EOF,
skips
hasStats, and the nextreadClassAndObjectinterprets thestats payload (which contains the schema JSON) as a class name.
Production stack we observed (truncated):
The length-bound
require(... <= maxLen ...)guard from491070bf34is preserved — it remains useful againstNegativeArraySizeException/ oversized allocation and isorthogonal to the EOF-probe fix.
Why catch
KryoExceptionby message prefix?Kryo 4.x (the version on Gluten's classpath via
kryo-shaded) hasno dedicated
KryoBufferUnderflowExceptionsubclass — that was addedin Kryo 5.x. The message-prefix check is the narrowest filter we can
apply on 4.x without swallowing real corruption (e.g. a
ClassNotFoundExceptionwrapped duringreadClassAndObject, whichdoes not start with
"Buffer underflow").Does this PR introduce any user-facing change?
No (bug fix in a build that has not shipped to any tagged release).
How was this patch tested?
New
ColumnarCachedBatchKryoBoundaryProbeBugSuite:"multi-batch deserialize survives boundary-aligned trailing-boolean probe"— deterministic repro of the production stack using a
1-byte-per-read
InputStreamthat returnsavailable() == 0atthe trailing-boolean position. Fails on the pre-patch code, passes
after the fix.
Existing
ColumnarCachedBatchKryoSuite#"V1 wire ..."continues topass, locking the V1 silent-null back-compat contract.
Verified locally on
-Pspark-4.1 -Pscala-2.13; affected suites(
Framed,BuildFilter,StatsBlob,ShipBlockerMarshal,IntFamilyMarshal) all green (23/23). CI 60/60 green onda460551e7.Was this patch authored or co-authored using generative AI tooling?
No