Skip to content

Commit bac383c

Browse files
committed
Fix Arrow iterator leak in LocalTableScanExec via mkBatches refactor
rowToArrowConverter wraps ArrowBatchWithSchemaIterator in a Scala .map(), which is not AutoCloseable. In the LocalTableScanExec branch, if sendBatch throws (e.g., on client disconnect), the underlying iterator was never closed, leaking 131072 bytes into ArrowUtils.rootAllocator. This change extracts mkBatches to create the raw ArrowBatchWithSchemaIterator directly. LocalTableScanExec uses mkBatches with try/finally to guarantee close(), and converter is rewritten to delegate to mkBatches, eliminating duplication of the toBatchWithSchemaIterator parameters.

Co-authored-by: Isaac
1 parent d57c504 commit bac383c

1 file changed

Lines changed: 30 additions & 11 deletions

File tree

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala

Lines changed: 30 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ import org.apache.spark.sql.connect.planner.{InvalidInputErrors, SparkConnectPla
3838
import org.apache.spark.sql.connect.service.ExecuteHolder
3939
import org.apache.spark.sql.connect.utils.{ErrorUtils, MetricGenerator, PipelineAnalysisContextUtils}
4040
import org.apache.spark.sql.execution.{DoNotCleanup, LocalTableScanExec, QueryExecution, RemoveShuffleFiles, SkipMigration, SQLExecution}
41-
import org.apache.spark.sql.execution.arrow.ArrowConverters
41+
import org.apache.spark.sql.execution.arrow.{ArrowBatchWithSchemaIterator, ArrowConverters}
4242
import org.apache.spark.sql.internal.SQLConf
4343
import org.apache.spark.sql.types.{DataType, StructType}
4444
import org.apache.spark.util.ThreadUtils
@@ -142,13 +142,25 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
142142
// Whether to enable arrow batch chunking for large result batches.
143143
val isResultChunkingEnabled = executePlan.resultChunkingEnabled
144144

145-
val converter = rowToArrowConverter(
146-
schema,
147-
maxRecordsPerBatch,
148-
maxBatchSize,
149-
timeZoneId,
150-
errorOnDuplicatedFieldNames = false,
151-
largeVarTypes = largeVarTypes)
145+
// mkBatches creates an ArrowBatchWithSchemaIterator (AutoCloseable). It is used directly
146+
// in the LocalTableScanExec branch so that we can close it in a finally block — the
147+
// converter wrapper below returns a plain Scala-mapped iterator that is NOT AutoCloseable,
148+
// so if sendBatch throws (e.g., client disconnect) the underlying iterator would leak
149+
// 131072 bytes into ArrowUtils.rootAllocator.
150+
val mkBatches: Iterator[InternalRow] => ArrowBatchWithSchemaIterator = rows =>
151+
ArrowConverters.toBatchWithSchemaIterator(
152+
rows,
153+
schema,
154+
maxRecordsPerBatch,
155+
maxBatchSize,
156+
timeZoneId,
157+
errorOnDuplicatedFieldNames = false,
158+
largeVarTypes = largeVarTypes)
159+
160+
val converter: Iterator[InternalRow] => Iterator[Batch] = rows => {
161+
val batches = mkBatches(rows)
162+
batches.map(b => b -> batches.rowCountInLastBatch)
163+
}
152164

153165
var numSent = 0
154166
def sendBatch(bytes: Array[Byte], count: Long, startOffset: Long): Unit = {
@@ -209,9 +221,16 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
209221
case LocalTableScanExec(_, rows, _) =>
210222
executePlan.eventsManager.postFinished(Some(rows.length))
211223
var offset = 0L
212-
converter(rows.iterator).foreach { case (bytes, count) =>
213-
sendBatch(bytes, count, offset)
214-
offset += count
224+
val batches = mkBatches(rows.iterator)
225+
try {
226+
while (batches.hasNext) {
227+
val batchBytes = batches.next()
228+
val count = batches.rowCountInLastBatch
229+
sendBatch(batchBytes, count, offset)
230+
offset += count
231+
}
232+
} finally {
233+
batches.close()
215234
}
216235
case _ =>
217236
SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {

0 commit comments

Comments (0)