From 97d2cf18ac0ed4ec7e82ace04003044c883018cd Mon Sep 17 00:00:00 2001
From: Chenhao Li
Date: Sun, 22 Mar 2026 19:11:24 -0700
Subject: [PATCH] initial

---
 .../sql/execution/BufferedRowIterator.java    |  3 ++-
 .../execution/WholeStageCodegenSuite.scala    | 23 +++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
index 3d0511b7ba838..52357acf3c7d6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
@@ -33,7 +33,8 @@ public abstract class BufferedRowIterator {
   protected LinkedList<InternalRow> currentRows = new LinkedList<>();
   // used when there is no column in output
-  protected UnsafeRow unsafeRow = new UnsafeRow(0);
+  // Keep it public for codegen to access.
+  public UnsafeRow unsafeRow = new UnsafeRow(0);
   private long startTimeNs = System.nanoTime();
 
   protected int partitionIndex = -1;
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 6004f6f76394f..cf05132de84fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -773,6 +773,29 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
     }
   }
 
+  test("SPARK-56134: Codegen working for empty output") {
+    // Create a balanced tree of AND conditions. This prevents generating a very deep tree,
+    // which can cause stack overflow.
+    def balancedAnd(cols: Seq[String]): String = cols match {
+      case Seq(single) => single
+      case seq =>
+        val (left, right) = seq.splitAt(seq.length / 2)
+        balancedAnd(left) + " and " + balancedAnd(right)
+    }
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sql("select array(0) as value from range(0, 1, 1, 1)")
+        .write.mode(SaveMode.Overwrite).parquet(path)
+
+      val numConditions = 1000
+      val conditions = (0 until numConditions).map(i => s"value <= array($i)")
+      val condition = balancedAnd(conditions)
+      val df = spark.read.parquet(path).filter(condition).selectExpr()
+      assert(df.limit(1).selectExpr("count(*)").collect() === Array(Row(1)))
+    }
+  }
+
   test("SPARK-25767: Lazy evaluated stream of expressions handled correctly") {
     val a = Seq(1).toDF("key")
     val b = Seq((1, "a")).toDF("key", "value")