diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index c8e960a15e..3ed7d69cd3 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -62,6 +62,10 @@ cause Comet to fall back to Spark. - No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions. The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values. - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true` +- Schema evolution (type promotion such as int→long, float→double) is always enabled. DataFusion's native Parquet + reader handles type promotion automatically, regardless of the `spark.comet.schemaEvolution.enabled` setting. + This means queries that would fail in Spark or other Comet scan implementations when schema evolution is disabled + will succeed with `native_datafusion`. The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results without falling back to Spark: diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 698b68777a..46e7a860f0 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -168,8 +168,8 @@ case class CometScanRule(session: SparkSession) COMET_NATIVE_SCAN_IMPL.get() match { case SCAN_AUTO => - // TODO add support for native_datafusion in the future - nativeIcebergCompatScan(session, scanExec, r, hadoopConf) + nativeDataFusionScan(plan, session, scanExec, r, hadoopConf) + .orElse(nativeIcebergCompatScan(session, scanExec, r, hadoopConf)) .getOrElse(scanExec) case SCAN_NATIVE_DATAFUSION => nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec) @@ -222,6 +222,19 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") return None } + // Case-insensitive mode with duplicate field names produces different errors + // in DataFusion vs Spark, so fall back to avoid incompatible error messages + if (!session.sessionState.conf.caseSensitiveAnalysis) { + val fieldNames = + scanExec.requiredSchema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT)) + if (fieldNames.length != fieldNames.distinct.length) { + withInfo( + scanExec, + "Native DataFusion scan does not support " + + "duplicate field names in case-insensitive mode") + return None + } + } if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) { return None } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 09a2308e35..03b899b163 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -984,7 +984,7 @@ abstract class ParquetReadSuite extends CometTestBase { withParquetDataFrame(data, schema = Some(readSchema)) { df => // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true' if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL - .get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) { + .get(conf) != CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { checkAnswer(df, data.map(Row.fromTuple)) } else { assertThrows[SparkException](df.collect())