Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
public abstract class BufferedRowIterator {
protected LinkedList<InternalRow> currentRows = new LinkedList<>();
// used when there is no column in output
protected UnsafeRow unsafeRow = new UnsafeRow(0);
// Keep it public for codegen to access.
public UnsafeRow unsafeRow = new UnsafeRow(0);
private long startTimeNs = System.nanoTime();

protected int partitionIndex = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,29 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
}
}

test("SPARK-56134: Codegen working for empty output") {
  // Combine the predicates as a balanced binary tree of ANDs rather than a
  // left-leaning chain; a deeply nested expression tree could overflow the stack.
  def balancedAnd(cols: Seq[String]): String =
    if (cols.length == 1) {
      cols.head
    } else {
      val (lhs, rhs) = cols.splitAt(cols.length / 2)
      s"${balancedAnd(lhs)} and ${balancedAnd(rhs)}"
    }

  withTempPath { dir =>
    val path = dir.getCanonicalPath
    // One partition holding a single array(0) row, persisted as Parquet.
    sql("select array(0) as value from range(0, 1, 1, 1)")
      .write.mode(SaveMode.Overwrite).parquet(path)

    val numConditions = 1000
    val conditions = Seq.tabulate(numConditions)(i => s"value <= array($i)")
    // selectExpr() with no expressions yields a zero-column output, which is
    // the shape this regression test exercises through codegen.
    val df = spark.read.parquet(path).filter(balancedAnd(conditions)).selectExpr()
    assert(df.limit(1).selectExpr("count(*)").collect() === Array(Row(1)))
  }
}

test("SPARK-25767: Lazy evaluated stream of expressions handled correctly") {
val a = Seq(1).toDF("key")
val b = Seq((1, "a")).toDF("key", "value")
Expand Down