diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala index 7bf7a175463..bb548b2f9ce 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String -import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} +import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoSerializer} import com.esotericsoftware.kryo.DefaultSerializer import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.arrow.c.ArrowSchema @@ -152,14 +152,30 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat ) val bytes = new Array[Byte](payloadLen) input.readBytes(bytes) - // Backward-compat with the V1 wire format (no trailing hasStats / hasSchema booleans): - // legacy CachedColumnarBatch instances persisted on disk (DISK_ONLY / MEMORY_AND_DISK) - // surviving a rolling upgrade lack these fields. available() is best-effort -- treats - // unavailable suffix as "absent" instead of throwing KryoException. - val hasStats = input.available() > 0 && input.readBoolean() - // Even when hasStats=false we still consume the hasSchema tag to keep the stream aligned. - // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the typed - // pattern match throws MatchError at runtime. + // Read the trailing hasStats marker. Catching a Buffer-underflow KryoException + // here preserves backward compatibility with the V1 wire format (no trailing + // hasStats / hasSchema booleans), which the existing + // ColumnarCachedBatchKryoSuite#"V1 wire ..." 
test locks as a contract: + // an absent trailing byte must read as null, not throw. + // + // Why a try/catch instead of `input.available() > 0 && readBoolean`: + // Kryo `Input.available()` returns `(limit - position) + underlyingStream.available()`, + // and the JDK `InputStream.available()` contract permits any implementation to + // return 0 even when more data follows -- BufferedInputStream over shuffle-spill + // / network chunk boundaries routinely does so. When the Kryo buffer is drained + // AND the underlying stream reports 0 at the trailing-boolean byte position, the + // probe falsely concludes EOF, skips hasStats, and the next readClassAndObject + // interprets the stats payload (which contains the schema JSON) as a class name -- + // surfacing as `ClassNotFoundException: {"type":"struct",...}` with the stack + // topped by `DefaultClassResolver.readName`. A try/catch on the real EOF surface + // (Kryo "Buffer underflow") avoids the false-EOF probe while still tolerating + // V1 wire. + // + // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the + // typed pattern match throws MatchError at runtime. + val hasStats = + try input.readBoolean() + catch { case e: KryoException if isBufferUnderflow(e) => false } val statsAndSchema: (InternalRow, StructType) = if (hasStats) { val statsLen = input.readInt() require( @@ -177,9 +193,21 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat CachedColumnarBatch(numRows, sizeInBytes, bytes, statsAndSchema._1, statsAndSchema._2) } + // Kryo signals end-of-input by throwing KryoException with a message starting + // with "Buffer underflow". There is no dedicated subclass, so a message-prefix + // check is the narrowest filter we can apply without swallowing real corruption + // (e.g. ClassNotFoundException wrapped during readClassAndObject). 
+ private def isBufferUnderflow(e: KryoException): Boolean = { + val msg = e.getMessage + msg != null && msg.startsWith("Buffer underflow") + } + private def readOptionalSchema(input: Input, maxLen: Long): StructType = { - // Treat absent trailing bytes as "no schema": V1 wire format predates this field. - if (input.available() <= 0 || !input.readBoolean()) { + // Trailing schema marker; same V1-vs-chunked-fill rationale as the hasStats read above. + val hasSchema = + try input.readBoolean() + catch { case e: KryoException if isBufferUnderflow(e) => false } + if (!hasSchema) { null } else { val schemaLen = input.readInt() diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala new file mode 100644 index 00000000000..15bbcb3f847 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types.{LongType, StructField, StructType} + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{Input, Output} +import org.scalatest.funsuite.AnyFunSuite + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream} + +/** + * Deterministic repro for the Input.available() probe bug on trailing hasStats/hasSchema. + * + * Trigger conditions (all required): + * (1) Multi-batch deserialize via kryo.readClassAndObject from one stream. + * (2) Kryo Input wraps an InputStream (not byte[]). + * (3) At a batch's trailing hasStats/hasSchema position, the underlying + * InputStream returns available()=0 AND the Kryo Input buffer is drained + * (limit==position). Both conditions must hit the SAME byte position. + * + * Path observed in production: + * BufferedInputStream over shuffle-spill / network ManagedBuffer chunk + * boundary -> stream.available()=0 between chunks, Kryo Input.available() + * = (limit-pos) + 0 -> reads 0 when buffer drained. + * + * Fixture: 1-byte-per-read stream + lying available()=0 -> every byte boundary + * satisfies (3); any trailing-boolean byte aligned with a Kryo refill triggers + * the false-EOF. 
+ */
+class ColumnarCachedBatchKryoBoundaryProbeBugSuite extends AnyFunSuite {
+
+  /** Test stream: one byte per bulk read, and available() always reports 0. */
+  final private class LyingOneByteStream(src: InputStream) extends InputStream {
+    override def read(): Int = src.read()
+    override def read(b: Array[Byte], off: Int, len: Int): Int =
+      if (len == 0) 0
+      else
+        src.read() match {
+          case -1 => -1
+          case c =>
+            b(off) = c.toByte
+            1
+        }
+    override def available(): Int = 0
+    override def close(): Unit = src.close()
+  }
+
+  /** Builds a batch whose stats, schema name and payload bytes all depend on `i`. */
+  private def mkBatch(i: Int): CachedColumnarBatch = {
+    // PartitionStatistics per-column slots:
+    //   [lower(typed) upper(typed) count(Int) nullCount(Int) sizeBytes(Long)]
+    val stats: InternalRow =
+      new GenericInternalRow(Array[Any](i.toLong, (i * 10).toLong, i, 0, 8L))
+    val schema = StructType(Seq(StructField(s"col$i", LongType, nullable = true)))
+    val bytes = Array.fill[Byte](128)(i.toByte)
+    CachedColumnarBatch(
+      numRows = i,
+      sizeInBytes = bytes.length.toLong,
+      bytes = bytes,
+      stats = stats,
+      schema = schema)
+  }
+
+  test("multi-batch deserialize survives boundary-aligned trailing-boolean probe") {
+    val kryo = new Kryo()
+    kryo.register(classOf[CachedColumnarBatch], new CachedColumnarBatchKryoSerializer())
+
+    val baos = new ByteArrayOutputStream()
+    val out = new Output(baos)
+    val originals = (1 to 10).map(mkBatch)
+    originals.foreach(b => kryo.writeClassAndObject(out, b))
+    out.close()
+
+    // 32-byte Kryo buffer over a one-byte-per-read stream that reports available()=0.
+    val in = new Input(new LyingOneByteStream(new ByteArrayInputStream(baos.toByteArray)), 32)
+    // Read exactly as many batches as were written; close the input even if a read throws.
+    val read =
+      try originals.map(_ => kryo.readClassAndObject(in).asInstanceOf[CachedColumnarBatch])
+      finally in.close()
+
+    originals.zip(read).zipWithIndex.foreach {
+      case ((o, r), i) =>
+        assert(r.numRows == o.numRows, s"batch $i numRows mismatch")
+        assert(r.sizeInBytes == o.sizeInBytes, s"batch $i sizeInBytes mismatch")
+        assert(r.bytes.toSeq == o.bytes.toSeq, s"batch $i bytes mismatch")
+        assert(r.stats != null, s"batch $i stats lost (BUG)")
+        assert(r.schema == o.schema, s"batch $i schema mismatch (BUG)")
+    }
+  }
+
+  // V1 wire backward-compat is locked by ColumnarCachedBatchKryoSuite#"V1 wire ..."
+  // -- not duplicated here. This suite only covers the chunked-fill probe path.
+}