diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 4d777cda87..4123fe1755 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -129,10 +129,13 @@ jobs: - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'} - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_datafusion'} - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'} + - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'native_datafusion'} # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946 exclude: - config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'} module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} + - config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'native_datafusion'} + module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} fail-fast: false name: spark-sql-${{ matrix.config.scan-impl }}-${{ matrix.module.name }}/spark-${{ matrix.config.spark-full }} runs-on: ${{ matrix.os }} diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index a41ff3bbd3..92319ff7c0 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -39,7 +39,7 @@ index 6c51bd4ff2e..e72ec1d26e2 100644 withSpark(sc) { sc => TestUtils.waitUntilExecutorsUp(sc, 2, 60000) diff --git a/pom.xml b/pom.xml -index 22922143fc3..7c56e5e8641 100644 +index 22922143fc3..97332f7e6ac 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..21d36ebc6f5 100644 +index 2c24cc7d570..638f946353a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -615,7 +615,17 @@ index 2c24cc7d570..21d36ebc6f5 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { -@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1151,7 +1157,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("join key with multiple references on the filtering plan") { ++ test("join key with multiple references on the filtering plan", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName, + SQLConf.ANSI_ENABLED.key -> "false" // ANSI mode doesn't support "String + String" +@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + @@ -625,7 +635,7 @@ index 2c24cc7d570..21d36ebc6f5 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1424,7 +1432,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -635,7 +645,7 @@ index 2c24cc7d570..21d36ebc6f5 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1455,7 +1464,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -645,7 +655,17 @@ index 2c24cc7d570..21d36ebc6f5 100644 val df = sql( """ |SELECT s.store_id, f.product_id -@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1699,7 +1709,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat + * Check the static scan metrics with and without DPP + */ + test("static scan metrics", +- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { ++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { +@@ -1730,6 +1741,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -680,18 +700,51 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..2f1bc3880fd 100644 +index 9c529d14221..6cfd87ad864 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha +@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.types.DataTypeUtils -+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, CometSortMergeJoinExec} ++import org.apache.spark.sql.catalyst.util.quietly ++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -967,6 +968,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest + } + + allFileBasedDataSources.foreach { format => +- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") { ++ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") { ++ Seq(IgnoreCometNativeDataFusion( ++ "https://github.com/apache/datafusion-comet/issues/3728")) ++ } else Seq.empty ++ test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath +@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest + } + } + } +- } ++ }} + } + + Seq("json", "orc").foreach { format => +@@ -651,7 +657,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + + Seq("parquet", "orc").foreach { format => +- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { ++ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempDir { dir => + val tableName = s"spark_25132_${format}_native" + val tableDir = dir.getCanonicalPath + s"/$tableName" +@@ -967,6 +974,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -699,7 +752,7 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(smJoinExec.nonEmpty) } -@@ -1027,6 +1029,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1027,6 +1035,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -707,7 +760,7 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1068,6 +1071,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1068,6 +1077,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -715,11 +768,22 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1252,6 +1256,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1241,7 +1251,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + } + +- test("SPARK-41017: filter pushdown with nondeterministic predicates") { ++ test("SPARK-41017: filter pushdown with nondeterministic predicates", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withTempPath { path => + val pathStr = path.getCanonicalPath + spark.range(10).write.parquet(pathStr) +@@ -1252,6 +1263,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters + case b: CometScanExec => b.dataFilters ++ case b: CometNativeScanExec => b.dataFilters + case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) @@ -1253,10 +1317,10 @@ index 0df7f806272..92390bd819f 100644 test("non-matching optional group") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -index 2e33f6505ab..47fa031add5 100644 +index 2e33f6505ab..d0f84e8c44d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException +@@ -23,12 +23,14 @@ import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union} @@ -1268,8 +1332,11 @@ index 2e33f6505ab..47fa031add5 100644 +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} import org.apache.spark.sql.internal.SQLConf ++import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.test.SharedSparkSession -@@ -1529,6 +1530,12 @@ class SubquerySuite extends QueryTest + + class SubquerySuite extends QueryTest +@@ -1529,6 +1531,12 @@ class SubquerySuite extends QueryTest fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( _.files.forall(_.urlEncodedPath.contains("p=0")))) @@ -1282,7 +1349,17 @@ index 2e33f6505ab..47fa031add5 100644 case _ => false }) } -@@ -2094,7 +2101,7 @@ class SubquerySuite extends QueryTest +@@ -2035,7 +2043,8 @@ class SubquerySuite extends QueryTest + } + } + +- test("Subquery reuse across the whole plan") { ++ test("Subquery reuse across the whole plan", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", + SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY.key -> "false") { + val df = sql( +@@ -2094,7 +2103,7 @@ class SubquerySuite extends QueryTest df.collect() val exchanges = collect(df.queryExecution.executedPlan) { @@ -1291,7 +1368,13 @@ index 2e33f6505ab..47fa031add5 100644 } assert(exchanges.size === 1) } -@@ -2678,18 +2685,26 @@ class SubquerySuite extends QueryTest +@@ -2674,22 +2683,31 @@ class SubquerySuite extends QueryTest + } + } + +- test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery") { ++ test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = { val df = sql(query) checkAnswer(df, answer) @@ -1821,6 +1904,20 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +index 77a988f340e..1acc534064e 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { + } + } + +- test("alter temporary view should follow current storeAnalyzedPlanForView config") { ++ test("alter temporary view should follow current storeAnalyzedPlanForView config", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withTable("t") { + Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") + withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index aed11badb71..1a365b5aacf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -2512,22 +2609,23 @@ index 272be70f9fe..06957694002 100644 assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala -index 0a0b23d1e60..5685926250f 100644 +index 0a0b23d1e60..dcc9c141315 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Concat import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.Expand import org.apache.spark.sql.catalyst.types.DataTypeUtils -+import org.apache.spark.sql.comet.CometScanExec ++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ -@@ -868,6 +869,7 @@ abstract class SchemaPruningSuite +@@ -868,6 +869,8 @@ abstract class SchemaPruningSuite val fileSourceScanSchemata = collect(df.queryExecution.executedPlan) { case scan: FileSourceScanExec => scan.requiredSchema + case scan: CometScanExec => scan.requiredSchema ++ case scan: CometNativeScanExec => scan.requiredSchema } assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + @@ -2621,10 +2719,18 @@ index cd6f41b4ef4..4b6a17344bc 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 6080a5e8e4b..9aa8f49a62b 100644 +index 6080a5e8e4b..dc64436164f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -@@ -1102,7 +1102,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType + + import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, IgnoreCometNativeScan} + import org.apache.spark.sql.catalyst.dsl.expressions._ + import org.apache.spark.sql.catalyst.expressions._ + import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints +@@ -1102,7 +1103,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet // to make sure our filter pushdown work. @@ -2637,7 +2743,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1505,7 +1509,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1505,7 +1510,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2647,7 +2753,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 import testImplicits._ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", -@@ -1587,7 +1592,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1587,7 +1593,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2660,7 +2766,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1614,7 +1623,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1614,7 +1624,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2673,7 +2779,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1706,7 +1719,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1706,7 +1720,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared (attr, value) => sources.StringContains(attr, value)) } @@ -2682,7 +2788,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 import testImplicits._ // keep() should take effect on StartsWith/EndsWith/Contains Seq( -@@ -1750,7 +1763,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1750,7 +1764,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2692,7 +2798,17 @@ index 6080a5e8e4b..9aa8f49a62b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1993,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1940,7 +1955,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + } + } + +- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { ++ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val count = 10 + val tableName = "spark_25207" +@@ -1993,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2702,7 +2818,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2053,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2053,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2712,7 +2828,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2305,7 +2321,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2305,7 +2323,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2725,7 +2841,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2368,7 +2388,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2368,7 +2390,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2739,10 +2855,28 @@ index 6080a5e8e4b..9aa8f49a62b 100644 case _ => assert(false, "Can not match ParquetTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 4474ec1fd42..97910c4fc3a 100644 +index 4474ec1fd42..533d4c28dab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1344,7 +1344,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} + + import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} + import org.apache.spark.sql.catalyst.util.DateTimeUtils +@@ -1059,7 +1060,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { ++ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 4).map(i => Tuple1(i.toString)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) + +@@ -1344,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2753,10 +2887,38 @@ index 4474ec1fd42..97910c4fc3a 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..38c60ee2584 100644 +index bba71f1c48d..eb0238b8d2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat + + import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} + import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow + import org.apache.spark.sql.catalyst.util.ArrayData +@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { ++ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) + + Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => +@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("Enabling/disabling ignoreCorruptFiles") { ++ test("Enabling/disabling ignoreCorruptFiles", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + def testIgnoreCorruptFiles(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath +@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2769,7 +2931,17 @@ index bba71f1c48d..38c60ee2584 100644 } } } -@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") + } + +- test("SPARK-34212 Parquet should read decimals correctly") { ++ test("SPARK-34212 Parquet should read decimals correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + def readParquet(schema: String, path: File): DataFrame = { + spark.read.schema(schema).parquet(path.toString) + } +@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2779,7 +2951,7 @@ index bba71f1c48d..38c60ee2584 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2789,6 +2961,16 @@ index bba71f1c48d..38c60ee2584 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) +@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("row group skipping doesn't overflow when reading into larger type") { ++ test("row group skipping doesn't overflow when reading into larger type", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { path => + Seq(0).toDF("a").write.parquet(path.toString) + withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -2909,7 +3091,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 0acb21f3e6f..3a7bb73f03c 100644 +index 0acb21f3e6f..0192c6b48f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2917,7 +3099,7 @@ index 0acb21f3e6f..3a7bb73f03c 100644 import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, Row} import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2931,10 +3113,39 @@ index 0acb21f3e6f..3a7bb73f03c 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" +@@ -1046,7 +1047,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("schema mismatch failure error message for parquet vectorized reader") { ++ test("schema mismatch failure error message for parquet vectorized reader", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) + assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) +@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { ++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + import testImplicits._ + + withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..236a4e99824 100644 +index 09ed6955a51..6f9174c9545 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter + import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} + + import org.apache.spark.SparkException +-import org.apache.spark.sql.{DataFrame, QueryTest, Row} ++import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row} + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + import org.apache.spark.sql.functions.col @@ -65,7 +65,9 @@ class ParquetTypeWideningSuite withClue( s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + @@ -2955,7 +3166,7 @@ index 09ed6955a51..236a4e99824 100644 } } -@@ -190,7 +192,8 @@ class ParquetTypeWideningSuite +@@ -190,10 +192,16 @@ class ParquetTypeWideningSuite (Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType), (Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType), (Seq("1.23", "10.34"), FloatType, DoubleType), @@ -2963,8 +3174,67 @@ index 09ed6955a51..236a4e99824 100644 + // TODO: Comet cannot handle older than "1582-10-15" + (Seq("2020-01-01", "2020-01-02"/* , "1312-02-27" */), DateType, TimestampNTZType) ) ++ wideningTags: Seq[org.scalatest.Tag] = ++ if (fromType == DateType && toType == TimestampNTZType) { ++ Seq(IgnoreCometNativeDataFusion( ++ "https://github.com/apache/datafusion-comet/issues/3728")) ++ } else Seq.empty + } +- test(s"parquet widening conversion $fromType -> $toType") { ++ test(s"parquet widening conversion $fromType -> $toType", wideningTags: _*) { + checkAllParquetReaders(values, fromType, toType, expectError = false) + } + +@@ -231,7 +239,8 @@ class ParquetTypeWideningSuite + (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType) + ) } - test(s"parquet widening conversion $fromType -> $toType") { +- test(s"unsupported parquet conversion $fromType -> $toType") { ++ test(s"unsupported parquet conversion $fromType -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders(values, fromType, toType, expectError = true) + } + +@@ -257,7 +266,8 @@ class ParquetTypeWideningSuite + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) + } +- test(s"unsupported parquet conversion $fromType -> $toType") { ++ test(s"unsupported parquet conversion $fromType -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders(values, fromType, toType, + expectError = + // parquet-mr allows reading decimals into a smaller precision decimal type without +@@ -271,7 +281,8 @@ class ParquetTypeWideningSuite + (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)) + outputTimestampType <- ParquetOutputTimestampType.values + } +- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { ++ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf( + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, + SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString +@@ -291,7 +302,8 @@ class ParquetTypeWideningSuite + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + test( +- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { ++ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, 2), +@@ -322,7 +334,8 @@ class ParquetTypeWideningSuite + Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) + } + test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + +- s"Decimal($toPrecision, $toScale)" ++ s"Decimal($toPrecision, $toScale)", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728") + ) { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 458b5dfc0f4..d209f3c85bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala @@ -3038,18 +3308,29 @@ index 0dd90925d3c..7d53ec845ef 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..d9125f658ad 100644 +index 0ab8691801d..f1c4b3d92b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,7 @@ +@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { + assert(arrowEvalNodes.size == 2) + } + +- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { ++ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) +@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -3057,7 +3338,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -3076,7 +3357,7 @@ index 0ab8691801d..d9125f658ad 100644 } } } -@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -3084,7 +3365,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan