
[SPARK-56171][SQL] Enable file source V2 write path with partition, dynamic overwrite, and catalog table support #54977

Closed
LuciferYang wants to merge 7 commits into apache:master from LuciferYang:SPARK-56171

Conversation


@LuciferYang LuciferYang commented Mar 24, 2026

What changes were proposed in this pull request?

This PR enables the file source V2 write path (FileWrite) to support partition columns, dynamic partition overwrite, and truncate (full overwrite), gated behind a new feature flag spark.sql.sources.v2.file.write.enabled (default false). It also removes the guard that blocked FileDataSourceV2 from working with catalog tables.

Previously, the V2 file write path was completely disabled — DataFrameWriter returned None for FileDataSourceV2, FallBackFileSourceV2 converted all InsertIntoStatement targeting FileTable back to the V1 path, and DataSourceV2Utils.getTableProvider unconditionally filtered out FileDataSourceV2 for catalog tables. The FileWrite.createWriteJobDescription hardcoded partitionColumns = Seq.empty and dataColumns = allColumns, meaning partitioned writes were impossible even if the V2 path was reached.
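
The effect of that hardcoded split, and of the fix, can be pictured as a simple partition-aware column split. A minimal Python sketch of the idea (function and parameter names are illustrative, not Spark's actual API):

```python
def split_columns(all_columns, partition_schema, case_sensitive=False):
    """Split output columns into (data columns, partition columns),
    mirroring what createWriteJobDescription now does with partitionSchema.
    Matching is case-insensitive by default, like Spark's default resolver."""
    norm = (lambda s: s) if case_sensitive else str.lower
    partition_names = {norm(name) for name in partition_schema}
    data_cols = [c for c in all_columns if norm(c) not in partition_names]
    part_cols = [c for c in all_columns if norm(c) in partition_names]
    return data_cols, part_cols

# The old behavior was equivalent to always passing an empty partition_schema:
# every column became a data column, so partitioned V2 writes were impossible.
```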

Key changes:

DataSourceV2Utils:

  • getTableProvider: replace unconditional FileDataSourceV2 filter with V2_FILE_WRITE_ENABLED flag check, enabling catalog-managed tables to use V2 path when the flag is enabled
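
In spirit, the new gate replaces an unconditional filter with a flag check. A hypothetical Python sketch of the decision (not the actual Scala code):

```python
def get_table_provider(provider, is_file_source, v2_file_write_enabled):
    """Decide whether a catalog table may use the V2 provider.
    Previously, file sources were filtered out unconditionally; now the
    filter applies only while the feature flag is off."""
    if is_file_source and not v2_file_write_enabled:
        return None  # fall back to the V1 path
    return provider
```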

FileWrite:

  • Add partitionSchema, dynamicPartitionOverwrite, and isTruncate abstract/default fields
  • Separate allColumns into dataColumns and partitionColumns in createWriteJobDescription using partitionSchema
  • Implement RequiresDistributionAndOrdering — sort by partition columns to ensure DynamicPartitionDataSingleWriter sees each partition value contiguously
  • Add path creation for new directories and truncate logic for overwrite mode in toBatch
  • Pass dynamicPartitionOverwrite to FileCommitProtocol for staging-based dynamic overwrite
  • Change description from lazy val to val: prepareWrite must run before setupJob for correct Job configuration (e.g., Parquet JOB_SUMMARY_LEVEL)
  • Add checkNoCollationsInMapKeys validation aligned with V1
  • Skip the supportsDataType check for partition columns (they are written as directory names, not data values), identifying them via sqlConf.resolver for correct case-sensitive/insensitive matching
  • Use consistent jobId for both FileCommitProtocol and WriteJobDescription.uuid
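
The RequiresDistributionAndOrdering point is the subtle one: a single-open-file writer can only serve one partition at a time, so rows must arrive grouped by partition value. A Spark-free Python sketch of why the sort matters (names are hypothetical):

```python
def write_sorted(rows, partition_key):
    """Simulate a DynamicPartitionDataSingleWriter-style writer: sort rows
    by partition key so each partition's rows arrive contiguously and the
    writer never has to reopen a file it already closed."""
    files = []      # list of [partition_value, row_count], one per file
    current = None
    for row in sorted(rows, key=partition_key):
        key = partition_key(row)
        if current is None or current[0] != key:
            current = [key, 0]   # partition switch: close old file, open new
            files.append(current)
        current[1] += 1
    return [(k, n) for k, n in files]
```

Without the sort, interleaved partition values would force the writer to open one file per partition switch, producing many tiny files (or failing, for a single-writer implementation).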

FileTable:

  • Add createFileWriteBuilder helper that provides WriteBuilder with SupportsDynamicOverwrite and SupportsTruncate mixins
  • Declare BATCH_WRITE, TRUNCATE, and OVERWRITE_DYNAMIC capabilities
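
The OVERWRITE_DYNAMIC capability corresponds to dynamic partition overwrite semantics: only partitions present in the incoming data are replaced, and all other partitions are left untouched. A minimal Python model of how that differs from truncate (all names here are illustrative):

```python
def overwrite(table, new_data, dynamic):
    """table and new_data map partition value -> rows."""
    if dynamic:
        # Dynamic overwrite: replace only partitions present in new_data.
        result = dict(table)
        result.update(new_data)
        return result
    # Truncate semantics: drop everything, keep only the new data.
    return dict(new_data)
```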

FallBackFileSourceV2:

  • Check V2_FILE_WRITE_ENABLED flag — when true, skip the V2-to-V1 conversion

DataFrameWriter:

  • lookupV2Provider: allow FileDataSourceV2 through when flag is enabled, for Append and Overwrite modes (ErrorIfExists/Ignore require SupportsCatalogOptions, deferred to SPARK-56174)
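
The mode gate can be summarized as follows (a hypothetical sketch of the decision, not the actual lookupV2Provider code):

```python
def use_v2_write_path(save_mode, is_file_source, flag_enabled):
    """File sources go through V2 only for append/overwrite while the flag
    is on; ErrorIfExists and Ignore need SupportsCatalogOptions and are
    deferred to SPARK-56174."""
    if not is_file_source:
        return True
    return flag_enabled and save_mode in ("append", "overwrite")
```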

All format Write/Table classes (Parquet, ORC, CSV, JSON, Text, Avro):

  • Add partitionSchema, dynamicPartitionOverwrite, isTruncate parameters
  • Use createFileWriteBuilder in newWriteBuilder

Why are the changes needed?

The file source V2 write path has been disabled since SPARK-28396 with a TODO comment. This blocks V2 from being the default for built-in file formats. The V2 API offers advantages over V1 (custom metrics, aggregate/limit/join pushdown, runtime filtering, row-level operations), but the write path must work before V2 can replace V1.

This is the first patch in a series to close the V1-V2 feature gap. It establishes the write infrastructure behind a feature flag so that subsequent patches can build on it incrementally: cache invalidation, flag flip, and delete FallBackFileSourceV2 (SPARK-56173), ErrorIfExists/Ignore modes (SPARK-56174), partition management (SPARK-56175), statistics (SPARK-56176), bucketing (SPARK-56177), and MSCK REPAIR TABLE (SPARK-56178).

Does this PR introduce any user-facing change?

No. The feature flag spark.sql.sources.v2.file.write.enabled defaults to false. All existing behavior is unchanged. When explicitly enabled, the V2 write path is used for Append and Overwrite modes via the DataFrame API, and catalog tables using file-based formats are loaded as V2 FileTable instances.
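
For reference, opting in would look like this (the flag name is taken from this PR; the rest is standard Spark configuration):

```shell
# Opt in to the V2 file write path (default is false)
spark-sql --conf spark.sql.sources.v2.file.write.enabled=true
```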

How was this patch tested?

Added 13 new tests in FileDataSourceV2FallBackSuite:

  • V2 write path for Append and Overwrite modes — verifies non-partitioned V2 write for all 4 formats (parquet, orc, json, csv), both Append and Overwrite (truncate + write)
  • V2 file write produces same results as V1 write — round-trip comparison between V1 and V2 Append writes
  • Partitioned file write with V2 flag (falls back to V1) — verifies partitioned writes still succeed via V1 fallback
  • Partitioned write produces same results with V2 flag (V1 fallback) — V1/V2 comparison for partitioned writes across all formats
  • Multi-level partitioned write with V2 flag (V1 fallback) — two-level partition directory structure verification
  • V2 dynamic partition overwrite — verifies only affected partitions are overwritten via V2 path
  • V2 dynamic partition overwrite produces same results as V1 — V1/V2 comparison for dynamic overwrite
  • DataFrame API write uses V2 path when flag enabled — Append and Overwrite via DataFrame API exercise V2 code path
  • DataFrame API partitioned write with V2 flag enabled — verifies partitioned DataFrame write succeeds (via V1 fallback)
  • V2 write with compression option — verifies compression options propagate through V2 path
  • Catalog table INSERT INTO uses V2 path — catalog-managed non-partitioned table INSERT via V2
  • Catalog table partitioned INSERT INTO uses V2 path — catalog-managed partitioned table INSERT via V2 (data correctness, not physical partitioning)
  • CTAS uses V2 path — CREATE TABLE AS SELECT via V2

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code 4.6.


LuciferYang commented Mar 24, 2026

Full plan: https://issues.apache.org/jira/browse/SPARK-56170

Known limitations of this patch (addressed in follow-up patches):

  • DataFrame API partitionBy not plumbed to V2: FileDataSourceV2.getTable ignores the partitioning parameter, so FileWrite.partitionSchema is empty for new paths. Partitioned writes via DataFrame API fall back to V1. Fixed in SPARK-56175 via userSpecifiedPartitioning.
  • ErrorIfExists/Ignore not supported via V2: These modes require SupportsCatalogOptions. Falls back to V1. Fixed in SPARK-56174.
  • Catalog table partition metadata not available to FileTable: FileDataSourceV2.getTable ignores the partitioning parameter and FileTable has no access to CatalogTable. Custom partition locations and catalog-aware partitioning require plumbing these through. Fixed in SPARK-56175.

Remaining subtasks:

  1. SPARK-56173 — Cache invalidation, flag flip, and delete FallBackFileSourceV2
  2. SPARK-56174 — ErrorIfExists/Ignore modes and INSERT INTO format.path
  3. SPARK-56175 — FileTable implements SupportsPartitionManagement
  4. SPARK-56176 — V2-native ANALYZE TABLE and ANALYZE COLUMN for file tables
  5. SPARK-56177 — V2 file bucketing write support
  6. SPARK-56178 — MSCK REPAIR TABLE for V2 file tables

@LuciferYang LuciferYang marked this pull request as draft March 24, 2026 08:28
@LuciferYang LuciferYang changed the title [SPARK-56171][SQL] Enable file source V2 write path with partition and dynamic overwrite support [SPARK-56171][SQL] Enable file source V2 write path with partition, dynamic overwrite, and catalog table support Mar 24, 2026
