Skip to content

Commit 2f17032

Browse files
authored
Spark: Backport various fixes for SparkFileWriterFactory (#15357)
Backports #15356
1 parent ad72516 commit 2f17032

6 files changed

Lines changed: 15 additions & 45 deletions

File tree

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java

Lines changed: 0 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -77,15 +77,6 @@ public static <T> ParquetValueWriter<T> buildWriter(
7777
new WriteBuilder(type));
7878
}
7979

80-
public static <T> ParquetValueWriter<T> buildWriter(
81-
StructType dfSchema, MessageType type, Schema icebergSchema) {
82-
return (ParquetValueWriter<T>)
83-
ParquetWithSparkSchemaVisitor.visit(
84-
dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
85-
type,
86-
new WriteBuilder(type));
87-
}
88-
8980
private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
9081
private final MessageType type;
9182

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
6060
private StructType positionDeleteSparkType;
6161
private final Schema positionDeleteRowSchema;
6262
private final Table table;
63-
private final FileFormat format;
63+
private final FileFormat deleteFormat;
6464
private final Map<String, String> writeProperties;
6565

6666
/**
@@ -100,7 +100,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
100100
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
101101

102102
this.table = table;
103-
this.format = dataFileFormat;
103+
this.deleteFormat = deleteFileFormat;
104104
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
105105
this.positionDeleteRowSchema = positionDeleteRowSchema;
106106
this.positionDeleteSparkType = positionDeleteSparkType;
@@ -138,7 +138,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
138138
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
139139

140140
this.table = table;
141-
this.format = dataFileFormat;
141+
this.deleteFormat = deleteFileFormat;
142142
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
143143
this.positionDeleteRowSchema = null;
144144
this.useDeprecatedPositionDeleteWriter = false;
@@ -172,7 +172,7 @@ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
172172
: MetricsConfig.forPositionDelete(table);
173173

174174
try {
175-
return switch (format) {
175+
return switch (deleteFormat) {
176176
case AVRO ->
177177
Avro.writeDeletes(file)
178178
.createWriterFunc(
@@ -215,14 +215,13 @@ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
215215
.metricsConfig(metricsConfig)
216216
.withPartition(partition)
217217
.overwrite()
218-
.metricsConfig(metricsConfig)
219218
.rowSchema(positionDeleteRowSchema)
220219
.withSpec(spec)
221220
.withKeyMetadata(file.keyMetadata())
222221
.buildPositionWriter();
223222
default ->
224223
throw new UnsupportedOperationException(
225-
"Cannot write pos-deletes for unsupported file format: " + format);
224+
"Cannot write pos-deletes for unsupported file format: " + deleteFormat);
226225
};
227226
} catch (IOException e) {
228227
throw new UncheckedIOException("Failed to create new position delete writer", e);

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java

Lines changed: 0 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -77,15 +77,6 @@ public static <T> ParquetValueWriter<T> buildWriter(
7777
new WriteBuilder(type));
7878
}
7979

80-
public static <T> ParquetValueWriter<T> buildWriter(
81-
StructType dfSchema, MessageType type, Schema icebergSchema) {
82-
return (ParquetValueWriter<T>)
83-
ParquetWithSparkSchemaVisitor.visit(
84-
dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
85-
type,
86-
new WriteBuilder(type));
87-
}
88-
8980
private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
9081
private final MessageType type;
9182

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
6060
private StructType positionDeleteSparkType;
6161
private final Schema positionDeleteRowSchema;
6262
private final Table table;
63-
private final FileFormat format;
63+
private final FileFormat deleteFormat;
6464
private final Map<String, String> writeProperties;
6565

6666
/**
@@ -100,7 +100,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
100100
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
101101

102102
this.table = table;
103-
this.format = dataFileFormat;
103+
this.deleteFormat = deleteFileFormat;
104104
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
105105
this.positionDeleteRowSchema = positionDeleteRowSchema;
106106
this.positionDeleteSparkType = positionDeleteSparkType;
@@ -138,7 +138,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
138138
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
139139

140140
this.table = table;
141-
this.format = dataFileFormat;
141+
this.deleteFormat = deleteFileFormat;
142142
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
143143
this.positionDeleteRowSchema = null;
144144
this.useDeprecatedPositionDeleteWriter = false;
@@ -172,7 +172,7 @@ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
172172
: MetricsConfig.forPositionDelete(table);
173173

174174
try {
175-
return switch (format) {
175+
return switch (deleteFormat) {
176176
case AVRO ->
177177
Avro.writeDeletes(file)
178178
.createWriterFunc(
@@ -215,14 +215,13 @@ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
215215
.metricsConfig(metricsConfig)
216216
.withPartition(partition)
217217
.overwrite()
218-
.metricsConfig(metricsConfig)
219218
.rowSchema(positionDeleteRowSchema)
220219
.withSpec(spec)
221220
.withKeyMetadata(file.keyMetadata())
222221
.buildPositionWriter();
223222
default ->
224223
throw new UnsupportedOperationException(
225-
"Cannot write pos-deletes for unsupported file format: " + format);
224+
"Cannot write pos-deletes for unsupported file format: " + deleteFormat);
226225
};
227226
} catch (IOException e) {
228227
throw new UncheckedIOException("Failed to create new position delete writer", e);

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java

Lines changed: 0 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -91,15 +91,6 @@ public static <T> ParquetValueWriter<T> buildWriter(
9191
new WriteBuilder(type));
9292
}
9393

94-
public static <T> ParquetValueWriter<T> buildWriter(
95-
StructType dfSchema, MessageType type, Schema icebergSchema) {
96-
return (ParquetValueWriter<T>)
97-
ParquetWithSparkSchemaVisitor.visit(
98-
dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
99-
type,
100-
new WriteBuilder(type));
101-
}
102-
10394
private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
10495
private final MessageType type;
10596

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
6060
private StructType positionDeleteSparkType;
6161
private final Schema positionDeleteRowSchema;
6262
private final Table table;
63-
private final FileFormat format;
63+
private final FileFormat deleteFormat;
6464
private final Map<String, String> writeProperties;
6565

6666
/**
@@ -100,7 +100,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
100100
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
101101

102102
this.table = table;
103-
this.format = dataFileFormat;
103+
this.deleteFormat = deleteFileFormat;
104104
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
105105
this.positionDeleteRowSchema = positionDeleteRowSchema;
106106
this.positionDeleteSparkType = positionDeleteSparkType;
@@ -138,7 +138,7 @@ class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow,
138138
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
139139

140140
this.table = table;
141-
this.format = dataFileFormat;
141+
this.deleteFormat = deleteFileFormat;
142142
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
143143
this.positionDeleteRowSchema = null;
144144
this.useDeprecatedPositionDeleteWriter = false;
@@ -172,7 +172,7 @@ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
172172
: MetricsConfig.forPositionDelete(table);
173173

174174
try {
175-
return switch (format) {
175+
return switch (deleteFormat) {
176176
case AVRO ->
177177
Avro.writeDeletes(file)
178178
.createWriterFunc(
@@ -215,14 +215,13 @@ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
215215
.metricsConfig(metricsConfig)
216216
.withPartition(partition)
217217
.overwrite()
218-
.metricsConfig(metricsConfig)
219218
.rowSchema(positionDeleteRowSchema)
220219
.withSpec(spec)
221220
.withKeyMetadata(file.keyMetadata())
222221
.buildPositionWriter();
223222
default ->
224223
throw new UnsupportedOperationException(
225-
"Cannot write pos-deletes for unsupported file format: " + format);
224+
"Cannot write pos-deletes for unsupported file format: " + deleteFormat);
226225
};
227226
} catch (IOException e) {
228227
throw new UncheckedIOException("Failed to create new position delete writer", e);

0 commit comments

Comments
 (0)