From 01fe84c6b71190670a6448d535ae21b4fe510ccf Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Mon, 23 Mar 2026 18:01:30 +0800 Subject: [PATCH 1/7] nit --- .../org/apache/spark/sql/QueryTest.scala | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index ac406a9fa694e..0e358dda388ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -23,7 +23,9 @@ import java.util.regex.Pattern import scala.jdk.CollectionConverters._ import org.scalatest.Assertions +import org.scalatest.Suite +import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ @@ -35,7 +37,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.ArrayImplicits._ -abstract class QueryTest extends PlanTest with SparkSessionProvider { +trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite => /** * Runs the plan and makes sure the answer contains all of the keywords. @@ -202,7 +204,8 @@ abstract class QueryTest extends PlanTest with SparkSessionProvider { * Asserts that a given [[Dataset]] will be executed using the given number of cached results. */ def assertCached(query: Dataset[_], numCachedTables: Int = 1): Unit = { - val planWithCaching = query.queryExecution.withCachedData + val planWithCaching = + query.asInstanceOf[classic.Dataset[_]].queryExecution.withCachedData val cachedData = planWithCaching collect { case cached: InMemoryRelation => cached } @@ -218,7 +221,8 @@ abstract class QueryTest extends PlanTest with SparkSessionProvider { * storage level. */ def assertCached(query: Dataset[_], cachedName: String, storageLevel: StorageLevel): Unit = { - val planWithCaching = query.queryExecution.withCachedData + val planWithCaching = + query.asInstanceOf[classic.Dataset[_]].queryExecution.withCachedData val matched = planWithCaching.exists { case cached: InMemoryRelation => val cacheBuilder = cached.cacheBuilder @@ -238,12 +242,13 @@ abstract class QueryTest extends PlanTest with SparkSessionProvider { * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans. */ def assertEmptyMissingInput(query: Dataset[_]): Unit = { - assert(query.queryExecution.analyzed.missingInput.isEmpty, - s"The analyzed logical plan has missing inputs:\n${query.queryExecution.analyzed}") - assert(query.queryExecution.optimizedPlan.missingInput.isEmpty, - s"The optimized logical plan has missing inputs:\n${query.queryExecution.optimizedPlan}") - assert(query.queryExecution.executedPlan.missingInput.isEmpty, - s"The physical plan has missing inputs:\n${query.queryExecution.executedPlan}") + val qe = query.asInstanceOf[classic.Dataset[_]].queryExecution + assert(qe.analyzed.missingInput.isEmpty, + s"The analyzed logical plan has missing inputs:\n${qe.analyzed}") + assert(qe.optimizedPlan.missingInput.isEmpty, + s"The optimized logical plan has missing inputs:\n${qe.optimizedPlan}") + assert(qe.executedPlan.missingInput.isEmpty, + s"The physical plan has missing inputs:\n${qe.executedPlan}") } protected def getCurrentClassCallSitePattern: String = { @@ -258,6 +263,8 @@ abstract class QueryTest extends PlanTest with SparkSessionProvider { } } +abstract class QueryTest extends SparkFunSuite with QueryTestBase + object QueryTest extends Assertions { /** * Runs the plan and makes sure the answer matches the expected result. @@ -434,7 +441,7 @@ object QueryTest extends Assertions { * @param expectedAnswer the expected result in a[[Row]]. * @param absTol the absolute tolerance between actual and expected answers. */ - protected def checkAggregatesWithTol(actualAnswer: Row, expectedAnswer: Row, absTol: Double) = { + def checkAggregatesWithTol(actualAnswer: Row, expectedAnswer: Row, absTol: Double): Unit = { require(actualAnswer.length == expectedAnswer.length, s"actual answer length ${actualAnswer.length} != " + s"expected answer length ${expectedAnswer.length}") @@ -469,13 +476,14 @@ object QueryTest extends Assertions { } } - spark.sparkContext.listenerBus.waitUntilEmpty(15000) - spark.listenerManager.register(listener) + val classicSession = spark.asInstanceOf[classic.SparkSession] + classicSession.sparkContext.listenerBus.waitUntilEmpty(15000) + classicSession.listenerManager.register(listener) try { thunk - spark.sparkContext.listenerBus.waitUntilEmpty(15000) + classicSession.sparkContext.listenerBus.waitUntilEmpty(15000) } finally { - spark.listenerManager.unregister(listener) + classicSession.listenerManager.unregister(listener) } capturedQueryExecutions From e1dc71c076b90b84b867a0c81439510e241fa699 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Mon, 23 Mar 2026 20:44:39 +0800 Subject: [PATCH 2/7] nit --- .../src/test/scala/org/apache/spark/sql/QueryTest.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 0e358dda388ab..4773486cb5cbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -251,6 +251,10 @@ trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite s"The physical plan has missing inputs:\n${qe.executedPlan}") } +} + +abstract class QueryTest extends SparkFunSuite with QueryTestBase { + protected def getCurrentClassCallSitePattern: String = { val cs = Thread.currentThread().getStackTrace()(2) s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)" @@ -263,8 +267,6 @@ trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite } } -abstract class QueryTest extends SparkFunSuite with QueryTestBase - object QueryTest extends Assertions { /** * Runs the plan and makes sure the answer matches the expected result. From fc806d78fa2fadb8aded11a6ec822ecfcf87f2a7 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 01:52:37 +0000 Subject: [PATCH 3/7] Move getCurrentClassCallSitePattern from QueryTest to QueryTestBase Co-authored-by: Isaac --- .../src/test/scala/org/apache/spark/sql/QueryTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 4773486cb5cbf..72a8891ea153a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -251,15 +251,15 @@ trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite s"The physical plan has missing inputs:\n${qe.executedPlan}") } -} - -abstract class QueryTest extends SparkFunSuite with QueryTestBase { - protected def getCurrentClassCallSitePattern: String = { val cs = Thread.currentThread().getStackTrace()(2) s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)" } +} + +abstract class QueryTest extends SparkFunSuite with QueryTestBase { + protected def getNextLineCallSitePattern(lines: Int = 1): String = { val cs = Thread.currentThread().getStackTrace()(2) Pattern.quote( From 64b76e98b0b707456f045fa473ede884db602284 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 01:55:28 +0000 Subject: [PATCH 4/7] Move getNextLineCallSitePattern from QueryTest to QueryTestBase Co-authored-by: Isaac --- .../src/test/scala/org/apache/spark/sql/QueryTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 72a8891ea153a..3fb051bb86fa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -256,15 +256,15 @@ trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)" } -} - -abstract class QueryTest extends SparkFunSuite with QueryTestBase { - protected def getNextLineCallSitePattern(lines: Int = 1): String = { val cs = Thread.currentThread().getStackTrace()(2) Pattern.quote( s"${cs.getClassName}.${cs.getMethodName}(${cs.getFileName}:${cs.getLineNumber + lines})") } + +} + +abstract class QueryTest extends SparkFunSuite with QueryTestBase { } object QueryTest extends Assertions { From 63daaa7808a81900cf3b99c74c9c96b2996f3915 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 01:56:27 +0000 Subject: [PATCH 5/7] nit Co-authored-by: Isaac --- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 3fb051bb86fa0..1ea20c079f64a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -264,8 +264,7 @@ trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite } -abstract class QueryTest extends SparkFunSuite with QueryTestBase { -} +abstract class QueryTest extends SparkFunSuite with QueryTestBase object QueryTest extends Assertions { /** From 64414cecbde1b31c0aeb78f48e9c2b0958ae9176 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 05:56:49 +0000 Subject: [PATCH 6/7] Fix stack frame depth for trait methods in getCurrentClassCallSitePattern and getNextLineCallSitePattern Walk the stack past any forwarder frames instead of using a hardcoded index, since trait method dispatch may add intermediate frames. Co-authored-by: Isaac --- .../test/scala/org/apache/spark/sql/QueryTest.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 1ea20c079f64a..a76015a148928 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -252,12 +252,18 @@ trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite } protected def getCurrentClassCallSitePattern: String = { - val cs = Thread.currentThread().getStackTrace()(2) + val cs = Thread.currentThread().getStackTrace + .drop(1) + .dropWhile(_.getMethodName == "getCurrentClassCallSitePattern") + .head s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)" } protected def getNextLineCallSitePattern(lines: Int = 1): String = { - val cs = Thread.currentThread().getStackTrace()(2) + val cs = Thread.currentThread().getStackTrace + .drop(1) + .dropWhile(_.getMethodName == "getNextLineCallSitePattern") + .head Pattern.quote( s"${cs.getClassName}.${cs.getMethodName}(${cs.getFileName}:${cs.getLineNumber + lines})") } From ff205674e3384dc8cb4b00a607554093992acdb9 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 24 Mar 2026 06:03:21 +0000 Subject: [PATCH 7/7] Move getCurrentClassCallSitePattern and getNextLineCallSitePattern back to QueryTest These methods rely on a hardcoded stack frame index that is sensitive to trait mixin forwarders, so they need to stay in the abstract class. Co-authored-by: Isaac --- .../scala/org/apache/spark/sql/QueryTest.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index a76015a148928..4773486cb5cbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -251,27 +251,22 @@ trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite s"The physical plan has missing inputs:\n${qe.executedPlan}") } +} + +abstract class QueryTest extends SparkFunSuite with QueryTestBase { + protected def getCurrentClassCallSitePattern: String = { - val cs = Thread.currentThread().getStackTrace - .drop(1) - .dropWhile(_.getMethodName == "getCurrentClassCallSitePattern") - .head + val cs = Thread.currentThread().getStackTrace()(2) s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)" } protected def getNextLineCallSitePattern(lines: Int = 1): String = { - val cs = Thread.currentThread().getStackTrace - .drop(1) - .dropWhile(_.getMethodName == "getNextLineCallSitePattern") - .head + val cs = Thread.currentThread().getStackTrace()(2) Pattern.quote( s"${cs.getClassName}.${cs.getMethodName}(${cs.getFileName}:${cs.getLineNumber + lines})") } - } -abstract class QueryTest extends SparkFunSuite with QueryTestBase - object QueryTest extends Assertions { /** * Runs the plan and makes sure the answer matches the expected result.