From bf10cebfe54b9b0cb8a9b8e89c617d4219a0e3fb Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Mar 2026 12:55:19 +0800 Subject: [PATCH 1/7] init --- .../apache/spark/sql/v2/avro/AvroTable.scala | 8 +- .../apache/spark/sql/v2/avro/AvroWrite.scala | 5 +- .../apache/spark/sql/internal/SQLConf.scala | 9 + .../spark/sql/classic/DataFrameWriter.scala | 26 +- .../datasources/FallBackFileSourceV2.scala | 30 +- .../execution/datasources/v2/FileTable.scala | 41 ++- .../execution/datasources/v2/FileWrite.scala | 112 ++++++-- .../datasources/v2/csv/CSVTable.scala | 8 +- .../datasources/v2/csv/CSVWrite.scala | 5 +- .../datasources/v2/json/JsonTable.scala | 8 +- .../datasources/v2/json/JsonWrite.scala | 5 +- .../datasources/v2/orc/OrcTable.scala | 8 +- .../datasources/v2/orc/OrcWrite.scala | 5 +- .../datasources/v2/parquet/ParquetTable.scala | 8 +- .../datasources/v2/parquet/ParquetWrite.scala | 5 +- .../datasources/v2/text/TextTable.scala | 8 +- .../datasources/v2/text/TextWrite.scala | 5 +- .../FileDataSourceV2FallBackSuite.scala | 272 ++++++++++++++++++ 18 files changed, 499 insertions(+), 69 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala index e898253be1168..e3ce5762fc14e 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroUtils -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DataType, StructType} @@ -43,9 +43,9 @@ case class AvroTable( AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { - new WriteBuilder { - override def build(): Write = - AvroWrite(paths, formatName, supportsDataType, mergedWriteInfo(info)) + createFileWriteBuilder(info) { (mergedInfo, partSchema, dynamicOverwrite, truncate) => + AvroWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, + dynamicOverwrite, truncate) } } diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala index 3a91fd0c73d1a..f1242515a3246 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala @@ -29,7 +29,10 @@ case class AvroWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) extends FileWrite { + info: LogicalWriteInfo, + partitionSchema: StructType, + override val dynamicPartitionOverwrite: Boolean, + override val isTruncate: Boolean) extends FileWrite { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 22f5b3f6c7928..8be944307f3bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4750,6 +4750,15 @@ object SQLConf { .stringConf .createWithDefault("avro,csv,json,kafka,orc,parquet,text") + val V2_FILE_WRITE_ENABLED = buildConf("spark.sql.sources.v2.file.write.enabled") + .internal() + .doc("When true, the file source V2 write path is used for INSERT INTO statements " + + "targeting file-based V2 tables. When false, these writes fall back to the V1 write " + + "path via FallBackFileSourceV2.") + .version("4.2.0") + .booleanConf + .createWithDefault(false) + val ALLOW_EMPTY_SCHEMAS_FOR_WRITES = buildConf("spark.sql.legacy.allowEmptySchemaWrite") .internal() .doc("When this option is set to true, validation of empty or empty nested schemas that " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala index f0359b33f431d..2284ffdda6c46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala @@ -194,10 +194,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram if (curmode == SaveMode.Append) { AppendData.byName(relation, df.logicalPlan, finalOptions) } else { - // Truncate the table. TableCapabilityCheck will throw a nice exception if this - // isn't supported - OverwriteByExpression.byName( - relation, df.logicalPlan, Literal(true), finalOptions) + val dynamicOverwrite = + df.sparkSession.sessionState.conf.partitionOverwriteMode == + PartitionOverwriteMode.DYNAMIC && + partitioningColumns.exists(_.nonEmpty) + if (dynamicOverwrite) { + OverwritePartitionsDynamic.byName( + relation, df.logicalPlan, finalOptions) + } else { + OverwriteByExpression.byName( + relation, df.logicalPlan, Literal(true), finalOptions) + } } case createMode => @@ -595,8 +602,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram private def lookupV2Provider(): Option[TableProvider] = { DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match { - // TODO(SPARK-28396): File source v2 write path is currently broken. - case Some(_: FileDataSourceV2) => None + case Some(_: FileDataSourceV2) + if !df.sparkSession.sessionState.conf.getConf(SQLConf.V2_FILE_WRITE_ENABLED) => + None + // File source V2 supports Append and Overwrite via the DataFrame API V2 write path. + // ErrorIfExists and Ignore require SupportsCatalogOptions (catalog integration), + // so fall back to V1 for these modes. + case Some(_: FileDataSourceV2) + if curmode != SaveMode.Append && curmode != SaveMode.Overwrite => + None case other => other } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala index e03d6e6772fa1..143edd84294d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Logical import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable} +import org.apache.spark.sql.internal.SQLConf /** * Replace the File source V2 table in [[InsertIntoStatement]] to V1 [[FileFormat]]. @@ -33,17 +34,22 @@ import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable} * removed when Catalog support of file data source v2 is finished. */ class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoStatement( - d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) => - val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance() - val relation = HadoopFsRelation( - table.fileIndex, - table.fileIndex.partitionSchema, - table.schema, - None, - v1FileFormat, - d.options.asScala.toMap)(sparkSession) - i.copy(table = LogicalRelation(relation)) + override def apply(plan: LogicalPlan): LogicalPlan = { + if (sparkSession.sessionState.conf.getConf(SQLConf.V2_FILE_WRITE_ENABLED)) { + return plan + } + plan resolveOperators { + case i @ InsertIntoStatement( + d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) => + val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance() + val relation = HadoopFsRelation( + table.fileIndex, + table.fileIndex.partitionSchema, + table.schema, + None, + v1FileFormat, + d.options.asScala.toMap)(sparkSession) + i.copy(table = LogicalRelation(relation)) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 0af728c1958d4..6b0b8806b9b15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsTruncate, Write, WriteBuilder} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex @@ -174,8 +174,45 @@ abstract class FileTable( writeInfo.rowIdSchema(), writeInfo.metadataSchema()) } + + /** + * Creates a [[WriteBuilder]] that supports truncate and dynamic partition overwrite + * for file-based tables. + * + * @param info the logical write info + * @param buildWrite factory function that creates the [[Write]] given write info, + * partition schema, dynamic partition overwrite flag, + * and truncate flag + */ + protected def createFileWriteBuilder( + info: LogicalWriteInfo)( + buildWrite: (LogicalWriteInfo, StructType, Boolean, Boolean) => Write): WriteBuilder = { + new WriteBuilder with SupportsDynamicOverwrite with SupportsTruncate { + private var isDynamicOverwrite = false + private var isTruncate = false + + override def overwriteDynamicPartitions(): WriteBuilder = { + isDynamicOverwrite = true + this + } + + override def truncate(): WriteBuilder = { + isTruncate = true + this + } + + override def build(): Write = { + // Note: fileIndex.partitionSchema may be empty for new paths. + // DataFrame API partitionBy() columns are not plumbed here yet; + // that is handled by userSpecifiedPartitioning in a follow-up patch. + buildWrite(mergedWriteInfo(info), fileIndex.partitionSchema, + isDynamicOverwrite, isTruncate) + } + } + } } object FileTable { - private val CAPABILITIES = util.EnumSet.of(BATCH_READ, BATCH_WRITE) + private val CAPABILITIES = util.EnumSet.of( + BATCH_READ, BATCH_WRITE, TRUNCATE, OVERWRITE_DYNAMIC) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index 77e1ade44780f..dfcaa7394b6f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - import scala.jdk.CollectionConverters._ import org.apache.hadoop.conf.Configuration @@ -30,7 +28,10 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, Write} +import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} +import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection} +import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder} +import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, RequiresDistributionAndOrdering, Write} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription} import org.apache.spark.sql.execution.metric.SQLMetric @@ -40,12 +41,16 @@ import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.SerializableConfiguration -trait FileWrite extends Write { +trait FileWrite extends Write + with RequiresDistributionAndOrdering { def paths: Seq[String] def formatName: String def supportsDataType: DataType => Boolean def allowDuplicatedColumnNames: Boolean = false def info: LogicalWriteInfo + def partitionSchema: StructType + def dynamicPartitionOverwrite: Boolean = false + def isTruncate: Boolean = false private val schema = info.schema() private val queryId = info.queryId() @@ -53,6 +58,25 @@ trait FileWrite extends Write { override def description(): String = formatName + // RequiresDistributionAndOrdering: sort by partition columns + // to ensure DynamicPartitionDataSingleWriter sees each + // partition value contiguously (preventing file name + // collisions from fileCounter resets). + override def requiredDistribution(): Distribution = + Distributions.unspecified() + + override def requiredOrdering(): Array[V2SortOrder] = { + if (partitionSchema.isEmpty) { + Array.empty + } else { + partitionSchema.fieldNames.map { col => + Expressions.sort( + Expressions.column(col), + SortDirection.ASCENDING) + } + } + } + override def toBatch: BatchWrite = { val sparkSession = SparkSession.active validateInputs(sparkSession.sessionState.conf) @@ -60,13 +84,42 @@ trait FileWrite extends Write { val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap // Hadoop Configurations are case sensitive. val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + + // Ensure the output path exists. For new writes (Append to a new path, Overwrite on a new + // path), the path may not exist yet. + val fs = path.getFileSystem(hadoopConf) + val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) + if (!fs.exists(qualifiedPath)) { + fs.mkdirs(qualifiedPath) + } + + // For truncate (full overwrite), delete existing data before writing. + // TODO: This is not atomic — if the write fails after deletion, old data is lost. + // Consider moving into FileBatchWrite.commit() for atomic overwrite semantics. + if (isTruncate && fs.exists(qualifiedPath)) { + fs.listStatus(qualifiedPath).foreach { status => + // Preserve hidden files/dirs (e.g., _SUCCESS, .spark-staging-*) + if (!status.getPath.getName.startsWith("_") && + !status.getPath.getName.startsWith(".")) { + fs.delete(status.getPath, true) + } + } + } + val job = getJobInstance(hadoopConf, path) + val jobId = java.util.UUID.randomUUID().toString val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, - outputPath = paths.head) - lazy val description = - createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap) + jobId = jobId, + outputPath = paths.head, + dynamicPartitionOverwrite = dynamicPartitionOverwrite) + // Evaluate description (which calls prepareWrite) BEFORE + // setupJob, so that format-specific Job configuration + // (e.g., Parquet JOB_SUMMARY_LEVEL) is set before the + // OutputCommitter is created. + val description = + createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap, + jobId) committer.setupJob(job) new FileBatchWrite(job, description, committer) @@ -96,11 +149,19 @@ trait FileWrite extends Write { SchemaUtils.checkColumnNameDuplication( schema.fields.map(_.name).toImmutableArraySeq, caseSensitiveAnalysis) } + if (!sqlConf.allowCollationsInMapKeys) { + SchemaUtils.checkNoCollationsInMapKeys(schema) + } DataSource.validateSchema(formatName, schema, sqlConf) - // TODO: [SPARK-36340] Unify check schema filed of DataSource V2 Insert. + // Only validate data column types, not partition columns. + // Partition columns may use types unsupported by the format + // (e.g., INT in text) since they are written as directory + // names, not as data values. + val partColNames = partitionSchema.fieldNames.toSet schema.foreach { field => - if (!supportsDataType(field.dataType)) { + if (!partColNames.contains(field.name) && + !supportsDataType(field.dataType)) { throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field) } } @@ -119,25 +180,38 @@ trait FileWrite extends Write { hadoopConf: Configuration, job: Job, pathName: String, - options: Map[String, String]): WriteJobDescription = { + options: Map[String, String], + jobId: String): WriteJobDescription = { val caseInsensitiveOptions = CaseInsensitiveMap(options) + val allColumns = toAttributes(schema) + val partitionColumnNames = partitionSchema.fields.map(_.name).toSet + val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + val partitionColumns = if (partitionColumnNames.nonEmpty) { + allColumns.filter { col => + if (caseSensitive) { + partitionColumnNames.contains(col.name) + } else { + partitionColumnNames.exists(_.equalsIgnoreCase(col.name)) + } + } + } else { + Seq.empty + } + val dataColumns = allColumns.filterNot(partitionColumns.contains) // Note: prepareWrite has side effect. It sets "job". + val dataSchema = StructType(dataColumns.map(col => schema(col.name))) val outputWriterFactory = - prepareWrite(sparkSession.sessionState.conf, job, caseInsensitiveOptions, schema) - val allColumns = toAttributes(schema) + prepareWrite(sparkSession.sessionState.conf, job, caseInsensitiveOptions, dataSchema) val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics val serializableHadoopConf = new SerializableConfiguration(hadoopConf) val statsTracker = new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) - // TODO: after partitioning is supported in V2: - // 1. filter out partition columns in `dataColumns`. - // 2. Don't use Seq.empty for `partitionColumns`. new WriteJobDescription( - uuid = UUID.randomUUID().toString, + uuid = jobId, serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), outputWriterFactory = outputWriterFactory, allColumns = allColumns, - dataColumns = allColumns, - partitionColumns = Seq.empty, + dataColumns = dataColumns, + partitionColumns = partitionColumns, bucketSpec = None, path = pathName, customPartitionLocations = Map.empty, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala index 4938df795cb1a..76d144dc0b810 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.csv.CSVDataSource import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -50,9 +50,9 @@ case class CSVTable( } override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { - new WriteBuilder { - override def build(): Write = - CSVWrite(paths, formatName, supportsDataType, mergedWriteInfo(info)) + createFileWriteBuilder(info) { (mergedInfo, partSchema, dynamicOverwrite, truncate) => + CSVWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, + dynamicOverwrite, truncate) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala index 7011fea77d888..b54e364334d72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala @@ -31,7 +31,10 @@ case class CSVWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) extends FileWrite { + info: LogicalWriteInfo, + partitionSchema: StructType, + override val dynamicPartitionOverwrite: Boolean, + override val isTruncate: Boolean) extends FileWrite { override def allowDuplicatedColumnNames: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala index cf3c1e11803c0..ca15779a803b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.json.JSONOptionsInRead -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.json.JsonDataSource import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -50,9 +50,9 @@ case class JsonTable( } override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { - new WriteBuilder { - override def build(): Write = - JsonWrite(paths, formatName, supportsDataType, mergedWriteInfo(info)) + createFileWriteBuilder(info) { (mergedInfo, partSchema, dynamicOverwrite, truncate) => + JsonWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, + dynamicOverwrite, truncate) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWrite.scala index ea1f6793cb9ca..e09f2a2a2dc88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWrite.scala @@ -31,7 +31,10 @@ case class JsonWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) extends FileWrite { + info: LogicalWriteInfo, + partitionSchema: StructType, + override val dynamicPartitionOverwrite: Boolean, + override val isTruncate: Boolean) extends FileWrite { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala index 08cd89fdacc61..f5688b109e449 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -21,7 +21,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -44,9 +44,9 @@ case class OrcTable( OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { - new WriteBuilder { - override def build(): Write = - OrcWrite(paths, formatName, supportsDataType, mergedWriteInfo(info)) + createFileWriteBuilder(info) { (mergedInfo, partSchema, dynamicOverwrite, truncate) => + OrcWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, + dynamicOverwrite, truncate) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala index 12dff269a468e..4fc80398326a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala @@ -32,7 +32,10 @@ case class OrcWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) extends FileWrite { + info: LogicalWriteInfo, + partitionSchema: StructType, + override val dynamicPartitionOverwrite: Boolean, + override val isTruncate: Boolean) extends FileWrite { override def prepareWrite( sqlConf: SQLConf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala index 67052c201a9df..164974d1f3e4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala @@ -21,7 +21,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -44,9 +44,9 @@ case class ParquetTable( ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { - new WriteBuilder { - override def build(): Write = - ParquetWrite(paths, formatName, supportsDataType, mergedWriteInfo(info)) + createFileWriteBuilder(info) { (mergedInfo, partSchema, dynamicOverwrite, truncate) => + ParquetWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, + dynamicOverwrite, truncate) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala index e37b1fce7c37e..1c0cc3e6972bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala @@ -30,7 +30,10 @@ case class ParquetWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) extends FileWrite with Logging { + info: LogicalWriteInfo, + partitionSchema: StructType, + override val dynamicPartitionOverwrite: Boolean, + override val isTruncate: Boolean) extends FileWrite with Logging { override def prepareWrite( sqlConf: SQLConf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala index d8880b84c6211..6d113b4c407bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.text import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} @@ -40,9 +40,9 @@ case class TextTable( Some(StructType(Array(StructField("value", StringType)))) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { - new WriteBuilder { - override def build(): Write = - TextWrite(paths, formatName, supportsDataType, mergedWriteInfo(info)) + createFileWriteBuilder(info) { (mergedInfo, partSchema, dynamicOverwrite, truncate) => + TextWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, + dynamicOverwrite, truncate) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWrite.scala index 7bee49f05cbcd..21869ca9e05c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWrite.scala @@ -31,7 +31,10 @@ case class TextWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) extends FileWrite { + info: LogicalWriteInfo, + partitionSchema: StructType, + override val dynamicPartitionOverwrite: Boolean, + override val isTruncate: Boolean) extends FileWrite { private def verifySchema(schema: StructType): Unit = { if (schema.size != 1) { throw QueryCompilationErrors.textDataSourceWithMultiColumnsError(schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index 2a0ab21ddb09c..52800005b950e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -196,4 +196,276 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { } } } + + test("V2 write path for Append and Overwrite modes") { + Seq("parquet", "orc", "json", "csv").foreach { format => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + // Append mode goes through V2 + withTempPath { path => + val p = path.getCanonicalPath + new java.io.File(p).mkdirs() + val inputData = spark.range(10).toDF() + inputData.write.option("header", "true").mode("append") + .format(format).save(p) + val readBack = spark.read.option("header", "true").schema(inputData.schema) + .format(format).load(p) + checkAnswer(readBack, inputData) + } + // Overwrite mode goes through V2 (truncate + write) + withTempPath { path => + val p = path.getCanonicalPath + new java.io.File(p).mkdirs() + val data1 = spark.range(10).toDF() + data1.write.option("header", "true").mode("append") + .format(format).save(p) + val data2 = spark.range(20, 30).toDF() + data2.write.option("header", "true").mode("overwrite") + .format(format).save(p) + val readBack = spark.read.option("header", "true").schema(data2.schema) + .format(format).load(p) + checkAnswer(readBack, data2) + } + } + } + } + + test("V2 file write produces same results as V1 write") { + withTempPath { v1Path => + withTempPath { v2Path => + val inputData = spark.range(100).selectExpr("id", "id * 2 as value") + + // Write via V1 path (Append to pre-created dir) + new java.io.File(v1Path.getCanonicalPath).mkdirs() + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + inputData.write.mode("append").parquet(v1Path.getCanonicalPath) + } + + // Write via V2 path (Append to pre-created dir) + new java.io.File(v2Path.getCanonicalPath).mkdirs() + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + inputData.write.mode("append").parquet(v2Path.getCanonicalPath) + } + + // Both should produce the same results + val v1Result = spark.read.parquet(v1Path.getCanonicalPath) + val v2Result = spark.read.parquet(v2Path.getCanonicalPath) + checkAnswer(v1Result, v2Result) + } + } + } + + test("Partitioned file write with V2 flag (falls back to V1)") { + // Partitioned writes via DataFrame API use ErrorIfExists by default, + // which falls back to V1 since FileDataSourceV2 doesn't implement + // SupportsCatalogOptions. Also, FileDataSourceV2.getTable ignores + // partitioning transforms, so even with Append mode the V2 path + // doesn't know about partition columns. Full V2 partitioned write + // via DataFrame API requires userSpecifiedPartitioning plumbing + // (handled in a follow-up patch). + Seq("parquet", "orc", "json", "csv").foreach { format => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + withTempPath { path => + val inputData = spark.range(20).selectExpr( + "id", "id % 5 as part") + inputData.write.option("header", "true") + .partitionBy("part").format(format).save(path.getCanonicalPath) + val readBack = spark.read.option("header", "true").schema(inputData.schema) + .format(format).load(path.getCanonicalPath) + checkAnswer(readBack, inputData) + + // Verify partition directory structure exists (via V1 fallback) + val partDirs = path.listFiles().filter(_.isDirectory).map(_.getName).sorted + assert(partDirs.exists(_.startsWith("part=")), + s"Expected partition directories for format $format, got: ${partDirs.mkString(", ")}") + } + } + } + } + + test("Partitioned write produces same results with V2 flag (V1 fallback)") { + Seq("parquet", "orc", "json", "csv").foreach { format => + withTempPath { v1Path => + withTempPath { v2Path => + val inputData = spark.range(50).selectExpr( + "id", "id % 3 as category", "id * 10 as value") + + // Write via V1 path + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { + inputData.write.option("header", "true") + .partitionBy("category").format(format).save(v1Path.getCanonicalPath) + } + + // Write via V2 path + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + inputData.write.option("header", "true") + .partitionBy("category").format(format).save(v2Path.getCanonicalPath) + } + + val v1Result = spark.read.option("header", "true").schema(inputData.schema) + .format(format).load(v1Path.getCanonicalPath) + val v2Result = spark.read.option("header", "true").schema(inputData.schema) + .format(format).load(v2Path.getCanonicalPath) + checkAnswer(v1Result, v2Result) + } + } + } + } + + test("Multi-level partitioned write with V2 flag (V1 fallback)") { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + withTempPath { path => + val inputData = spark.range(30).selectExpr( + "id", "id % 3 as year", "id % 2 as month") + inputData.write.partitionBy("year", "month").parquet(path.getCanonicalPath) + val readBack = spark.read.parquet(path.getCanonicalPath) + checkAnswer(readBack, inputData) + + // Verify two-level partition directory structure + val yearDirs = path.listFiles().filter(_.isDirectory).map(_.getName).sorted + assert(yearDirs.exists(_.startsWith("year=")), + s"Expected year partition directories, got: ${yearDirs.mkString(", ")}") + val firstYearDir = path.listFiles().filter(_.isDirectory).head + val monthDirs = firstYearDir.listFiles().filter(_.isDirectory).map(_.getName).sorted + assert(monthDirs.exists(_.startsWith("month=")), + s"Expected month partition directories, got: ${monthDirs.mkString(", ")}") + } + } + } + + test("V2 dynamic partition overwrite") { + Seq("parquet", "orc").foreach { format => + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true", + SQLConf.PARTITION_OVERWRITE_MODE.key -> "dynamic") { + withTempPath { path => + // Write initial data: part=0,1,2 + val initialData = spark.range(9).selectExpr("id", "id % 3 as part") + initialData.write.partitionBy("part") + .format(format).save(path.getCanonicalPath) + + // Overwrite only part=0 with new data + val overwriteData = spark.createDataFrame(Seq((100L, 0L), (101L, 0L))) + .toDF("id", "part") + overwriteData.write.mode("overwrite").partitionBy("part") + .format(format).save(path.getCanonicalPath) + + // part=1 and part=2 should be untouched, part=0 should have new data + val result = spark.read.format(format).load(path.getCanonicalPath) + val expected = initialData.filter("part != 0").union(overwriteData) + checkAnswer(result, expected) + } + } + } + } + + test("V2 dynamic partition overwrite produces same results as V1") { + Seq("parquet", "orc").foreach { format => + withTempPath { v1Path => + withTempPath { v2Path => + val initialData = spark.range(12).selectExpr("id", "id % 4 as part") + val overwriteData = spark.createDataFrame(Seq((200L, 1L), (201L, 1L))) + .toDF("id", "part") + + // V1 path + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> format, + SQLConf.PARTITION_OVERWRITE_MODE.key -> "dynamic") { + initialData.write.partitionBy("part").format(format).save(v1Path.getCanonicalPath) + overwriteData.write.mode("overwrite").partitionBy("part") + .format(format).save(v1Path.getCanonicalPath) + } + + // V2 path + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true", + SQLConf.PARTITION_OVERWRITE_MODE.key -> "dynamic") { + initialData.write.partitionBy("part").format(format).save(v2Path.getCanonicalPath) + overwriteData.write.mode("overwrite").partitionBy("part") + .format(format).save(v2Path.getCanonicalPath) + } + + val v1Result = spark.read.format(format).load(v1Path.getCanonicalPath) + val v2Result = spark.read.format(format).load(v2Path.getCanonicalPath) + checkAnswer(v1Result, v2Result) + } + } + } + } + + test("DataFrame API write uses V2 path when flag enabled") { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + Seq("parquet", "orc", "json").foreach { format => + // SaveMode.Append to existing path goes via V2 + withTempPath { path => + // First write via V1 (ErrorIfExists falls back to V1) + val data1 = spark.range(5).toDF() + data1.write.format(format).save(path.getCanonicalPath) + // Append via V2 + val data2 = spark.range(5, 10).toDF() + data2.write.mode("append").format(format).save(path.getCanonicalPath) + checkAnswer( + spark.read.format(format).load(path.getCanonicalPath), + data1.union(data2)) + } + + // SaveMode.Overwrite goes via V2 (truncate + write) + withTempPath { path => + val data1 = spark.range(5).toDF() + data1.write.format(format).save(path.getCanonicalPath) + val data2 = spark.range(10, 15).toDF() + data2.write.mode("overwrite").format(format).save(path.getCanonicalPath) + checkAnswer(spark.read.format(format).load(path.getCanonicalPath), data2) + } + } + } + } + + test("DataFrame API partitioned write with V2 flag enabled") { + // Note: df.write.partitionBy("part").parquet(path) uses ErrorIfExists mode by default, + // which falls back to V1 since FileDataSourceV2 doesn't implement SupportsCatalogOptions. + // Partitioned writes via the V2 DataFrame API path require catalog integration (Sub-4). + // This test verifies the write still succeeds (via V1 fallback). + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + withTempPath { path => + val data = spark.range(20).selectExpr("id", "id % 4 as part") + data.write.partitionBy("part").parquet(path.getCanonicalPath) + val result = spark.read.parquet(path.getCanonicalPath) + checkAnswer(result, data) + + val partDirs = path.listFiles().filter(_.isDirectory).map(_.getName) + assert(partDirs.exists(_.startsWith("part="))) + } + } + } + + test("V2 write with compression option") { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + withTempPath { path => + val p = path.getCanonicalPath + new java.io.File(p).mkdirs() + val data = spark.range(10).toDF() + // Append mode ensures V2 path is used + data.write.option("compression", "snappy").mode("append").parquet(p) + checkAnswer(spark.read.parquet(p), data) + } + } + } } From 998f91bca506cf0ca4e1e97b47d7f4edcfc0fac6 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Mar 2026 15:31:11 +0800 Subject: [PATCH 2/7] fix --- .../spark/sql/execution/datasources/v2/FileWrite.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index dfcaa7394b6f9..63981b38a19a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2 +import java.util.Locale + import scala.jdk.CollectionConverters._ import org.apache.hadoop.conf.Configuration @@ -158,9 +160,13 @@ trait FileWrite extends Write // Partition columns may use types unsupported by the format // (e.g., INT in text) since they are written as directory // names, not as data values. - val partColNames = partitionSchema.fieldNames.toSet + val partColNames = partitionSchema.fieldNames.map { name => + if (caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) + }.toSet schema.foreach { field => - if (!partColNames.contains(field.name) && + val fieldName = + if (caseSensitiveAnalysis) field.name else field.name.toLowerCase(Locale.ROOT) + if (!partColNames.contains(fieldName) && !supportsDataType(field.dataType)) { throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field) } From 9bc8502cdb4cf733723c95ab7b40ecbb1c4a7e85 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Mar 2026 15:40:48 +0800 Subject: [PATCH 3/7] fix style --- .../sql/execution/datasources/v2/FileWrite.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index 63981b38a19a1..7cfe50872747e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import java.util.Locale - import scala.jdk.CollectionConverters._ import org.apache.hadoop.conf.Configuration @@ -96,7 +94,7 @@ trait FileWrite extends Write } // For truncate (full overwrite), delete existing data before writing. - // TODO: This is not atomic — if the write fails after deletion, old data is lost. + // TODO: This is not atomic - if the write fails after deletion, old data is lost. // Consider moving into FileBatchWrite.commit() for atomic overwrite semantics. if (isTruncate && fs.exists(qualifiedPath)) { fs.listStatus(qualifiedPath).foreach { status => @@ -160,13 +158,10 @@ trait FileWrite extends Write // Partition columns may use types unsupported by the format // (e.g., INT in text) since they are written as directory // names, not as data values. - val partColNames = partitionSchema.fieldNames.map { name => - if (caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) - }.toSet + val resolver = sqlConf.resolver + val partColNames = partitionSchema.fieldNames schema.foreach { field => - val fieldName = - if (caseSensitiveAnalysis) field.name else field.name.toLowerCase(Locale.ROOT) - if (!partColNames.contains(fieldName) && + if (!partColNames.exists(resolver(_, field.name)) && !supportsDataType(field.dataType)) { throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field) } From b0f9138e75c249441ee31d13a00917dd71bee48e Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Mar 2026 16:27:04 +0800 Subject: [PATCH 4/7] use flag --- .../datasources/v2/DataSourceV2Utils.scala | 4 +- .../FileDataSourceV2FallBackSuite.scala | 50 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index a3b5c5aeb7995..eb2ab82388459 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -164,8 +164,8 @@ private[sql] object DataSourceV2Utils extends Logging { // `HiveFileFormat`, when running tests in sql/core. if (DDLUtils.isHiveTable(Some(provider))) return None DataSource.lookupDataSourceV2(provider, conf) match { - // TODO(SPARK-28396): Currently file source v2 can't work with tables. - case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p) + case Some(_: FileDataSourceV2) if !conf.getConf(SQLConf.V2_FILE_WRITE_ENABLED) => None + case Some(p) => Some(p) case _ => None } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index 52800005b950e..8078288d022ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -468,4 +468,54 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { } } } + + test("Catalog table INSERT INTO uses V2 path") { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + withTable("t") { + sql("CREATE TABLE t (id BIGINT, value BIGINT) USING parquet") + sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)") + checkAnswer(sql("SELECT * FROM t"), + Seq((1L, 10L), (2L, 20L), (3L, 30L)).map(Row.fromTuple)) + } + } + } + + test("Catalog table partitioned INSERT INTO uses V2 path") { + // Note: FileDataSourceV2.getTable ignores partitioning transforms, + // so data is written flat (not in partition directories) via V2. + // Physical partitioning requires userSpecifiedPartitioning (Sub-4). + // This test verifies data correctness, not physical layout. + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + withTable("t") { + sql("CREATE TABLE t (id BIGINT, part BIGINT) " + + "USING parquet PARTITIONED BY (part)") + sql("INSERT INTO t VALUES (1, 1), (2, 1), (3, 2), (4, 2)") + checkAnswer(sql("SELECT * FROM t ORDER BY id"), + Seq((1L, 1L), (2L, 1L), (3L, 2L), (4L, 2L)) + .map(Row.fromTuple)) + } + } + } + + test("CTAS uses V2 path") { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> "", + SQLConf.V2_FILE_WRITE_ENABLED.key -> "true") { + withTable("t") { + sql("CREATE TABLE t USING parquet " + + "AS SELECT id, id * 2 as value FROM range(10)") + checkAnswer( + sql("SELECT count(*) FROM t"), + Seq(Row(10L))) + } + } + } + + // TODO: "INSERT INTO writes to custom partition location" test + // deferred to Sub-3/Sub-5 when catalogTable is set on FileTable + // and getCustomPartitionLocations returns real values. } From 41d205271f60c612d513fd5c9a9db781c50c852f Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Mar 2026 16:51:47 +0800 Subject: [PATCH 5/7] cleanup comments --- .../execution/datasources/v2/FileTable.scala | 6 +++--- .../FileDataSourceV2FallBackSuite.scala | 21 ++++++++----------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 6b0b8806b9b15..1b0878c57a7bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -202,9 +202,9 @@ abstract class FileTable( } override def build(): Write = { - // Note: fileIndex.partitionSchema may be empty for new paths. - // DataFrame API partitionBy() columns are not plumbed here yet; - // that is handled by userSpecifiedPartitioning in a follow-up patch. + // TODO: SPARK-56175 fileIndex.partitionSchema may be empty for + // new paths. DataFrame API partitionBy() columns are not plumbed + // here yet. buildWrite(mergedWriteInfo(info), fileIndex.partitionSchema, isDynamicOverwrite, isTruncate) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index 8078288d022ea..31946a6a1b210 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -263,9 +263,9 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { // which falls back to V1 since FileDataSourceV2 doesn't implement // SupportsCatalogOptions. Also, FileDataSourceV2.getTable ignores // partitioning transforms, so even with Append mode the V2 path - // doesn't know about partition columns. Full V2 partitioned write - // via DataFrame API requires userSpecifiedPartitioning plumbing - // (handled in a follow-up patch). + // doesn't know about partition columns. + // TODO: SPARK-56175 Full V2 partitioned write via DataFrame API + // requires userSpecifiedPartitioning plumbing. Seq("parquet", "orc", "json", "csv").foreach { format => withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> "", @@ -435,9 +435,9 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { } test("DataFrame API partitioned write with V2 flag enabled") { - // Note: df.write.partitionBy("part").parquet(path) uses ErrorIfExists mode by default, - // which falls back to V1 since FileDataSourceV2 doesn't implement SupportsCatalogOptions. - // Partitioned writes via the V2 DataFrame API path require catalog integration (Sub-4). + // TODO: SPARK-56174 df.write.partitionBy("part").parquet(path) uses ErrorIfExists mode + // by default, which falls back to V1 since FileDataSourceV2 doesn't implement + // SupportsCatalogOptions. // This test verifies the write still succeeds (via V1 fallback). withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> "", @@ -483,9 +483,9 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { } test("Catalog table partitioned INSERT INTO uses V2 path") { - // Note: FileDataSourceV2.getTable ignores partitioning transforms, - // so data is written flat (not in partition directories) via V2. - // Physical partitioning requires userSpecifiedPartitioning (Sub-4). + // TODO: SPARK-56175 FileDataSourceV2.getTable ignores partitioning + // transforms, so data is written flat (not in partition directories) + // via V2. Physical partitioning requires userSpecifiedPartitioning. // This test verifies data correctness, not physical layout. withSQLConf( SQLConf.USE_V1_SOURCE_LIST.key -> "", @@ -515,7 +515,4 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { } } - // TODO: "INSERT INTO writes to custom partition location" test - // deferred to Sub-3/Sub-5 when catalogTable is set on FileTable - // and getCustomPartitionLocations returns real values. } From 070c9609d5e0f831e6ced17a89ac2d2d7586fd4c Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Mar 2026 17:08:46 +0800 Subject: [PATCH 6/7] more TODO --- .../apache/spark/sql/execution/datasources/v2/FileWrite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index 7cfe50872747e..567b7c44b9d33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -94,7 +94,7 @@ trait FileWrite extends Write } // For truncate (full overwrite), delete existing data before writing. - // TODO: This is not atomic - if the write fails after deletion, old data is lost. + // TODO: SPARK-56173 This is not atomic - if the write fails after deletion, old data is lost. // Consider moving into FileBatchWrite.commit() for atomic overwrite semantics. if (isTruncate && fs.exists(qualifiedPath)) { fs.listStatus(qualifiedPath).foreach { status => From 2718b8cd24468cd85ffaf6dd0995aa819a657dbf Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Mar 2026 17:33:03 +0800 Subject: [PATCH 7/7] fix comments --- .../apache/spark/sql/execution/datasources/v2/FileWrite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index 567b7c44b9d33..040be97d481ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -94,8 +94,8 @@ trait FileWrite extends Write } // For truncate (full overwrite), delete existing data before writing. - // TODO: SPARK-56173 This is not atomic - if the write fails after deletion, old data is lost. - // Consider moving into FileBatchWrite.commit() for atomic overwrite semantics. + // Note: this is not atomic (same as V1 InsertIntoHadoopFsRelationCommand) - + // if the write fails after deletion, old data is lost. if (isTruncate && fs.exists(qualifiedPath)) { fs.listStatus(qualifiedPath).foreach { status => // Preserve hidden files/dirs (e.g., _SUCCESS, .spark-staging-*)