[SPARK-55890] Check arrow memory at end of tests #54689
Open
garlandz-db
wants to merge
53
commits into
apache:master
from
garlandz-db:SASP-7529
6d7d90f
Check arrow memory at end of tests
garlandz-db aceb571
add
garlandz-db e72b42e
full checks
garlandz-db 30b6eaf
update
garlandz-db 3c43723
Add Arrow rootAllocator leak check in server JVM for E2E tests
garlandz-db bc4998b
Fix Arrow memory leaks and leak check ordering
garlandz-db 00a45d9
Restore fileWriter.close() inside SparkArrowFileWriter.write()
garlandz-db 5f5a8f3
Fix Arrow allocator leak in LocalTableScanExec path of processAsArrow…
garlandz-db ed06ce9
Apply scalafmt formatting to SparkConnectPlannerSuite and SparkConnec…
garlandz-db a131dab
Add ArrowAllocatorLeakCheck to ArrowFileReadWriteSuite and improve sc…
garlandz-db 7f70170
Fix Arrow allocator leak in LocalTableScanExec when sendBatch throws
garlandz-db 7d25547
[SPARK-55890] Wait for execution threads in clearAllExecutions
garlandz-db 5704e57
[SPARK-55890] Re-trigger CI (infra flake: actions/setup-java 401)
garlandz-db 29bdb39
[SPARK-55890] Re-trigger CI (second attempt, infra flake resolved)
garlandz-db 2a0a62e
[SPARK-55890] Honour thread interrupt in LocalTableScanExec arrow bat…
garlandz-db edb9fce
[SPARK-55890] Fix Arrow buffer leak when serialization interrupted by…
garlandz-db 0e23c3b
[SPARK-55890] Move Arrow leak check to JVM shutdown hook after SparkC…
garlandz-db 0da5492
[SPARK-55890] scalafmt: fix while-condition indentation in SimpleSpar…
garlandz-db 2277b12
[SPARK-55890] Hold strong ref to synthetic leak buf to prevent GC-bas…
garlandz-db 7eaa406
[SPARK-55890] Fix synthetic leak for ArrowLeakDetectionE2ETest
garlandz-db e9e243b
[SPARK-55890] Simplify: revert production-only changes, keep test-onl…
garlandz-db 1a9e0c9
Make InternalRowIteratorFromIPCStream.close() idempotent
garlandz-db 2b8d73a
Remove idempotent close guard from InternalRowIteratorFromIPCStream
garlandz-db 310d17e
Remove unnecessary idempotent close guard; use hasNext auto-close in …
garlandz-db 27ba107
Restore fileWriter.close() to write() in try/finally
garlandz-db ee98307
Guard against double-close of fileWriter in SparkArrowFileWriter
garlandz-db 4acd850
Materialize reader.read() before finally in ArrowFileReadWrite.load()
garlandz-db 885f998
Revert: remove speculative reader.read().toArray materialization
garlandz-db 820c4a0
Materialize reader.read() before finally in ArrowFileReadWrite.load()
garlandz-db c7a2613
Close ArrowRecordBatch in try/finally inside write() loop
garlandz-db 7b73ef9
Set fileWriterClosed=true in close() guard
garlandz-db e4dc059
scalafmt: reformat test files
garlandz-db 2b4e820
Address review: throw exception on Arrow leak when not in shutdown hook
garlandz-db 6e38a30
Fix thread safety and teardown correctness in leak check
garlandz-db e0f20d0
Remove halt() from RemoteSparkSession: always throw RuntimeException
garlandz-db df609b7
Restore LocalTableScanExec Arrow leak fix in SparkConnectPlanExecution
garlandz-db 529f101
Revert "Restore LocalTableScanExec Arrow leak fix in SparkConnectPlan…
garlandz-db cb54f77
Fix Arrow iterator leak in LocalTableScanExec via mkBatches refactor
garlandz-db 5ca7cd4
Fix scalastyle: replace em dash with period in comment
garlandz-db 14f0f57
Fix: use type inference for mkBatches to avoid private[sql] cross-mod…
garlandz-db 429a63d
[SPARK-55890] Cancel lingering Spark jobs in clearAllExecutions to fi…
garlandz-db 04ff881
Move Arrow memory check from afterEach to afterAll
garlandz-db 5515359
Remove pre-stop Arrow memory wait; rely on ArrowAllocatorLeakCheck po…
garlandz-db fd960a5
Fix Scalastyle: replace non-ASCII em dash with semicolon in comment
garlandz-db c4810be
Remove unused ArrowUtils import
garlandz-db 5205532
chore: re-trigger CI
garlandz-db af7b49d
connect: register TaskCompletionListener on ArrowBatchWithSchemaItera…
garlandz-db 85d03e4
[SPARK-55890] Fix Arrow allocator leak in SparkConnectPlanExecution
garlandz-db a5265e8
[SPARK-55890] Fix Arrow leak check race: wait for tasks before assert…
garlandz-db 36ba135
test(connect): extend Arrow memory drain timeout to 3 minutes in afte…
garlandz-db f450ace
test(connect): reduce BIG_ENOUGH_QUERY from 1M to 100K rows to avoid …
garlandz-db 6eb3006
test(connect): fix two remaining large-data sources causing Arrow mem…
garlandz-db 2c0adc1
refactor: use converter in sendCollectedRows, delegate LocalTableScan…
garlandz-db
94 changes: 94 additions & 0 deletions
...ct/client/jvm/src/test/scala/org/apache/spark/sql/connect/ArrowLeakDetectionE2ETest.scala
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.sql.connect | ||
| | ||
| import java.io.{BufferedReader, File, InputStreamReader} | ||
| import java.lang.ProcessBuilder.Redirect | ||
| import java.util.concurrent.TimeUnit | ||
| | ||
| import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite | ||
| | ||
| import org.apache.spark.sql.connect.test.IntegrationTestUtils | ||
| | ||
| /** | ||
| * Verifies that SimpleSparkConnectService exits with ArrowLeakExitCode (77) when the Arrow | ||
| * rootAllocator has unreleased memory at shutdown. The server is started with the | ||
| * SPARK_TEST_ARROW_LEAK env variable which injects a synthetic allocation that is never freed. | ||
| */ | ||
| class ArrowLeakDetectionE2ETest extends AnyFunSuite { // scalastyle:ignore funsuite | ||
| | ||
| test("server exits with code 77 when rootAllocator has a leak") { | ||
| val connectJarOpt = IntegrationTestUtils.tryFindJar( | ||
| "sql/connect/server", | ||
| "spark-connect-assembly", | ||
| "spark-connect") | ||
| assume(connectJarOpt.isDefined, "Skipping: connect server assembly jar not found") | ||
| val connectJar = connectJarOpt.get.getCanonicalPath | ||
| | ||
| val command = Seq( | ||
| s"${IntegrationTestUtils.sparkHome}/bin/spark-submit", | ||
| "--driver-class-path", | ||
| connectJar, | ||
| "--class", | ||
| "org.apache.spark.sql.connect.SimpleSparkConnectService", | ||
| connectJar) | ||
| | ||
| val builder = new ProcessBuilder(command: _*) | ||
| builder.directory(new File(IntegrationTestUtils.sparkHome)) | ||
| builder.environment().remove("SPARK_DIST_CLASSPATH") | ||
| // Trigger the synthetic leak in SimpleSparkConnectService | ||
| builder.environment().put("SPARK_TEST_ARROW_LEAK", "1") | ||
| builder.redirectError(Redirect.INHERIT) | ||
| builder.redirectOutput(Redirect.PIPE) | ||
| | ||
| val process = builder.start() | ||
| val reader = new BufferedReader(new InputStreamReader(process.getInputStream)) | ||
| val consoleOut = process.getOutputStream | ||
| | ||
| try { | ||
| // Read stdout until "Ready for client connections." so we know the server is up. | ||
| var line = reader.readLine() | ||
| while (line != null && !line.contains("Ready for client connections")) { | ||
| line = reader.readLine() | ||
| } | ||
| assert( | ||
| line != null && line.contains("Ready for client connections"), | ||
| "Server did not print ready message before exiting") | ||
| | ||
| // Drain remaining stdout in the background to prevent the pipe from filling and blocking | ||
| // the server process. | ||
| val drainThread = new Thread(() => { | ||
| try { while (reader.readLine() != null) {} } | ||
| catch { case _: Exception => } | ||
| }) | ||
| drainThread.setDaemon(true) | ||
| drainThread.start() | ||
| | ||
| // Send stop command | ||
| consoleOut.write("q\n".getBytes) | ||
| consoleOut.flush() | ||
| consoleOut.close() | ||
| | ||
| assert(process.waitFor(2, TimeUnit.MINUTES), "Server did not exit within 2 minutes") | ||
| assert( | ||
| process.exitValue() == 77, | ||
| s"Expected exit code 77 (Arrow leak), got ${process.exitValue()}") | ||
| } finally { | ||
| if (process.isAlive) process.destroyForcibly() | ||
| } | ||
| } | ||
| } |
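
The test above only observes the server from the outside: it sets SPARK_TEST_ARROW_LEAK, sends the stop command, and expects exit code 77. As a rough illustration of the server-side decision it relies on, here is a minimal sketch using only the Scala standard library. `TrackingAllocator`, `ArrowLeakCheckSketch`, and `exitCodeFor` are hypothetical names invented for this sketch; they are not the PR's code and not the Arrow API. The only details taken from the diff are the exit code 77, the ArrowLeakExitCode name, and the env variable name.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for an Arrow allocator: it only tracks how many
// bytes are currently outstanding. This is NOT the Arrow API.
final class TrackingAllocator {
  private val allocated = new AtomicLong(0L)

  // Record an allocation of `bytes` bytes.
  def allocate(bytes: Long): Unit = allocated.addAndGet(bytes)

  // Record that `bytes` bytes were released.
  def release(bytes: Long): Unit = allocated.addAndGet(-bytes)

  // Bytes allocated and not yet released.
  def allocatedMemory: Long = allocated.get()
}

object ArrowLeakCheckSketch {
  // Value taken from the test above; the name mirrors its Scaladoc.
  val ArrowLeakExitCode: Int = 77

  // Exit code a server could use at shutdown: 77 if any bytes are still
  // outstanding, 0 otherwise.
  def exitCodeFor(allocator: TrackingAllocator): Int =
    if (allocator.allocatedMemory > 0) ArrowLeakExitCode else 0

  def main(args: Array[String]): Unit = {
    val allocator = new TrackingAllocator
    // Assumed behaviour: the env variable injects an allocation that is
    // never released, so the check below must fail.
    if (sys.env.contains("SPARK_TEST_ARROW_LEAK")) allocator.allocate(1024L)

    val code = exitCodeFor(allocator)
    if (code != 0) {
      System.err.println(
        s"Arrow leak: ${allocator.allocatedMemory} bytes still allocated")
      sys.exit(code)
    }
  }
}
```

Keeping the decision in a pure function such as `exitCodeFor` lets it be unit tested without spawning a process; the subprocess test in the diff then only has to confirm that the real server wires the check into its shutdown path.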
This is leaking, right?
without this fix