From 5ef5da251d9d5382d6e6a3faa4ef67e6a50c5598 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Mar 2026 17:22:57 -0600 Subject: [PATCH 1/4] experiment: what tests still fail with native_datafusion scan --- .../main/scala/org/apache/comet/rules/CometScanRule.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 698b68777a..06213d1408 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -167,11 +167,7 @@ case class CometScanRule(session: SparkSession) } COMET_NATIVE_SCAN_IMPL.get() match { - case SCAN_AUTO => - // TODO add support for native_datafusion in the future - nativeIcebergCompatScan(session, scanExec, r, hadoopConf) - .getOrElse(scanExec) - case SCAN_NATIVE_DATAFUSION => + case SCAN_AUTO | SCAN_NATIVE_DATAFUSION => nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec) case SCAN_NATIVE_ICEBERG_COMPAT => nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec) From e9ea8e3c2f4961f69d4167eba4de65ddd6ba6a19 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Mar 2026 17:49:40 -0600 Subject: [PATCH 2/4] feat: prefer native_datafusion scan in auto mode with fallback In auto scan mode, try native_datafusion scan first and fall back to native_iceberg_compat when native_datafusion is not supported (e.g., when COMET_EXEC_ENABLED is disabled, or DPP is used). --- .../main/scala/org/apache/comet/rules/CometScanRule.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 06213d1408..810d67d594 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -167,7 +167,11 @@ case class CometScanRule(session: SparkSession) } COMET_NATIVE_SCAN_IMPL.get() match { - case SCAN_AUTO | SCAN_NATIVE_DATAFUSION => + case SCAN_AUTO => + nativeDataFusionScan(plan, session, scanExec, r, hadoopConf) + .orElse(nativeIcebergCompatScan(session, scanExec, r, hadoopConf)) + .getOrElse(scanExec) + case SCAN_NATIVE_DATAFUSION => nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec) case SCAN_NATIVE_ICEBERG_COMPAT => nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec) From 627bb853f701f0f69f0e50ab0375c501cd192dcf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Mar 2026 17:50:41 -0600 Subject: [PATCH 3/4] docs: document schema evolution behavior for native_datafusion scan --- docs/source/contributor-guide/parquet_scans.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index c8e960a15e..3ed7d69cd3 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -62,6 +62,10 @@ cause Comet to fall back to Spark. - No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions. The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values. - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true` +- Schema evolution (type promotion such as int→long, float→double) is always enabled. DataFusion's native Parquet + reader handles type promotion automatically, regardless of the `spark.comet.schemaEvolution.enabled` setting. + This means queries that would fail in Spark or other Comet scan implementations when schema evolution is disabled + will succeed with `native_datafusion`. The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results without falling back to Spark: From 738694a2d507919f0f1d88bd3e02218d1ef9015d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Mar 2026 22:15:48 -0600 Subject: [PATCH 4/4] fix: detect and fall back for native_datafusion incompatibilities - Add detection for case-insensitive duplicate field names in CometScanRule, falling back to native_iceberg_compat when native_datafusion would produce different error messages than Spark. - Fix schema evolution test to account for auto mode now preferring native_datafusion, which always handles type promotion. The metrics test failures (output_rows=0, filter pushdown=0) are pre-existing on main and not caused by this change. --- .../org/apache/comet/rules/CometScanRule.scala | 13 +++++++++++++ .../org/apache/comet/parquet/ParquetReadSuite.scala | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 810d67d594..46e7a860f0 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -222,6 +222,19 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") return None } + // Case-insensitive mode with duplicate field names produces different errors + // in DataFusion vs Spark, so fall back to avoid incompatible error messages + if (!session.sessionState.conf.caseSensitiveAnalysis) { + val fieldNames = + scanExec.requiredSchema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT)) + if (fieldNames.length != fieldNames.distinct.length) { + withInfo( + scanExec, + "Native DataFusion scan does not support " + + "duplicate field names in case-insensitive mode") + return None + } + } if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) { return None } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 09a2308e35..03b899b163 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -984,7 +984,7 @@ abstract class ParquetReadSuite extends CometTestBase { withParquetDataFrame(data, schema = Some(readSchema)) { df => // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true' if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL - .get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) { + .get(conf) != CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { checkAnswer(df, data.map(Row.fromTuple)) } else { assertThrows[SparkException](df.collect())