Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/source/contributor-guide/parquet_scans.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ cause Comet to fall back to Spark.
- No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions.
The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values.
- No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true`
- Schema evolution (type promotion such as `int` → `long` or `float` → `double`) is always enabled: DataFusion's
  native Parquet reader performs type promotion automatically, regardless of the `spark.comet.schemaEvolution.enabled`
  setting. As a result, queries that would fail in Spark (or in other Comet scan implementations) when schema
  evolution is disabled will succeed with `native_datafusion`.

The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results
without falling back to Spark:
Expand Down
17 changes: 15 additions & 2 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ case class CometScanRule(session: SparkSession)

COMET_NATIVE_SCAN_IMPL.get() match {
case SCAN_AUTO =>
// TODO add support for native_datafusion in the future
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
nativeDataFusionScan(plan, session, scanExec, r, hadoopConf)
.orElse(nativeIcebergCompatScan(session, scanExec, r, hadoopConf))
.getOrElse(scanExec)
case SCAN_NATIVE_DATAFUSION =>
nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec)
Expand Down Expand Up @@ -222,6 +222,19 @@ case class CometScanRule(session: SparkSession)
withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching")
return None
}
// Case-insensitive mode with duplicate field names produces different errors
// in DataFusion vs Spark, so fall back to avoid incompatible error messages
if (!session.sessionState.conf.caseSensitiveAnalysis) {
val fieldNames =
scanExec.requiredSchema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT))
if (fieldNames.length != fieldNames.distinct.length) {
withInfo(
scanExec,
"Native DataFusion scan does not support " +
"duplicate field names in case-insensitive mode")
return None
}
}
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
return None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ abstract class ParquetReadSuite extends CometTestBase {
withParquetDataFrame(data, schema = Some(readSchema)) { df =>
// TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL
.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
.get(conf) != CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
checkAnswer(df, data.map(Row.fromTuple))
} else {
assertThrows[SparkException](df.collect())
Expand Down
Loading