From 93ae842eda8215275964cc33d171f3de82d927c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Mar 2026 09:42:46 -0600 Subject: [PATCH 1/4] run Spark 3.4 tests with native_datafusion scan --- .github/workflows/spark_sql_test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 4d777cda87..3a763d3219 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -126,6 +126,7 @@ jobs: # - native_iceberg_compat: Spark 3.5 only config: - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto'} + - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'native_datafusion'} - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'} - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_datafusion'} - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'} From 7fd965560b193e6b2ed7cc577d807a13b3763d23 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Mar 2026 11:36:37 -0600 Subject: [PATCH 2/4] fix: update 3.4.3 diff to ignore failing native_datafusion tests Tag tests with IgnoreCometNativeDataFusion and IgnoreCometNativeScan to match tags already applied in the 3.5.8 diff, plus 3 tests that are specific to Spark 3.4. --- dev/diffs/3.4.3.diff | 228 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 187 insertions(+), 41 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 8738b3813a..6ef58a750a 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index d3544881af1..9c16099090c 100644 +index d3544881af1..377683b10c5 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -417,18 +417,19 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..1925aac8d97 100644 +index f33432ddb6f..881eab7417c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen +@@ -22,6 +22,8 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ import org.apache.spark.sql.catalyst.plans.ExistenceJoin ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ -@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -262,6 +264,9 @@ abstract class DynamicPartitionPruningSuiteBase case s: BatchScanExec => s.runtimeFilters.collect { case d: DynamicPruningExpression => d.child } @@ -438,7 +439,7 @@ index f33432ddb6f..1925aac8d97 100644 case _ => Nil } } -@@ -755,7 +759,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -755,7 +760,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -448,7 +449,7 @@ index f33432ddb6f..1925aac8d97 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1033,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -458,7 +459,7 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { -@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + @@ -468,7 +469,7 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1423,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -478,7 +479,17 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1698,7 +1707,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat + * Check the static scan metrics with and without DPP + */ + test("static scan metrics", +- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { ++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { +@@ -1729,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -513,18 +524,29 @@ index a6b295578d6..91acca4306f 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 2796b1cf154..52438178a0e 100644 +index 2796b1cf154..f5e2fdfe450 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} +@@ -33,6 +33,8 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal} import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -815,6 +816,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -499,7 +501,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + + Seq("parquet", "orc").foreach { format => +- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { ++ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempDir { dir => + val tableName = s"spark_25132_${format}_native" + val tableDir = dir.getCanonicalPath + s"/$tableName" +@@ -815,6 +818,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -532,7 +554,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(smJoinExec.nonEmpty) } -@@ -875,6 +877,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -875,6 +879,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -540,7 +562,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -916,6 +919,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -916,6 +921,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -548,7 +570,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1100,6 +1104,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1100,6 +1106,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -981,7 +1003,7 @@ index 48ad10992c5..51d1ee65422 100644 extensions.injectColumnar(session => MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index 18123a4d6ec..fbe4c766eee 100644 +index 18123a4d6ec..0fe185baa33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -17,6 +17,8 @@ @@ -1983,10 +2005,18 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 104b4e416cd..37ea65081e4 100644 +index 104b4e416cd..8e618094d91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -@@ -1096,7 +1096,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType + + import org.apache.spark.{SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, IgnoreCometNativeScan} + import org.apache.spark.sql.catalyst.dsl.expressions._ + import org.apache.spark.sql.catalyst.expressions._ + import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints +@@ -1096,7 +1097,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet // to make sure our filter pushdown work. @@ -1999,7 +2029,7 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1499,7 +1503,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1499,7 +1504,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2009,7 +2039,7 @@ index 104b4e416cd..37ea65081e4 100644 import testImplicits._ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", -@@ -1581,7 +1586,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1581,7 +1587,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2022,7 +2052,7 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1608,7 +1617,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1608,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2035,7 +2065,16 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1744,7 +1757,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1700,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + (attr, value) => sources.StringContains(attr, value)) + } + +- test("filter pushdown - StringPredicate") { ++ test("filter pushdown - StringPredicate", IgnoreCometNativeScan("cannot be pushed down")) { + import testImplicits._ + // keep() should take effect on StartsWith/EndsWith/Contains + Seq( +@@ -1744,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2045,7 +2084,17 @@ index 104b4e416cd..37ea65081e4 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1985,7 +1999,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1934,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + } + } + +- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { ++ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val count = 10 + val tableName = "spark_25207" +@@ -1985,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2055,7 +2104,7 @@ index 104b4e416cd..37ea65081e4 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2045,7 +2060,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2045,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2065,7 +2114,7 @@ index 104b4e416cd..37ea65081e4 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2277,7 +2293,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2277,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2078,7 +2127,7 @@ index 104b4e416cd..37ea65081e4 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2337,7 +2357,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2337,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2092,10 +2141,38 @@ index 104b4e416cd..37ea65081e4 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8670d95c65e..b624c3811dd 100644 +index 8670d95c65e..45669c0b664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} + + import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} + import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} + import org.apache.spark.sql.catalyst.util.DateTimeUtils +@@ -1064,7 +1065,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { ++ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 4).map(i => Tuple1(i.toString)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) + +@@ -1075,7 +1077,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: int as long should throw schema incompatible error") { ++ test("SPARK-35640: int as long should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 4).map(i => Tuple1(i)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) + +@@ -1335,7 +1338,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2106,10 +2183,28 @@ index 8670d95c65e..b624c3811dd 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index 29cb224c878..44837aa953b 100644 +index 29cb224c878..d16c22b73a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -978,7 +978,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat + + import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} + import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow + import org.apache.spark.sql.catalyst.util.ArrayData +@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { ++ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 1000).map { i => + val ts = new java.sql.Timestamp(i) + Row(ts) +@@ -978,7 +980,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -2119,7 +2214,17 @@ index 29cb224c878..44837aa953b 100644 withAllParquetReaders { withTempPath { path => // Repeated values for dictionary encoding. -@@ -1047,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1031,7 +1034,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") + } + +- test("SPARK-34212 Parquet should read decimals correctly") { ++ test("SPARK-34212 Parquet should read decimals correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + def readParquet(schema: String, path: File): DataFrame = { + spark.read.schema(schema).parquet(path.toString) + } +@@ -1047,7 +1051,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema, path), df) } @@ -2129,7 +2234,7 @@ index 29cb224c878..44837aa953b 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1069,7 +1071,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1069,7 +1074,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2139,7 +2244,17 @@ index 29cb224c878..44837aa953b 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1128,7 +1131,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1113,7 +1119,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("row group skipping doesn't overflow when reading into larger type") { ++ test("row group skipping doesn't overflow when reading into larger type", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { path => + Seq(0).toDF("a").write.parquet(path.toString) + // The vectorized and non-vectorized readers will produce different exceptions, we don't need +@@ -1128,7 +1135,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } @@ -2244,14 +2359,14 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index bf5c51b89bb..ca22370ca3b 100644 +index bf5c51b89bb..6ff4d1ba18b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.IgnoreComet ++import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2265,6 +2380,26 @@ index bf5c51b89bb..ca22370ca3b 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" +@@ -1026,7 +1028,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("schema mismatch failure error message for parquet vectorized reader") { ++ test("schema mismatch failure error message for parquet vectorized reader", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) + assert(e.getCause.isInstanceOf[SparkException]) +@@ -1067,7 +1070,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { ++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + import testImplicits._ + + withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 3a0bd35cb70..b28f06a757f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -2314,18 +2449,29 @@ index 26e61c6b58d..cb09d7e116a 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..d9125f658ad 100644 +index 0ab8691801d..f1c4b3d92b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,7 @@ +@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { + assert(arrowEvalNodes.size == 2) + } + +- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { ++ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) +@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -2333,7 +2479,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -2352,7 +2498,7 @@ index 0ab8691801d..d9125f658ad 100644 } } } -@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2360,7 +2506,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan From f5f321680d95de1edc8cb154ef49427832d9ab5d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 06:15:56 -0600 Subject: [PATCH 3/4] fix: remove redundant IgnoreCometNativeDataFusion imports from 3.4.3 diff Remove imports of IgnoreCometNativeDataFusion in DynamicPartitionPruningSuite and FileBasedDataSourceSuite since both files are in the org.apache.spark.sql package where IgnoreCometNativeDataFusion is defined, making the import redundant and causing "permanently hidden" compilation errors. --- dev/diffs/3.4.3.diff | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 6ef58a750a..b0f4f1e4fb 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -417,19 +417,18 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..881eab7417c 100644 +index f33432ddb6f..81b83a7acd1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -@@ -22,6 +22,8 @@ import org.scalatest.GivenWhenThen +@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ import org.apache.spark.sql.catalyst.plans.ExistenceJoin -+import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ -@@ -262,6 +264,9 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase case s: BatchScanExec => s.runtimeFilters.collect { case d: DynamicPruningExpression => d.child } @@ -439,7 +438,7 @@ index f33432ddb6f..881eab7417c 100644 case _ => Nil } } -@@ -755,7 +760,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -755,7 +759,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -449,7 +448,7 @@ index f33432ddb6f..881eab7417c 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1027,7 +1033,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -459,7 +458,7 @@ index f33432ddb6f..881eab7417c 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { -@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + @@ -469,7 +468,7 @@ index f33432ddb6f..881eab7417c 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1423,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -479,7 +478,7 @@ index f33432ddb6f..881eab7417c 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1698,7 +1707,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1698,7 +1706,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat * Check the static scan metrics with and without DPP */ test("static scan metrics", @@ -489,7 +488,7 @@ index f33432ddb6f..881eab7417c 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { -@@ -1729,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1738,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -524,19 +523,18 @@ index a6b295578d6..91acca4306f 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 2796b1cf154..f5e2fdfe450 100644 +index 2796b1cf154..db2d16798a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,8 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} +@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal} import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter -+import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -499,7 +501,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -499,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest } Seq("parquet", "orc").foreach { format => @@ -546,7 +544,7 @@ index 2796b1cf154..f5e2fdfe450 100644 withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -815,6 +818,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -815,6 +817,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -554,7 +552,7 @@ index 2796b1cf154..f5e2fdfe450 100644 } assert(smJoinExec.nonEmpty) } -@@ -875,6 +879,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -875,6 +878,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -562,7 +560,7 @@ index 2796b1cf154..f5e2fdfe450 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -916,6 +921,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -916,6 +920,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -570,7 +568,7 @@ index 2796b1cf154..f5e2fdfe450 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1100,6 +1106,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1100,6 +1105,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters From cd360733abdcc27e0778e04bc92e73bfbce22770 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 11:05:12 -0600 Subject: [PATCH 4/4] fix: check Comet ignore tags before DisableAdaptiveExecution in 3.4.3 diff The test override in SQLTestUtils checked DisableAdaptiveExecution first, so tests with both DisableAdaptiveExecution and IgnoreCometNativeDataFusion tags (like "static scan metrics") would enter the DAE branch and never check the ignore tag. Reorder to match the 3.5.8 diff: check Comet skip tags first with early returns, then handle DisableAdaptiveExecution. --- dev/diffs/3.4.3.diff | 70 +++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index b0f4f1e4fb..e90287aa7f 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2948,7 +2948,7 @@ index abe606ad9c1..2d930b64cca 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index dd55fcfe42c..a1d390c93d0 100644 +index dd55fcfe42c..99bc018008a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration._ @@ -2967,46 +2967,42 @@ index dd55fcfe42c..a1d390c93d0 100644 import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils -@@ -118,7 +120,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with - } +@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with override protected def test(testName: String, testTags: Tag*)(testFun: => Any) -- (implicit pos: Position): Unit = { -+ (implicit pos: Position): Unit = { + (implicit pos: Position): Unit = { ++ // Check Comet skip tags first, before DisableAdaptiveExecution handling ++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { ++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) ++ return ++ } ++ if (isCometEnabled) { ++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ++ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || ++ cometScanImpl == CometConf.SCAN_AUTO ++ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || ++ cometScanImpl == CometConf.SCAN_AUTO ++ if (isNativeIcebergCompat && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { ++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) ++ return ++ } ++ if (isNativeDataFusion && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) ++ return ++ } ++ if ((isNativeDataFusion || isNativeIcebergCompat) && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", ++ testTags: _*)(testFun) ++ return ++ } ++ } if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { super.test(testName, testTags: _*) { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { -@@ -126,7 +128,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with - } - } - } else { -- super.test(testName, testTags: _*)(testFun) -+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+ } else { -+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) -+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || -+ cometScanImpl == CometConf.SCAN_AUTO -+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || -+ cometScanImpl == CometConf.SCAN_AUTO -+ if (isCometEnabled && isNativeIcebergCompat && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { -+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) -+ } else if (isCometEnabled && isNativeDataFusion && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) -+ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", -+ testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } -+ } - } - } - -@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase +@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase protected override def _sqlContext: SQLContext = self.spark.sqlContext } @@ -3036,7 +3032,7 @@ index dd55fcfe42c..a1d390c93d0 100644 protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { SparkSession.setActiveSession(spark) super.withSQLConf(pairs: _*)(f) -@@ -434,6 +480,8 @@ private[sql] trait SQLTestUtilsBase +@@ -434,6 +487,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child