Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, StructType}
Expand All @@ -43,9 +43,9 @@ case class AvroTable(
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
override def build(): Write =
AvroWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
createFileWriteBuilder(info) { (mergedInfo, partSchema, dynamicOverwrite, truncate) =>
AvroWrite(paths, formatName, supportsDataType, mergedInfo, partSchema,
dynamicOverwrite, truncate)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ case class AvroWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo) extends FileWrite {
info: LogicalWriteInfo,
partitionSchema: StructType,
override val dynamicPartitionOverwrite: Boolean,
override val isTruncate: Boolean) extends FileWrite {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4750,6 +4750,15 @@ object SQLConf {
.stringConf
.createWithDefault("avro,csv,json,kafka,orc,parquet,text")

// Feature flag gating the DSv2 file-source write path. Defaults to false so
// existing behavior is preserved: INSERT INTO on file-based V2 tables keeps
// falling back to the V1 write path via FallBackFileSourceV2.
val V2_FILE_WRITE_ENABLED = buildConf("spark.sql.sources.v2.file.write.enabled")
.internal()
.doc("When true, the file source V2 write path is used for INSERT INTO statements " +
"targeting file-based V2 tables. When false, these writes fall back to the V1 write " +
"path via FallBackFileSourceV2.")
.version("4.2.0")
.booleanConf
.createWithDefault(false)

val ALLOW_EMPTY_SCHEMAS_FOR_WRITES = buildConf("spark.sql.legacy.allowEmptySchemaWrite")
.internal()
.doc("When this option is set to true, validation of empty or empty nested schemas that " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
if (curmode == SaveMode.Append) {
AppendData.byName(relation, df.logicalPlan, finalOptions)
} else {
// Truncate the table. TableCapabilityCheck will throw a nice exception if this
// isn't supported
OverwriteByExpression.byName(
relation, df.logicalPlan, Literal(true), finalOptions)
val dynamicOverwrite =
df.sparkSession.sessionState.conf.partitionOverwriteMode ==
PartitionOverwriteMode.DYNAMIC &&
partitioningColumns.exists(_.nonEmpty)
if (dynamicOverwrite) {
OverwritePartitionsDynamic.byName(
relation, df.logicalPlan, finalOptions)
} else {
OverwriteByExpression.byName(
relation, df.logicalPlan, Literal(true), finalOptions)
}
}

case createMode =>
Expand Down Expand Up @@ -595,8 +602,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram

private def lookupV2Provider(): Option[TableProvider] = {
DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
// TODO(SPARK-28396): File source v2 write path is currently broken.
case Some(_: FileDataSourceV2) => None
case Some(_: FileDataSourceV2)
if !df.sparkSession.sessionState.conf.getConf(SQLConf.V2_FILE_WRITE_ENABLED) =>
None
// File source V2 supports Append and Overwrite via the DataFrame API V2 write path.
// ErrorIfExists and Ignore require SupportsCatalogOptions (catalog integration),
// so fall back to V1 for these modes.
case Some(_: FileDataSourceV2)
if curmode != SaveMode.Append && curmode != SaveMode.Overwrite =>
None
case other => other
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Logical
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
import org.apache.spark.sql.internal.SQLConf

/**
* Replace the File source V2 table in [[InsertIntoStatement]] to V1 [[FileFormat]].
Expand All @@ -33,17 +34,22 @@ import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
* removed when Catalog support of file data source v2 is finished.
*/
class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(
d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) =>
val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
val relation = HadoopFsRelation(
table.fileIndex,
table.fileIndex.partitionSchema,
table.schema,
None,
v1FileFormat,
d.options.asScala.toMap)(sparkSession)
i.copy(table = LogicalRelation(relation))
override def apply(plan: LogicalPlan): LogicalPlan = {
if (sparkSession.sessionState.conf.getConf(SQLConf.V2_FILE_WRITE_ENABLED)) {
return plan
}
plan resolveOperators {
case i @ InsertIntoStatement(
d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) =>
val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
val relation = HadoopFsRelation(
table.fileIndex,
table.fileIndex.partitionSchema,
table.schema,
None,
v1FileFormat,
d.options.asScala.toMap)(sparkSession)
i.copy(table = LogicalRelation(relation))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ private[sql] object DataSourceV2Utils extends Logging {
// `HiveFileFormat`, when running tests in sql/core.
if (DDLUtils.isHiveTable(Some(provider))) return None
DataSource.lookupDataSourceV2(provider, conf) match {
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p)
case Some(_: FileDataSourceV2) if !conf.getConf(SQLConf.V2_FILE_WRITE_ENABLED) => None
case Some(p) => Some(p)
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsTruncate, Write, WriteBuilder}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
Expand Down Expand Up @@ -174,8 +174,45 @@ abstract class FileTable(
writeInfo.rowIdSchema(),
writeInfo.metadataSchema())
}

/**
 * Builds a [[WriteBuilder]] for file-based tables that records truncate and
 * dynamic-partition-overwrite requests made by the planner and forwards them,
 * together with the table's partition schema, to the supplied write factory.
 *
 * @param info the logical write info from the planner
 * @param buildWrite factory producing the [[Write]]; receives the merged write
 *                   info, the partition schema, the dynamic-partition-overwrite
 *                   flag, and the truncate flag, in that order
 */
protected def createFileWriteBuilder(
info: LogicalWriteInfo)(
buildWrite: (LogicalWriteInfo, StructType, Boolean, Boolean) => Write): WriteBuilder = {
  new WriteBuilder with SupportsDynamicOverwrite with SupportsTruncate {
    // Flags flipped by the optimizer (via the mixin callbacks below) before
    // build() is invoked.
    private var dynamicOverwriteRequested = false
    private var truncateRequested = false

    override def truncate(): WriteBuilder = {
      truncateRequested = true
      this
    }

    override def overwriteDynamicPartitions(): WriteBuilder = {
      dynamicOverwriteRequested = true
      this
    }

    override def build(): Write = {
      // TODO: SPARK-56175 — fileIndex.partitionSchema may be empty for new
      // paths; DataFrame API partitionBy() columns are not plumbed here yet.
      buildWrite(
        mergedWriteInfo(info),
        fileIndex.partitionSchema,
        dynamicOverwriteRequested,
        truncateRequested)
    }
  }
}
}

object FileTable {
private val CAPABILITIES = util.EnumSet.of(BATCH_READ, BATCH_WRITE)
private val CAPABILITIES = util.EnumSet.of(
BATCH_READ, BATCH_WRITE, TRUNCATE, OVERWRITE_DYNAMIC)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.v2

import java.util.UUID

import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
Expand All @@ -30,7 +28,10 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, Write}
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection}
import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, RequiresDistributionAndOrdering, Write}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
import org.apache.spark.sql.execution.metric.SQLMetric
Expand All @@ -40,33 +41,85 @@ import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.SerializableConfiguration

trait FileWrite extends Write {
trait FileWrite extends Write
with RequiresDistributionAndOrdering {
def paths: Seq[String]
def formatName: String
def supportsDataType: DataType => Boolean
def allowDuplicatedColumnNames: Boolean = false
def info: LogicalWriteInfo
def partitionSchema: StructType
def dynamicPartitionOverwrite: Boolean = false
def isTruncate: Boolean = false

private val schema = info.schema()
private val queryId = info.queryId()
val options = info.options()

override def description(): String = formatName

// Part of the RequiresDistributionAndOrdering contract: no particular
// distribution is required — only the ordering below matters, so that
// DynamicPartitionDataSingleWriter sees each partition value contiguously
// (preventing file-name collisions from fileCounter resets).
override def requiredDistribution(): Distribution =
Distributions.unspecified()

// Require an ascending sort on every partition column so that each distinct
// partition value reaches the writer as one contiguous run. When there are
// no partition columns, fieldNames is empty and this yields an empty array,
// i.e. no ordering requirement.
override def requiredOrdering(): Array[V2SortOrder] = {
  partitionSchema.fieldNames.map { fieldName =>
    Expressions.sort(Expressions.column(fieldName), SortDirection.ASCENDING)
  }
}

override def toBatch: BatchWrite = {
val sparkSession = SparkSession.active
validateInputs(sparkSession.sessionState.conf)
val path = new Path(paths.head)
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)

// Ensure the output path exists. For new writes (Append to a new path, Overwrite on a new
// path), the path may not exist yet.
val fs = path.getFileSystem(hadoopConf)
val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
if (!fs.exists(qualifiedPath)) {
fs.mkdirs(qualifiedPath)
}

// For truncate (full overwrite), delete existing data before writing.
// Note: this is not atomic (same as V1 InsertIntoHadoopFsRelationCommand) -
// if the write fails after deletion, old data is lost.
if (isTruncate && fs.exists(qualifiedPath)) {
fs.listStatus(qualifiedPath).foreach { status =>
// Preserve hidden files/dirs (e.g., _SUCCESS, .spark-staging-*)
if (!status.getPath.getName.startsWith("_") &&
!status.getPath.getName.startsWith(".")) {
fs.delete(status.getPath, true)
}
}
}

val job = getJobInstance(hadoopConf, path)
val jobId = java.util.UUID.randomUUID().toString
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = paths.head)
lazy val description =
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)
jobId = jobId,
outputPath = paths.head,
dynamicPartitionOverwrite = dynamicPartitionOverwrite)
// Evaluate description (which calls prepareWrite) BEFORE
// setupJob, so that format-specific Job configuration
// (e.g., Parquet JOB_SUMMARY_LEVEL) is set before the
// OutputCommitter is created.
val description =
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap,
jobId)

committer.setupJob(job)
new FileBatchWrite(job, description, committer)
Expand Down Expand Up @@ -96,11 +149,20 @@ trait FileWrite extends Write {
SchemaUtils.checkColumnNameDuplication(
schema.fields.map(_.name).toImmutableArraySeq, caseSensitiveAnalysis)
}
if (!sqlConf.allowCollationsInMapKeys) {
SchemaUtils.checkNoCollationsInMapKeys(schema)
}
DataSource.validateSchema(formatName, schema, sqlConf)

// TODO: [SPARK-36340] Unify check schema filed of DataSource V2 Insert.
// Only validate data column types, not partition columns.
// Partition columns may use types unsupported by the format
// (e.g., INT in text) since they are written as directory
// names, not as data values.
val resolver = sqlConf.resolver
val partColNames = partitionSchema.fieldNames
schema.foreach { field =>
if (!supportsDataType(field.dataType)) {
if (!partColNames.exists(resolver(_, field.name)) &&
!supportsDataType(field.dataType)) {
throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field)
}
}
Expand All @@ -119,25 +181,38 @@ trait FileWrite extends Write {
hadoopConf: Configuration,
job: Job,
pathName: String,
options: Map[String, String]): WriteJobDescription = {
options: Map[String, String],
jobId: String): WriteJobDescription = {
val caseInsensitiveOptions = CaseInsensitiveMap(options)
val allColumns = toAttributes(schema)
val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val partitionColumns = if (partitionColumnNames.nonEmpty) {
allColumns.filter { col =>
if (caseSensitive) {
partitionColumnNames.contains(col.name)
} else {
partitionColumnNames.exists(_.equalsIgnoreCase(col.name))
}
}
} else {
Seq.empty
}
val dataColumns = allColumns.filterNot(partitionColumns.contains)
// Note: prepareWrite has side effect. It sets "job".
val dataSchema = StructType(dataColumns.map(col => schema(col.name)))
val outputWriterFactory =
prepareWrite(sparkSession.sessionState.conf, job, caseInsensitiveOptions, schema)
val allColumns = toAttributes(schema)
prepareWrite(sparkSession.sessionState.conf, job, caseInsensitiveOptions, dataSchema)
val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val statsTracker = new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
// TODO: after partitioning is supported in V2:
// 1. filter out partition columns in `dataColumns`.
// 2. Don't use Seq.empty for `partitionColumns`.
new WriteJobDescription(
uuid = UUID.randomUUID().toString,
uuid = jobId,
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = allColumns,
dataColumns = allColumns,
partitionColumns = Seq.empty,
dataColumns = dataColumns,
partitionColumns = partitionColumns,
bucketSpec = None,
path = pathName,
customPartitionLocations = Map.empty,
Expand Down
Loading