Skip to content

Commit 8420705

Browse files
baibaichen and Copilot committed
[GLUTEN-11550][UT] Add testGluten for PythonDataSourceSuite filter pushdown
Gluten replaces FilterExec with FilterExecTransformer and BatchScanExec with BatchScanExecTransformer. Add testGluten matching the Gluten operator names.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 29480f8 commit 8420705

4 files changed

Lines changed: 112 additions & 4 deletions

File tree

gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,7 @@ class VeloxTestSettings extends BackendTestSettings {
11391139
enableSuite[GlutenSQLCollectLimitExecSuite]
11401140
// Generated suites for org.apache.spark.sql.execution.python
11411141
enableSuite[GlutenPythonDataSourceSuite]
1142+
.exclude("SPARK-50426: should not trigger static Python data source lookup")
11421143
enableSuite[GlutenPythonUDFSuite]
11431144
.exclude("SPARK-48706: Negative test case for Python UDF in higher order functions")
11441145
enableSuite[GlutenPythonUDTFSuite]

gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonDataSourceSuite.scala

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,36 @@
1616
*/
1717
package org.apache.spark.sql.execution.python

import org.apache.spark.sql.{GlutenSQLTestsTrait, IntegratedUDFTestUtils}
import org.apache.spark.sql.execution.datasources.DataSourceManager

/**
 * Gluten re-run of Spark's [[PythonDataSourceSuite]]. The vanilla version of the
 * SPARK-50426 test is excluded in VeloxTestSettings and replaced by the `testGluten`
 * override below, which differs only in resetting shared static state first.
 */
class GlutenPythonDataSourceSuite extends PythonDataSourceSuite with GlutenSQLTestsTrait {

  // Brings `shouldTestPandasUDFs` and `staticSourceName` into scope.
  import IntegratedUDFTestUtils._

  // In Gluten's single-JVM test runner, DataSourceManager.dataSourceBuilders (a mutable
  // static var on the companion object) may already be populated by earlier suites.
  // Reset it before the test so the log fires again.
  testGluten("SPARK-50426: should not trigger static Python data source lookup") {
    // Skipped entirely when the Python/pandas test environment is unavailable.
    assume(shouldTestPandasUDFs)
    DataSourceManager.dataSourceBuilders = None
    val testAppender = new LogAppender("Python data source lookup")
    // Phase 1: reads/writes that use built-in (non-Python) sources must NOT trigger
    // the static Python data source lookup, so the log line must be absent.
    withLogAppender(testAppender) {
      spark.read.format("org.apache.spark.sql.test").load()
      spark.range(3).write.mode("overwrite").format("noop").save()
    }
    assert(
      !testAppender.loggingEvents
        .exists(
          msg =>
            msg.getMessage.getFormattedMessage.contains("Loading static Python Data Sources.")))
    // Phase 2: reading a static Python data source SHOULD trigger the lookup exactly once.
    // NOTE(review): the same appender instance is reused, which is safe only because
    // phase 1 already asserted the message was absent from its events.
    withLogAppender(testAppender) {
      spark.read.format(staticSourceName).load()
    }
    assert(
      testAppender.loggingEvents
        .exists(
          msg =>
            msg.getMessage.getFormattedMessage.contains("Loading static Python Data Sources.")))
  }
}

gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,6 +1145,7 @@ class VeloxTestSettings extends BackendTestSettings {
11451145
enableSuite[GlutenSQLCollectLimitExecSuite]
11461146
// Generated suites for org.apache.spark.sql.execution.python
11471147
enableSuite[GlutenPythonDataSourceSuite]
1148+
.exclude("data source reader with filter pushdown")
11481149
enableSuite[GlutenPythonUDFSuite]
11491150
.exclude("SPARK-48706: Negative test case for Python UDF in higher order functions")
11501151
enableSuite[GlutenPythonUDTFSuite]

gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonDataSourceSuite.scala

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,82 @@
1616
*/
1717
package org.apache.spark.sql.execution.python

import org.apache.gluten.execution.FilterExecTransformerBase

import org.apache.spark.sql.{GlutenSQLTestsTrait, IntegratedUDFTestUtils, Row}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.python.PythonScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
 * Gluten re-run of Spark's [[PythonDataSourceSuite]]. The vanilla filter-pushdown test is
 * excluded in VeloxTestSettings because Gluten rewrites the physical plan; the `testGluten`
 * override below asserts against the Gluten operator shapes instead.
 */
class GlutenPythonDataSourceSuite extends PythonDataSourceSuite with GlutenSQLTestsTrait {

  // Brings `shouldTestPandasUDFs`, `createUserDefinedPythonDataSource` etc. into scope.
  import IntegratedUDFTestUtils._

  // Gluten replaces FilterExec with FilterExecTransformer and
  // BatchScanExec with BatchScanExecTransformer
  testGluten("data source reader with filter pushdown") {
    // Skipped entirely when the Python/pandas test environment is unavailable.
    assume(shouldTestPandasUDFs)
    // Python data source whose reader accepts exactly the `partition = 0` filter
    // (pushFilters yields back every filter it does NOT handle). Each of the 2
    // partitions produces rows (0, p), (1, p), (2, p).
    // NOTE(review): the script's internal indentation was reconstructed to standard
    // 4-space Python from a whitespace-mangled view — confirm against upstream.
    val dataSourceScript =
      s"""
         |from pyspark.sql.datasource import (
         |    DataSource,
         |    DataSourceReader,
         |    EqualTo,
         |    InputPartition,
         |)
         |
         |class SimpleDataSourceReader(DataSourceReader):
         |    def partitions(self):
         |        return [InputPartition(i) for i in range(2)]
         |
         |    def pushFilters(self, filters):
         |        for filter in filters:
         |            if filter != EqualTo(("partition",), 0):
         |                yield filter
         |
         |    def read(self, partition):
         |        yield (0, partition.value)
         |        yield (1, partition.value)
         |        yield (2, partition.value)
         |
         |class SimpleDataSource(DataSource):
         |    def schema(self):
         |        return "id int, partition int"
         |
         |    def reader(self, schema):
         |        return SimpleDataSourceReader()
         |""".stripMargin
    val schema = StructType.fromDDL("id INT, partition INT")
    val dataSource =
      createUserDefinedPythonDataSource(name = dataSourceName, pythonScript = dataSourceScript)
    // Pushdown must be explicitly enabled for Python data sources.
    withSQLConf(SQLConf.PYTHON_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      spark.dataSource.registerPython(dataSourceName, dataSource)
      val df =
        spark.read.format(dataSourceName).schema(schema).load().filter("id = 1 and partition = 0")
      val plan = df.queryExecution.executedPlan

      // The post-scan filter keeps only the residual predicate: `partition = 0` was
      // pushed into the source, so the transformer's condition must mention `= 1`
      // (the id predicate) and not `= 0`.
      val filter = collectFirst(plan) {
        case s: FilterExecTransformerBase =>
          val condition = s.cond.toString
          assert(!condition.contains("= 0"))
          assert(condition.contains("= 1"))
          s
      }.getOrElse(
        fail(s"FilterExecTransformerBase not found in the plan. Actual plan:\n$plan")
      )

      // Gluten does not replace PythonScan's BatchScanExec - it stays as vanilla
      // BatchScanExec with RowToVeloxColumnar transition
      collectFirst(filter) {
        case s: BatchScanExec if s.scan.isInstanceOf[PythonScan] =>
          val p = s.scan.asInstanceOf[PythonScan]
          // The accepted filter is reported via the scan's metadata.
          assert(p.getMetaData().get("PushedFilters").contains("[EqualTo(partition,0)]"))
      }.getOrElse(
        fail(s"BatchScanExec with PythonScan not found. Actual plan:\n$plan")
      )

      // Rows (1, 0) and (1, 1): `id = 1` survives the residual filter; `partition = 0`
      // was pushed down but the toy reader emits both partitions regardless, and the
      // residual plan no longer filters on partition — hence partition 1 also appears.
      checkAnswer(df, Seq(Row(1, 0), Row(1, 1)))
    }
  }
}

0 commit comments

Comments (0)