[SPARK-55890] Check arrow memory at end of tests#54689
garlandz-db wants to merge 55 commits into apache:master from
Conversation
@garlandz-db can you file a JIRA ticket?

done
val attributes = attrs.map(exp => AttributeReference(exp.name, exp.dataType)())
val buffer = ArrowConverters
val iter = ArrowConverters
without this fix
SparkConnectProtoSuite:
[info] org.apache.spark.sql.connect.planner.SparkConnectProtoSuite *** ABORTED *** (8 seconds, 710 milliseconds)
[info] 16896 did not equal 0 Arrow rootAllocator memory leak: 16896 bytes still allocated
(ArrowAllocatorLeakCheck.scala:33)
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info] at org.scalatest.Assertio
fileWriter.close()
fileWriterClosed = true
}
fileWriter.close()
this is not guaranteed if there's an exception midway
private var fileWriterClosed = false
override def close(): Unit = {
fileWriter.close()
this can lead to two close() calls if it was already closed midway through write() but we call .close() again; the fix is to add a flag
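The flag-based guard suggested here can be sketched as follows. This is a simplified illustration, not the actual SparkArrowFileWriter code; `GuardedWriter` and `underlying` are invented names, and the sketch uses an AtomicBoolean (as a later commit in this PR does) rather than a plain var so concurrent calls are also safe.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical simplified writer; `GuardedWriter` and `underlying` are
// illustrative names, not the real SparkArrowFileWriter API.
final class GuardedWriter(underlying: java.io.Closeable) extends java.io.Closeable {
  private val closed = new AtomicBoolean(false)

  // Safe to call from both write()'s failure path and an explicit close():
  // compareAndSet ensures only the first caller reaches the resource.
  override def close(): Unit = {
    if (closed.compareAndSet(false, true)) {
      underlying.close()
    }
  }
}
```

With this guard, the double-close path (close inside write() on error, then close() again from the caller) touches the underlying resource exactly once.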
try {
writer.write(rdd.toLocalIterator)
} finally {
writer.close()
finished writing but missing a close
val batches = reader.read().toArray // materialize before reader.close() in finally
ArrowConverters.toDataFrame(batches.iterator, schema, spark, "UTC", true, false)
} finally {
reader.close()
finished reading but missing a close
ArrowConverters.toDataFrame(reader.read(), schema, spark, "UTC", true, false)
try {
val schema = ArrowUtils.fromArrowSchema(reader.schema)
ArrowConverters.toDataFrame(reader.read(), schema, spark, "UTC", true, false)
this materializes, so it's not lazy; we can close after this line finishes
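The materialize-before-close contract under discussion can be sketched generically. `open` here is a hypothetical stand-in for constructing the real ArrowFileReader; the point is only that the lazy iterator is forced while the resource is still open, so the close in `finally` cannot invalidate unread elements.

```scala
// Sketch of the materialize-before-close pattern: force the lazy iterator
// with .toVector while the resource is still open, then close in finally.
// `open` is a hypothetical stand-in for constructing the real reader.
def readAll[T](open: () => (Iterator[T], java.io.Closeable)): Vector[T] = {
  val (iter, resource) = open()
  try {
    iter.toVector // every element is read before the finally fires
  } finally {
    resource.close()
  }
}
```

This makes the ownership explicit at the call site instead of relying on the callee happening to consume the iterator eagerly.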
@ExtendedSQLTest
class ColumnarBatchSuite extends SparkFunSuite {
class ColumnarBatchSuite extends SparkFunSuite with ArrowAllocatorLeakCheck {
Does this use ArrowVectors?
there's a test that does
if (code == ArrowLeakExitCode) {
// Arrow leak detected in server JVM. halt() is the only way to propagate
// failure from inside a JVM shutdown hook.
Runtime.getRuntime.halt(code)
Is it clear enough that the test failed due to a memory leak when this happens? Or should we add a log line?
Also, using halt seems harsh. Is there a way to bubble this up more nicely to the test? We should be able to wait in afterAll() and throw an exception if a leak occurred.
you mean set a timer in afterAll() waiting for the service to close?
- SimpleSparkConnectService checks ArrowUtils.rootAllocator.getAllocatedMemory after shutdown and exits with code 77 (ArrowLeakExitCode) if non-zero
- RemoteSparkSession.stop() propagates exit code 77 via Runtime.getRuntime.halt() (the only way to fail CI from inside a JVM shutdown hook)
- ArrowLeakDetectionE2ETest verifies end-to-end detection using the SPARK_TEST_ARROW_LEAK env var to inject a synthetic unreleased allocation in the server process

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- ArrowFileReadWrite.save/load: wrap writer/reader in try-finally so allocators are always released (fixes 3072-byte cross-suite leaks)
- ArrowFileReadWrite.write: remove redundant fileWriter.close() call (close() already closes it; the double-close was a no-op but confusing)
- ArrowAllocatorLeakCheck.afterAll: run super.afterAll() first so SparkSession/SparkConnect resources are released before the leak assertion (fixes false-positive 131072-byte failures in SessionEventsManagerSuite and related suites)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…Batches

converter(rows.iterator) returns a CloseableIterator backed by ArrowBatchWithSchemaIterator, which holds a child allocator of ArrowUtils.rootAllocator. On the driver-side LocalTableScanExec path there is no TaskContext, so the task-completion listener that normally auto-closes the iterator never fires. Wrap in try-finally so the allocator is always released.

This fixes the 131072-byte Arrow leak seen in SparkConnectServiceE2ESuite and PipelineEventStreamSuite when ArrowAllocatorLeakCheck is mixed in.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tProtoSuite Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…aladoc

- Add ArrowFileReadWriteSuite with the ArrowAllocatorLeakCheck mixin so the suite that directly exercises ArrowFileReadWrite.save/load also asserts no Arrow memory leaks after its own tests complete.
- Expand the ArrowAllocatorLeakCheck scaladoc with a mixin-order warning and correct/incorrect usage examples, since wrong ordering causes false-positive leak failures.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The LocalTableScanExec branch in processAsArrowBatches previously used converter(rows.iterator), which returns a mapped iterator (not AutoCloseable). When sendBatch threw (e.g., on client disconnect), the underlying ArrowBatchWithSchemaIterator was never closed, leaking 131072 bytes into ArrowUtils.rootAllocator and contaminating subsequent test suites.

Fix: bypass the converter wrapper in the LocalTableScanExec branch and directly create an ArrowBatchWithSchemaIterator, closing it in a finally block. Arrow's close() is idempotent (isClosed guard), so a double-close when the iterator exhausts itself is safe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
In SparkConnectServerTest.clearAllExecutions(), after calling close() on each ExecuteHolder, wait for the execution thread to terminate before proceeding.

Root cause: the LocalTableScanExec branch in processAsArrowBatches calls eventsManager.postFinished() before serializing Arrow batches. Tests that wait for ExecuteStatus.Finished (e.g. the SPARK-45133 local relation test) exit before the execution thread finishes processing. clearAllExecutions() interrupted the thread but did not wait for it to stop, leaving an open ArrowBatchWithSchemaIterator (131072 bytes) in ArrowUtils.rootAllocator. The ArrowAllocatorLeakCheck in afterAll() then detected the residual allocation.

Fix: capture the holder list before close(), then eventuallyWithTimeout on isExecuteThreadRunnerAlive() so the thread's finally block runs batches.close() before the leak check.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ch loop
When holder.close() interrupts the execution thread, the LocalTableScanExec
branch of processAsArrowBatches would continue processing all rows (1M+ in
the SPARK-45133 test) because the tight loop had no interrupt check. This
left the ArrowBatchWithSchemaIterator open for seconds after clearAllExecutions()
returned, causing ArrowAllocatorLeakCheck to see 131072 bytes still allocated.
Fix: check Thread.currentThread().isInterrupted() at each loop iteration and
throw InterruptedException immediately, ensuring the finally { batches.close() }
runs before the execution thread terminates.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
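The per-iteration interrupt check this commit describes can be sketched as a generic loop helper. `copyUntilInterrupted` is an illustrative name, not the actual processAsArrowBatches code; the point is that a CPU-bound loop polls the interrupt flag so the enclosing finally block gets to run promptly.

```scala
// Illustrative helper (not the real processAsArrowBatches loop): check the
// interrupt flag on every iteration so a CPU-bound copy bails out promptly
// and the caller's finally block can close the Arrow iterator.
def copyUntilInterrupted[T](rows: Iterator[T])(sink: T => Unit): Unit = {
  while (rows.hasNext) {
    if (Thread.currentThread().isInterrupted) {
      throw new InterruptedException("execution thread interrupted mid-batch")
    }
    sink(rows.next())
  }
}
```

Throwing InterruptedException (rather than silently returning) preserves the interruption semantics for callers that distinguish cancellation from normal completion.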
… thread interrupt

Move batch.close() into try-finally in ArrowBatchIterator.next() and ArrowBatchWithSchemaIterator.next() so retained Arrow buffers are always released even when MessageSerializer.serialize() or serializeBatch() throws ClosedByInterruptException (triggered when the NIO WritableByteChannelImpl channel is interrupted mid-write by clearAllExecutions()).

Also make ArrowBatchIterator.close() idempotent and thread-safe using AtomicBoolean.compareAndSet, with try-finally to ensure allocator.close() runs even if root.close() throws.

Co-authored-by: Isaac
reader.read() returns a lazy Iterator backed by the open ArrowFileReader. The try/finally was safe today only because ArrowConverters.toDataFrame happens to call .toArray internally, but that is an undocumented implementation detail. Eagerly calling .toArray at the call site makes the close-after-read contract explicit and independent of toDataFrame's internals. Co-authored-by: Isaac
No test fails without this change. toDataFrame already calls .toArray internally, so the lazy iterator is fully consumed before finally fires. The fix was defensive-only with no observable correctness benefit today. Co-authored-by: Isaac
reader.read() returns a lazy Iterator over the open file channel. Passing it directly to toDataFrame works today because toDataFrame calls .toArray internally, but that is an undocumented implementation detail. Eagerly materializing at the call site makes the ownership explicit: all bytes are read before reader.close() fires in finally, regardless of toDataFrame internals. Co-authored-by: Isaac
If loader.load(batch) or fileWriter.writeBatch() throws, batch was not closed, leaking off-heap memory from the child allocator. Wrap in try/finally to guarantee batch.close() runs even on exception. Co-authored-by: Isaac
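The try/finally shape this commit describes can be sketched generically. `batch`, `load`, and `write` are stand-ins for the real Arrow batch, loader.load(batch), and fileWriter.writeBatch(); this is a sketch of the pattern, not the actual code.

```scala
// Sketch of the fix: ensure batch.close() runs even when load/write throws.
// `batch`, `load`, and `write` stand in for the real Arrow objects and calls.
def writeOneBatch(batch: java.io.Closeable)(load: () => Unit, write: () => Unit): Unit = {
  try {
    load()  // may throw (e.g. loader.load(batch))
    write() // may throw (e.g. fileWriter.writeBatch())
  } finally {
    batch.close() // buffers released on both the success and failure paths
  }
}
```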
Without this, a second call to close() would still invoke fileWriter.close() because the flag was only set in write()'s finally block, not in close() itself. Co-authored-by: Isaac
Co-authored-by: Isaac
When SparkConnectServerUtils.stop() is called directly (e.g., from @afterall in tests), bubble the Arrow leak detection as a RuntimeException so the test framework can report it properly. Keep halt() only for the shutdown hook path where exit() would deadlock. Co-authored-by: Isaac
- SparkArrowFileWriter: replace the fileWriterClosed var with an AtomicBoolean so concurrent close() calls are safe; use nested try/finally in close() so root/allocator are always released even if fileWriter.close() throws
- ArrowAllocatorLeakCheck: wrap super.afterAll() in try/finally so the leak assertion runs even if a sibling mixin's teardown throws

Co-authored-by: Isaac
When the server exits with ArrowLeakExitCode, throw RuntimeException unconditionally. This surfaces the leak as a test failure when stop() is called from afterAll(). Removes the stoppingFromShutdownHook flag and the halt() call. Co-authored-by: Isaac
The converter(rows.iterator) form returns a Scala-mapped iterator that is not AutoCloseable. When sendBatch throws (e.g., client disconnect), the underlying ArrowBatchWithSchemaIterator is never closed, leaking 131072 bytes into ArrowUtils.rootAllocator and failing ArrowAllocatorLeakCheck in SparkConnectServiceE2ESuite. Fix: use ArrowBatchWithSchemaIterator directly with try/finally to ensure close() is always called. This was previously reverted in the simplify commit; restoring it because SparkConnectServiceE2ESuite now catches it. Co-authored-by: Isaac
…Execution" This reverts commit 64d3c27.
rowToArrowConverter wraps ArrowBatchWithSchemaIterator in a Scala .map() which is not AutoCloseable. In the LocalTableScanExec branch, if sendBatch throws (e.g., client disconnect), the underlying iterator was never closed, leaking 131072 bytes into ArrowUtils.rootAllocator. Extract mkBatches to create the raw ArrowBatchWithSchemaIterator directly. LocalTableScanExec uses mkBatches with try/finally to guarantee close(). converter is rewritten to delegate to mkBatches, eliminating duplication of the toBatchWithSchemaIterator parameters. Co-authored-by: Isaac
Co-authored-by: Isaac
…ule import ArrowBatchWithSchemaIterator is private[sql] and not accessible across module boundaries when compiling connect/server against sql/core jar. Remove explicit import and type annotation; let the compiler infer the type from the return of toBatchWithSchemaIterator. The original rowToArrowConverter already accessed rowCountInLastBatch this way. Co-authored-by: Isaac
…x Arrow leak race condition

In local mode, a Spark job submitted by an execute thread may still have tasks running (and holding ArrowBatchWithSchemaIterator allocations from ArrowUtils.rootAllocator) after the execute thread itself terminates. clearAllExecutions() was only waiting for the execute thread to exit, not for executor tasks to release Arrow memory, causing SparkConnectServiceE2ESuite and related suites to fail ArrowAllocatorLeakCheck with 131072 bytes still allocated.

Fix: call spark.sparkContext.cancelAllJobs() after execute threads exit, then eventuallyWithTimeout-wait for rootAllocator memory to reach zero before proceeding. This ensures task completion listeners have run and Arrow memory is fully released before ArrowAllocatorLeakCheck fires.

Co-authored-by: Isaac
The memory wait in clearAllExecutions() was too timing-sensitive: cancelAllJobs() only signals cancellation; CPU-bound Arrow tasks from range(1000000) can hold 131072 bytes for >30 seconds post-cancel, causing the afterEach check to time out (CI build 67840409551). By afterAll() time all earlier tests' tasks have completed; only the last test's tasks may still be running, and they finish quickly after cancellation. Moving the check there avoids the race while still catching genuine leaks before ArrowAllocatorLeakCheck fires. Co-authored-by: Isaac
…st-stop
The eventuallyWithTimeout { assert memory == 0 } in afterAll() ran before
SparkContext.stop(), so tasks could still hold Arrow allocations (Executor.stop()
only calls threadPool.shutdown() without awaitTermination). This caused permanent
131072-byte leak failures in SparkConnectServiceE2ESuite CI.
ArrowAllocatorLeakCheck.afterAll() already checks memory in a `finally` block
after `super.afterAll()` (which calls SparkContext.stop()), so the assertion
fires at the correct time. Remove the redundant pre-stop wait entirely.
Co-authored-by: Isaac
Scalastyle enforces ASCII-only characters; the em dash introduced in the previous commit caused a nonascii.message lint failure in CI. Co-authored-by: Isaac
ArrowUtils is no longer referenced after removing the pre-stop Arrow memory assertion. Co-authored-by: Isaac
… upstream master" This reverts commit b6f681f.
…tor to prevent memory leak

ArrowBatchWithSchemaIterator allocates a child allocator from ArrowUtils.rootAllocator. In the RDD execution path, the converter lambda wraps it in a plain Scala map() iterator, so batches.close() was never called after the task completed or was cancelled, leaking 131072 bytes per partition into the root allocator.

Register a TaskCompletionListener so close() fires on task success, failure, or cancellation. Option(TaskContext.get()) safely handles the driver-thread case (sendCollectedRows) where there is no task context.

Co-authored-by: Garland Zhang <garland.zhang@databricks.com>
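The listener pattern from this commit can be sketched without Spark on the classpath. `TaskCtx` below is a hypothetical stand-in for Spark's TaskContext (whose real registration method is addTaskCompletionListener); only the shape of the pattern is shown.

```scala
// `TaskCtx` is a hypothetical stand-in for Spark's TaskContext.
trait TaskCtx { def addCompletionListener(f: () => Unit): Unit }

// Register cleanup only when a task context exists; on the driver thread
// (the sendCollectedRows case) ctxOpt is None and the caller must fall back
// to try/finally instead.
def closeOnTaskCompletion(ctxOpt: Option[TaskCtx], resource: java.io.Closeable): Unit =
  ctxOpt.foreach(_.addCompletionListener(() => resource.close()))
```

Wrapping the lookup in Option is what makes the same code path safe both inside a task and on the driver, which is exactly the distinction the commit message draws.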
The previous fix added a redundant TaskCompletionListener (TCL) to the
`converter` function, but `ArrowConverters.toBatchWithSchemaIterator`
already passes `TaskContext.get()` to `ArrowBatchWithSchemaIterator`,
which registers its own TCL internally. The double registration caused
`close()` to be called twice, and since `allocator.close()` is not
idempotent in Arrow, the second call threw `IllegalStateException`,
leaving 131072 bytes still reported as leaked.
This commit corrects the fix with two changes:
1. Remove the redundant TCL from `converter` — the iterator already
closes itself via its own internally-registered TCL.
2. Fix `sendCollectedRows` (used by CollectLimitExec/CollectTailExec) to
call `mkBatches` directly and wrap iteration in `try/finally { close() }`.
This path runs on the driver thread where `TaskContext.get() == null`,
so no TCL fires — without `try/finally`, any exception mid-iteration
left the Arrow child allocator open and leaked 131072 bytes.
…ing 0 bytes

Executor.stop() calls threadPool.shutdown() but does NOT call awaitTermination(), so Spark task threads (which hold Arrow child-allocator memory via ArrowBatchWithSchemaIterator) may still be running when SparkContext.stop() returns.

In the E2E suite, the test "Execute is sent eagerly upon iterator creation" submits a Spark job for range(1000000). The client disconnects after receiving the schema response; the execute thread exits. clearAllExecutions() calls cancelAllJobs() (non-blocking), but the task processing 1M rows is still running. SparkContext.stop() shuts down the thread pool without waiting, so the task completion listener (which calls batches.close() and frees 131072 bytes) has not yet fired when ArrowAllocatorLeakCheck runs.

Fix: in afterAll(), after cancelAllJobs(), use eventuallyWithTimeout to poll until ArrowUtils.rootAllocator.getAllocatedMemory == 0. This waits for in-flight task TCLs to fire and release all Arrow memory before the ArrowAllocatorLeakCheck assertion runs.
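The poll-until-zero wait can be sketched as a plain loop. The PR itself uses ScalaTest's eventuallyWithTimeout around ArrowUtils.rootAllocator.getAllocatedMemory; `awaitZeroAllocated` below is an illustrative dependency-free equivalent, not the actual helper.

```scala
// Plain-loop sketch of the afterAll wait: poll a memory gauge until it
// reaches zero, or fail with a leak message once the deadline passes.
def awaitZeroAllocated(allocated: () => Long, timeoutMs: Long, pollMs: Long = 10): Unit = {
  val deadline = System.nanoTime() + timeoutMs * 1000000L
  while (allocated() != 0L) {
    if (System.nanoTime() > deadline) {
      throw new AssertionError(s"Arrow memory leak: ${allocated()} bytes still allocated")
    }
    Thread.sleep(pollMs)
  }
}
```

The key design point, as the commit notes, is that cancellation is asynchronous: the wait gives in-flight task completion listeners time to fire before the leak assertion runs.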
…rAll The eventuallyWithTimeout helper uses a 30-second ceiling, which is too short on slow CI runners (observed 3x slowdowns where Arrow serialization of range(1000000) takes >30 seconds). Use Eventually.eventually with an explicit 3-minute timeout so the wait succeeds even on sluggish machines. Co-authored-by: Isaac
…CI timeout Arrow serialization of 1M rows is CPU-bound and cannot be interrupted. On slow CI runners (3.7x slower than normal), processing 1M rows via ArrowBatchWithSchemaIterator takes >3 minutes, exhausting the afterAll() timeout in ArrowAllocatorLeakCheck before Arrow memory is freed. 100K rows exercises the same case _ => RDD code path in processAsArrowBatches and execute holders persist after completion until periodicMaintenance, so tests that check for holder visibility are unaffected. Co-authored-by: Isaac
…ory leak

The 131072-byte Arrow leak persists even after reducing BIG_ENOUGH_QUERY because two other tests hold large datasets that prevent the execute thread from completing cleanup within the 30-second clearAllExecutions() window:

1. SparkConnectServiceE2ESuite: buildLocalRelation((1 to 1000000)) creates 1M rows processed via LocalTableScanExec on the execute thread. The test does not consume results, so the execute thread may block sending to gRPC before clearAllExecutions() can interrupt it. Reduce to 1000 rows; any non-empty local relation suffices to verify FINISHED state behavior.

2. SparkConnectServerTestSuite: range(1000000) with only iter.hasNext called (results not consumed). Reduce to range(100000) to match other tests.

Co-authored-by: Isaac
Upstream apache/spark master updated dev/gen-protos.sh to use ruff format (replacing black) and updated the workflow to install ruff==0.14.8. Sync our fork's workflow to match, fixing the 'ruff: command not found' failure in the Protobuf/CodeGen check. Co-authored-by: Isaac
Fixes Arrow off-heap memory leaks in Spark Connect and adds afterAll guards to detect future leaks pre-merge.
Leaks fixed:
iterator.
drained but never closed.
Detection guards added — afterAll assertions on ArrowUtils.rootAllocator.getAllocatedMemory == 0 in:
future subclass)
Why are the changes needed?
ArrowUtils.rootAllocator is a JVM-wide singleton. Every toBatchWithSchemaIterator and fromIPCStream call allocates a child allocator and Arrow buffers from it. If the iterator is not explicitly closed, those buffers are never freed, causing off-heap memory growth on the driver. A related leak in the deserialization path (fromIPCStream) was fixed in SPARK-54696. This PR closes the complementary serialization-path gap that SPARK-54696 missed, and adds test-time assertions to catch regressions before merge.
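The test-time assertion added by this PR reduces to a check that the JVM-wide root allocator reports zero bytes after teardown. The sketch below is a hypothetical minimal version: `Allocator` stands in for Arrow's BufferAllocator (whose real accessor is getAllocatedMemory), and a bare function replaces the ScalaTest afterAll hook of the actual ArrowAllocatorLeakCheck trait.

```scala
// `Allocator` is a stand-in for Arrow's BufferAllocator interface.
trait Allocator { def getAllocatedMemory: Long }

// Fail the suite if any Arrow buffers are still allocated after teardown,
// mirroring the failure message seen in the SparkConnectProtoSuite log above.
def assertNoArrowLeak(root: Allocator): Unit = {
  val bytes = root.getAllocatedMemory
  assert(bytes == 0L, s"Arrow rootAllocator memory leak: $bytes bytes still allocated")
}
```

Because the root allocator is a singleton shared across suites, the assertion must run after all session/context teardown, which is why mixin ordering matters in the real trait.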
Does this PR introduce any user-facing change?
No.
How was this patch tested?
build/sbt "connect/Test/testOnly org.apache.spark.sql.connect.planner.SparkConnectPlannerSuite
org.apache.spark.sql.connect.planner.SparkConnectProtoSuite"
build/sbt "sql/Test/testOnly org.apache.spark.sql.execution.arrow.ArrowConvertersSuite"
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6