Changes from all commits (53 commits)
6d7d90f
Check arrow memory at end of tests
garlandz-db Mar 9, 2026
aceb571
add
garlandz-db Mar 9, 2026
e72b42e
full checks
garlandz-db Mar 9, 2026
30b6eaf
update
garlandz-db Mar 9, 2026
3c43723
Add Arrow rootAllocator leak check in server JVM for E2E tests
garlandz-db Mar 11, 2026
bc4998b
Fix Arrow memory leaks and leak check ordering
garlandz-db Mar 11, 2026
00a45d9
Restore fileWriter.close() inside SparkArrowFileWriter.write()
garlandz-db Mar 11, 2026
5f5a8f3
Fix Arrow allocator leak in LocalTableScanExec path of processAsArrow…
garlandz-db Mar 11, 2026
ed06ce9
Apply scalafmt formatting to SparkConnectPlannerSuite and SparkConnec…
garlandz-db Mar 11, 2026
a131dab
Add ArrowAllocatorLeakCheck to ArrowFileReadWriteSuite and improve sc…
garlandz-db Mar 11, 2026
7f70170
Fix Arrow allocator leak in LocalTableScanExec when sendBatch throws
garlandz-db Mar 12, 2026
7d25547
[SPARK-55890] Wait for execution threads in clearAllExecutions
garlandz-db Mar 12, 2026
5704e57
[SPARK-55890] Re-trigger CI (infra flake: actions/setup-java 401)
garlandz-db Mar 12, 2026
29bdb39
[SPARK-55890] Re-trigger CI (second attempt, infra flake resolved)
garlandz-db Mar 12, 2026
2a0a62e
[SPARK-55890] Honour thread interrupt in LocalTableScanExec arrow bat…
garlandz-db Mar 12, 2026
edb9fce
[SPARK-55890] Fix Arrow buffer leak when serialization interrupted by…
garlandz-db Mar 12, 2026
0e23c3b
[SPARK-55890] Move Arrow leak check to JVM shutdown hook after SparkC…
garlandz-db Mar 13, 2026
0da5492
[SPARK-55890] scalafmt: fix while-condition indentation in SimpleSpar…
garlandz-db Mar 13, 2026
2277b12
[SPARK-55890] Hold strong ref to synthetic leak buf to prevent GC-bas…
garlandz-db Mar 13, 2026
7eaa406
[SPARK-55890] Fix synthetic leak for ArrowLeakDetectionE2ETest
garlandz-db Mar 13, 2026
e9e243b
[SPARK-55890] Simplify: revert production-only changes, keep test-onl…
garlandz-db Mar 18, 2026
1a9e0c9
Make InternalRowIteratorFromIPCStream.close() idempotent
garlandz-db Mar 18, 2026
2b8d73a
Remove idempotent close guard from InternalRowIteratorFromIPCStream
garlandz-db Mar 19, 2026
310d17e
Remove unnecessary idempotent close guard; use hasNext auto-close in …
garlandz-db Mar 19, 2026
27ba107
Restore fileWriter.close() to write() in try/finally
garlandz-db Mar 19, 2026
ee98307
Guard against double-close of fileWriter in SparkArrowFileWriter
garlandz-db Mar 19, 2026
4acd850
Materialize reader.read() before finally in ArrowFileReadWrite.load()
garlandz-db Mar 19, 2026
885f998
Revert: remove speculative reader.read().toArray materialization
garlandz-db Mar 19, 2026
820c4a0
Materialize reader.read() before finally in ArrowFileReadWrite.load()
garlandz-db Mar 19, 2026
c7a2613
Close ArrowRecordBatch in try/finally inside write() loop
garlandz-db Mar 19, 2026
7b73ef9
Set fileWriterClosed=true in close() guard
garlandz-db Mar 19, 2026
e4dc059
scalafmt: reformat test files
garlandz-db Mar 19, 2026
2b4e820
Address review: throw exception on Arrow leak when not in shutdown hook
garlandz-db Mar 19, 2026
6e38a30
Fix thread safety and teardown correctness in leak check
garlandz-db Mar 19, 2026
e0f20d0
Remove halt() from RemoteSparkSession: always throw RuntimeException
garlandz-db Mar 19, 2026
df609b7
Restore LocalTableScanExec Arrow leak fix in SparkConnectPlanExecution
garlandz-db Mar 19, 2026
529f101
Revert "Restore LocalTableScanExec Arrow leak fix in SparkConnectPlan…
garlandz-db Mar 19, 2026
cb54f77
Fix Arrow iterator leak in LocalTableScanExec via mkBatches refactor
garlandz-db Mar 19, 2026
5ca7cd4
Fix scalastyle: replace em dash with period in comment
garlandz-db Mar 19, 2026
14f0f57
Fix: use type inference for mkBatches to avoid private[sql] cross-mod…
garlandz-db Mar 19, 2026
429a63d
[SPARK-55890] Cancel lingering Spark jobs in clearAllExecutions to fi…
garlandz-db Mar 20, 2026
04ff881
Move Arrow memory check from afterEach to afterAll
garlandz-db Mar 20, 2026
5515359
Remove pre-stop Arrow memory wait; rely on ArrowAllocatorLeakCheck po…
garlandz-db Mar 20, 2026
fd960a5
Fix Scalastyle: replace non-ASCII em dash with semicolon in comment
garlandz-db Mar 20, 2026
c4810be
Remove unused ArrowUtils import
garlandz-db Mar 20, 2026
5205532
chore: re-trigger CI
garlandz-db Mar 23, 2026
af7b49d
connect: register TaskCompletionListener on ArrowBatchWithSchemaItera…
garlandz-db Mar 23, 2026
85d03e4
[SPARK-55890] Fix Arrow allocator leak in SparkConnectPlanExecution
garlandz-db Mar 24, 2026
a5265e8
[SPARK-55890] Fix Arrow leak check race: wait for tasks before assert…
garlandz-db Mar 24, 2026
36ba135
test(connect): extend Arrow memory drain timeout to 3 minutes in afte…
garlandz-db Mar 24, 2026
f450ace
test(connect): reduce BIG_ENOUGH_QUERY from 1M to 100K rows to avoid …
garlandz-db Mar 24, 2026
6eb3006
test(connect): fix two remaining large-data sources causing Arrow mem…
garlandz-db Mar 24, 2026
2c0adc1
refactor: use converter in sendCollectedRows, delegate LocalTableScan…
garlandz-db Apr 7, 2026
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -842,7 +842,7 @@ jobs:
python-version: '3.12'
- name: Install dependencies for Python CodeGen check
run: |
python3.12 -m pip install 'black==26.3.1' 'protobuf==6.33.5' 'mypy==1.8.0' 'mypy-protobuf==3.3.0'
python3.12 -m pip install 'ruff==0.14.8' 'protobuf==6.33.5' 'mypy==1.8.0' 'mypy-protobuf==3.3.0'
python3.12 -m pip list
- name: Python CodeGen check for branch-3.5
if: inputs.branch == 'branch-3.5'
2 changes: 1 addition & 1 deletion dev/gen-protos.sh
@@ -116,7 +116,7 @@ for f in `find gen/proto/python -name "*.py*"`; do
rm $f.bak
done

black --config $SPARK_HOME/pyproject.toml gen/proto/python
ruff format --config $SPARK_HOME/pyproject.toml gen/proto/python

# Last step copy the result files to the destination module.
for f in `find gen/proto/python -name "*.py*"`; do
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connect

import java.io.{BufferedReader, File, InputStreamReader}
import java.lang.ProcessBuilder.Redirect
import java.util.concurrent.TimeUnit

import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite

import org.apache.spark.sql.connect.test.IntegrationTestUtils

/**
* Verifies that SimpleSparkConnectService exits with ArrowLeakExitCode (77) when the Arrow
* rootAllocator has unreleased memory at shutdown. The server is started with the
* SPARK_TEST_ARROW_LEAK environment variable set, which injects a synthetic allocation
* that is never freed.
*/
class ArrowLeakDetectionE2ETest extends AnyFunSuite { // scalastyle:ignore funsuite

test("server exits with code 77 when rootAllocator has a leak") {
val connectJarOpt = IntegrationTestUtils.tryFindJar(
"sql/connect/server",
"spark-connect-assembly",
"spark-connect")
assume(connectJarOpt.isDefined, "Skipping: connect server assembly jar not found")
val connectJar = connectJarOpt.get.getCanonicalPath

val command = Seq(
s"${IntegrationTestUtils.sparkHome}/bin/spark-submit",
"--driver-class-path",
connectJar,
"--class",
"org.apache.spark.sql.connect.SimpleSparkConnectService",
connectJar)

val builder = new ProcessBuilder(command: _*)
builder.directory(new File(IntegrationTestUtils.sparkHome))
builder.environment().remove("SPARK_DIST_CLASSPATH")
// Trigger the synthetic leak in SimpleSparkConnectService
builder.environment().put("SPARK_TEST_ARROW_LEAK", "1")
builder.redirectError(Redirect.INHERIT)
builder.redirectOutput(Redirect.PIPE)

val process = builder.start()
val reader = new BufferedReader(new InputStreamReader(process.getInputStream))
val consoleOut = process.getOutputStream

try {
// Read stdout until "Ready for client connections." so we know the server is up.
var line = reader.readLine()
while (line != null && !line.contains("Ready for client connections")) {
line = reader.readLine()
}
assert(
line != null && line.contains("Ready for client connections"),
"Server did not print ready message before exiting")

// Drain remaining stdout in the background to prevent the pipe from filling and blocking
// the server process.
val drainThread = new Thread(() => {
try { while (reader.readLine() != null) {} }
catch { case _: Exception => }
})
drainThread.setDaemon(true)
drainThread.start()

// Send stop command
consoleOut.write("q\n".getBytes)
consoleOut.flush()
consoleOut.close()

assert(process.waitFor(2, TimeUnit.MINUTES), "Server did not exit within 2 minutes")
assert(
process.exitValue() == 77,
s"Expected exit code 77 (Arrow leak), got ${process.exitValue()}")
} finally {
if (process.isAlive) process.destroyForcibly()
}
}
}
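The E2E test above follows a reusable pattern: launch a child process, block until it prints a readiness line, send a stop command on stdin, then assert on the exit code. Below is a minimal sketch of that pattern in Java. The `sh` one-liner is a stand-in for SimpleSparkConnectService, and `ExitCodeCheck`/`runAndWait` are illustrative names, not part of the PR; only the exit code 77 comes from the diff.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class ExitCodeCheck {
    // Must match the server-side constant (ArrowLeakExitCode in the diff).
    static final int LEAK_EXIT_CODE = 77;

    /** Starts a child, waits for the ready line, sends "q", returns the exit code. */
    public static int runAndWait() throws Exception {
        ProcessBuilder pb = new ProcessBuilder(
            "sh", "-c", "echo 'Ready for client connections.'; read cmd; exit 77");
        pb.redirectErrorStream(true);
        Process p = pb.start();
        BufferedReader out = new BufferedReader(
            new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8));
        String line;
        while ((line = out.readLine()) != null) {
            if (line.contains("Ready for client connections")) break;
        }
        if (line == null) {
            throw new IllegalStateException("child exited before printing ready line");
        }
        // Send the stop command, mirroring the "q\n" write in the test.
        try (OutputStream in = p.getOutputStream()) {
            in.write("q\n".getBytes(StandardCharsets.UTF_8));
            in.flush();
        }
        if (!p.waitFor(30, TimeUnit.SECONDS)) {
            p.destroyForcibly();
            throw new IllegalStateException("child did not exit in time");
        }
        return p.exitValue();
    }

    public static void main(String[] args) throws Exception {
        int code = runAndWait();
        if (code != LEAK_EXIT_CODE) throw new AssertionError("got exit code " + code);
        System.out.println("exit code " + code);
    }
}
```

As in the real test, the waitFor timeout plus `destroyForcibly()` in the failure path keeps a hung child from wedging the suite.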
@@ -52,6 +52,9 @@ object SparkConnectServerUtils {
// The equivalent command to start the connect server via command line:
// bin/spark-shell --conf spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin

// Must match ArrowLeakExitCode in SimpleSparkConnectService
private val ArrowLeakExitCode = 77

// Server port
val port: Int =
ConnectCommon.CONNECT_GRPC_BINDING_PORT + util.Random.nextInt(1000)
@@ -156,6 +159,10 @@ }
}
val code = sparkConnect.exitValue()
debug(s"Spark Connect Server is stopped with exit code: $code")
if (code == ArrowLeakExitCode) {
throw new RuntimeException(
s"Arrow allocator memory leak detected in Spark Connect server (exit code $code)")
}
code
} catch {
case e: IOException if e.getMessage.contains("Stream closed") =>
@@ -26,6 +26,8 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.service.SparkConnectService
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.ShutdownHookManager

/**
* A simple main class method to start the spark connect server as a service for client tests
@@ -37,6 +39,12 @@ import org.apache.spark.sql.internal.SQLConf
*/
private[sql] object SimpleSparkConnectService {
private val stopCommand = "q"
private val ArrowLeakExitCode = 77

// Holds a synthetic Arrow buffer for ArrowLeakDetectionE2ETest. Stored as an object-level
// field (not a local variable) so the JVM cannot reclaim it before the shutdown hook checks
// getAllocatedMemory. Non-null only when SPARK_TEST_ARROW_LEAK is set.
@volatile private var testLeakBuf: AnyRef = null

def main(args: Array[String]): Unit = {
val conf = new SparkConf()
@@ -45,6 +53,32 @@
.set(SQLConf.ARTIFACTS_SESSION_ISOLATION_ALWAYS_APPLY_CLASSLOADER, true)
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sparkContext = sparkSession.sparkContext // init spark context

// Arrow leak check registered at priority 10 (below SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50)
// so it runs after SparkContext has fully stopped. Polls up to 2 minutes rather than checking
// immediately: execution threads stop accepting new work once the gRPC server shuts down but
// may still be flushing their last Arrow batch, and we should not force-close them.
// halt() is used instead of exit() because exit() deadlocks inside a shutdown hook.
//
// If testLeakBuf is set (ArrowLeakDetectionE2ETest only), the allocation is already present
// in the rootAllocator; skip the wait and check immediately.
ShutdownHookManager.addShutdownHook(10) { () =>
if (testLeakBuf == null) {
val deadline = System.currentTimeMillis() + 2 * 60 * 1000L
while (ArrowUtils.rootAllocator.getAllocatedMemory != 0 &&
System.currentTimeMillis() < deadline) {
Thread.sleep(100)
}
}
val leaked = ArrowUtils.rootAllocator.getAllocatedMemory
if (leaked != 0) {
// scalastyle:off println
println(s"Arrow rootAllocator memory leak detected: $leaked bytes still allocated")
// scalastyle:on println
Runtime.getRuntime.halt(ArrowLeakExitCode)
}
}

// scalastyle:off println
println("Ready for client connections.")
// scalastyle:on println
@@ -57,7 +91,14 @@
// Wait for 1 min for the server to stop
SparkConnectService.stop(Some(1), Some(TimeUnit.MINUTES))
sparkSession.close()
exit(0)
// Synthetic leak for ArrowLeakDetectionE2ETest only. Injected here, after
// SparkContext has stopped, so that it is visible to the shutdown hook's
// getAllocatedMemory check. The object-level field keeps it reachable.
if (sys.env.contains("SPARK_TEST_ARROW_LEAK")) {
val leakyAllocator = ArrowUtils.rootAllocator.newChildAllocator("test-leak", 0, 1024)
testLeakBuf = leakyAllocator.buffer(64) // intentionally never closed
}
exit(0) // triggers shutdown hooks; Arrow leak check runs in the hook registered above
}
}
}
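The shutdown hook's poll-then-report logic can be sketched independently of Spark. In this hypothetical Java sketch, an `AtomicLong` stands in for `ArrowUtils.rootAllocator.getAllocatedMemory`, and `awaitDrain` mirrors the deadline loop in the hook above; in the real hook a non-zero result triggers `Runtime.getRuntime.halt(77)` rather than `exit`, because `exit()` would deadlock inside a shutdown hook. `LeakCheck` and `awaitDrain` are illustrative names.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class LeakCheck {
    /** Polls the outstanding byte count until it drains or the deadline passes;
     *  returns whatever is still allocated at that point. */
    static long awaitDrain(LongSupplier allocatedBytes, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (allocatedBytes.getAsLong() != 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(10); // in-flight tasks may still be flushing their last batch
        }
        return allocatedBytes.getAsLong();
    }

    public static void main(String[] args) throws Exception {
        AtomicLong allocated = new AtomicLong(1024);
        // Simulate a task releasing its allocation shortly after shutdown begins.
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException e) { return; }
            allocated.set(0);
        }).start();
        long leaked = awaitDrain(allocated::get, 5_000);
        // Non-zero here would map to halt(ArrowLeakExitCode) in the real hook.
        System.out.println("leaked=" + leaked);
    }
}
```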
@@ -142,13 +142,23 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
// Whether to enable arrow batch chunking for large result batches.
val isResultChunkingEnabled = executePlan.resultChunkingEnabled

val converter = rowToArrowConverter(
schema,
maxRecordsPerBatch,
maxBatchSize,
timeZoneId,
errorOnDuplicatedFieldNames = false,
largeVarTypes = largeVarTypes)
val mkBatches = (rows: Iterator[InternalRow]) =>
ArrowConverters.toBatchWithSchemaIterator(
rows,
schema,
maxRecordsPerBatch,
maxBatchSize,
timeZoneId,
errorOnDuplicatedFieldNames = false,
largeVarTypes = largeVarTypes)

// toBatchWithSchemaIterator passes TaskContext.get() to ArrowBatchWithSchemaIterator,
// which already registers a TaskCompletionListener to close itself on task completion.
// No additional TCL is needed here.
val converter: Iterator[InternalRow] => Iterator[Batch] = rows => {
val batches = mkBatches(rows)
batches.map(b => b -> batches.rowCountInLastBatch)
}

var numSent = 0
def sendBatch(bytes: Array[Byte], count: Long, startOffset: Long): Unit = {
@@ -207,19 +217,21 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
def sendCollectedRows(rows: Array[InternalRow]): Unit = {
executePlan.eventsManager.postFinished(Some(rows.length))
var offset = 0L
converter(rows.iterator).foreach { case (bytes, count) =>
sendBatch(bytes, count, offset)
offset += count
val batches = mkBatches(rows.iterator)
try {
while (batches.hasNext) {
val batchBytes = batches.next()
val count = batches.rowCountInLastBatch
sendBatch(batchBytes, count, offset)
offset += count
}
} finally {
batches.close()
}
}
dataframe.queryExecution.executedPlan match {
case LocalTableScanExec(_, rows, _) =>
executePlan.eventsManager.postFinished(Some(rows.length))
var offset = 0L
converter(rows.iterator).foreach { case (bytes, count) =>
sendBatch(bytes, count, offset)
offset += count
}
sendCollectedRows(rows.toArray)
case collectLimit: CollectLimitExec =>
SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectLimitArrow")) {
sendCollectedRows(collectLimit.executeCollect())
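The key change in `sendCollectedRows` is closing the batch iterator in a `finally` block, so its Arrow buffers are released even when sending a batch throws mid-stream. A hedged Java sketch of the same shape; `BatchIterator` here is a stand-in for `ArrowBatchWithSchemaIterator`, not the real class:

```java
import java.util.Iterator;
import java.util.List;

public class CloseOnError {
    /** Closeable batch source; close() models releasing the Arrow allocator. */
    static class BatchIterator implements Iterator<byte[]>, AutoCloseable {
        private final Iterator<byte[]> inner;
        boolean closed = false;
        BatchIterator(List<byte[]> batches) { this.inner = batches.iterator(); }
        public boolean hasNext() { return inner.hasNext(); }
        public byte[] next() { return inner.next(); }
        public void close() { closed = true; }
    }

    static void sendAll(BatchIterator batches, java.util.function.Consumer<byte[]> send) {
        try {
            while (batches.hasNext()) {
                send.accept(batches.next());
            }
        } finally {
            batches.close(); // runs even when send throws mid-stream
        }
    }

    public static void main(String[] args) {
        BatchIterator it = new BatchIterator(List.of(new byte[4], new byte[4]));
        try {
            // Simulate the client disappearing while a batch is being sent.
            sendAll(it, b -> { throw new RuntimeException("client went away"); });
        } catch (RuntimeException e) {
            // expected
        }
        System.out.println("closed=" + it.closed);
    }
}
```

Without the `finally`, an exception from `sendBatch` would abandon the iterator with its buffers still allocated, which is exactly the leak the `ArrowAllocatorLeakCheck` then reports.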
Expand Up @@ -37,14 +37,15 @@ import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.dsl.MockRemoteSession
import org.apache.spark.sql.connect.dsl.plans._
import org.apache.spark.sql.connect.service.{ExecuteHolder, SessionKey, SparkConnectService}
import org.apache.spark.sql.execution.arrow.ArrowAllocatorLeakCheck
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.util.CloseableIterator
import org.apache.spark.sql.util.{ArrowUtils, CloseableIterator}

/**
* Base class and utilities for a test suite that starts and tests the real SparkConnectService
* with a real SparkConnectClient, communicating over RPC, but both in-process.
*/
trait SparkConnectServerTest extends SharedSparkSession {
trait SparkConnectServerTest extends SharedSparkSession with ArrowAllocatorLeakCheck {

// Server port
val serverPort: Int =
@@ -67,6 +68,19 @@

override def afterAll(): Unit = {
SparkConnectService.stop()
spark.sparkContext.cancelAllJobs()
// Executor.stop() calls threadPool.shutdown() but does NOT await termination, so task
// threads may still be running (and holding Arrow child-allocator memory) when
// SparkContext.stop() returns. Wait here for all Arrow memory to drain before the
// ArrowAllocatorLeakCheck assertion fires in super.afterAll(). Use a 3-minute timeout
// (instead of the default 30 seconds) to accommodate slow CI runners where Arrow
// serialization of a large result set can take well over 30 seconds.
Eventually.eventually(timeout(3.minutes), interval(500.millis)) {
assert(
ArrowUtils.rootAllocator.getAllocatedMemory == 0,
s"Arrow memory still allocated after cancelAllJobs: " +
s"${ArrowUtils.rootAllocator.getAllocatedMemory} bytes")
}
allocator.close()
super.afterAll()
}
@@ -82,7 +96,19 @@
}

protected def clearAllExecutions(): Unit = {
SparkConnectService.executionManager.listExecuteHolders.foreach(_.close())
val holders = SparkConnectService.executionManager.listExecuteHolders
holders.foreach(_.close())
// Wait for execution threads to terminate so that Arrow allocators they hold are released
// before the ArrowAllocatorLeakCheck runs in afterAll().
holders.foreach { holder =>
eventuallyWithTimeout { assert(!holder.isExecuteThreadRunnerAlive()) }
}
// Cancel any Spark jobs still running after the execute threads exited. In local mode the
// executor and driver share the same JVM, so a job submitted by an execute thread may still
// have tasks running (and holding ArrowBatchWithSchemaIterator allocations) even after the
// execute thread itself terminates. cancelAllJobs() signals cancellation; the afterAll()
// wait for getAllocatedMemory == 0 ensures TCLs have actually fired before the check.
spark.sparkContext.cancelAllJobs()
SparkConnectService.executionManager.periodicMaintenance(0)
SparkConnectService.sessionManager.invalidateAllSessions()
assertNoActiveExecutions()
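Both waits above (the 3-minute drain in `afterAll()` and `eventuallyWithTimeout` in `clearAllExecutions()`) rely on ScalaTest's `Eventually`: retry a condition at a fixed interval until it holds or a timeout elapses, then fail loudly. A rough Java equivalent of that retry loop; `EventuallyLite` is illustrative only, the suite itself uses `org.scalatest.concurrent.Eventually`:

```java
import java.util.function.BooleanSupplier;

public class EventuallyLite {
    /** Retries condition every intervalMillis until it holds or timeoutMillis elapses. */
    static void eventually(BooleanSupplier condition, long timeoutMillis, long intervalMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (condition.getAsBoolean()) return;
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("condition not met within " + timeoutMillis + " ms");
            }
            Thread.sleep(intervalMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Condition becomes true after ~200 ms, standing in for Arrow memory draining
        // once task-completion listeners have fired.
        eventually(() -> System.currentTimeMillis() - start > 200, 5_000, 50);
        System.out.println("drained");
    }
}
```

The diff's choice of a long timeout with a short interval (3 minutes / 500 ms) trades a little polling overhead for robustness on slow CI runners, where the condition is usually true on the first few checks anyway.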
@@ -195,7 +195,7 @@ class SparkConnectServerTestSuite extends SparkConnectServerTest {

test("eventuallyGetExecutionHolder: retrieves active execution") {
withRawBlockingStub { stub =>
val request = buildExecutePlanRequest(buildPlan("SELECT * FROM range(1000000)"))
val request = buildExecutePlanRequest(buildPlan("SELECT * FROM range(100000)"))
val iter = stub.executePlan(request)
iter.hasNext // trigger execution

@@ -36,7 +36,7 @@ import org.apache.spark.sql.classic.Dataset
import org.apache.spark.sql.connect.SparkConnectTestUtils
import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.execution.arrow.{ArrowAllocatorLeakCheck, ArrowConverters}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimeType}
import org.apache.spark.unsafe.types.UTF8String
@@ -45,7 +45,8 @@ import org.apache.spark.unsafe.types.UTF8String
* Testing trait for SparkConnect tests with some helper methods to make it easier to create new
* test cases.
*/
trait SparkConnectPlanTest extends SharedSparkSession {
trait SparkConnectPlanTest extends SharedSparkSession with ArrowAllocatorLeakCheck {

def transform(rel: proto.Relation): logical.LogicalPlan = {
SparkConnectPlannerTestUtils.transform(spark, rel)
}
@@ -95,7 +96,7 @@ trait SparkConnectPlanTest extends SharedSparkSession {
schema: Option[StructType] = None): proto.Relation = {
val localRelationBuilder = proto.LocalRelation.newBuilder()

val bytes = ArrowConverters
val iter = ArrowConverters
.toBatchWithSchemaIterator(
data.iterator,
DataTypeUtils.fromAttributes(attrs.map(_.toAttribute)),
@@ -104,7 +105,12 @@
timeZoneId,
true,
false)
.next()
val bytes =
try {
iter.next()
} finally {
iter.close()
}

localRelationBuilder.setData(ByteString.copyFrom(bytes))
schema.foreach(s => localRelationBuilder.setSchema(s.json))
@@ -1118,7 +1118,7 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
val localRelationBuilder = proto.LocalRelation.newBuilder()

val attributes = attrs.map(exp => AttributeReference(exp.name, exp.dataType)())
val buffer = ArrowConverters
val iter = ArrowConverters
Contributor: This is leaking right?

Contributor (author): Without this fix:
 SparkConnectProtoSuite:
     [info] org.apache.spark.sql.connect.planner.SparkConnectProtoSuite *** ABORTED *** (8 seconds, 710 milliseconds)
     [info]   16896 did not equal 0 Arrow rootAllocator memory leak: 16896 bytes still allocated
     (ArrowAllocatorLeakCheck.scala:33)
     [info]   org.scalatest.exceptions.TestFailedException:
     [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
     [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
     [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
     [info]   at org.scalatest.Assertio

.toBatchWithSchemaIterator(
Iterator.empty,
DataTypeUtils.fromAttributes(attributes),
@@ -1127,7 +1127,12 @@
null,
true,
false)
.next()
val buffer =
try {
iter.next()
} finally {
iter.close()
}
proto.Relation
.newBuilder()
.setLocalRelation(localRelationBuilder.setData(ByteString.copyFrom(buffer)).build())