Skip to content

Commit bfcb979

Browse files
authored
Data: Moving GenericFileWriterFactory to the new FormatModel API (#15334)
1 parent c7ed71e commit bfcb979

3 files changed

Lines changed: 332 additions & 22 deletions

File tree

data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,13 @@
4040
import org.apache.iceberg.parquet.Parquet;
4141
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4242

43-
/** A base writer factory to be extended by query engine integrations. */
43+
/**
44+
* A base writer factory to be extended by query engine integrations.
45+
*
46+
* @deprecated since version 1.11.0 and will be removed in 1.12.0. Use {@link
47+
* RegistryBasedFileWriterFactory} instead.
48+
*/
49+
@Deprecated
4450
public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T>, Serializable {
4551
private final Table table;
4652
private final FileFormat dataFileFormat;
@@ -75,13 +81,6 @@ protected BaseFileWriterFactory(
7581
this.positionDeleteRowSchema = null;
7682
}
7783

78-
/**
79-
* @deprecated This constructor is deprecated as of version 1.11.0 and will be removed in 1.12.0.
80-
* Position deletes that include row data are no longer supported. Use {@link
81-
* #BaseFileWriterFactory(Table, FileFormat, Schema, SortOrder, FileFormat, int[], Schema,
82-
* SortOrder, Map)} instead.
83-
*/
84-
@Deprecated
8584
protected BaseFileWriterFactory(
8685
Table table,
8786
FileFormat dataFileFormat,

data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java

Lines changed: 143 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,37 @@
2222
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
2323
import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
2424

25+
import java.io.IOException;
26+
import java.io.UncheckedIOException;
2527
import java.util.Map;
2628
import org.apache.iceberg.FileFormat;
29+
import org.apache.iceberg.MetricsConfig;
30+
import org.apache.iceberg.PartitionSpec;
2731
import org.apache.iceberg.Schema;
2832
import org.apache.iceberg.SortOrder;
33+
import org.apache.iceberg.StructLike;
2934
import org.apache.iceberg.Table;
3035
import org.apache.iceberg.avro.Avro;
3136
import org.apache.iceberg.data.avro.DataWriter;
3237
import org.apache.iceberg.data.orc.GenericOrcWriter;
3338
import org.apache.iceberg.data.parquet.GenericParquetWriter;
39+
import org.apache.iceberg.deletes.PositionDeleteWriter;
40+
import org.apache.iceberg.encryption.EncryptedOutputFile;
41+
import org.apache.iceberg.formats.FormatModelRegistry;
3442
import org.apache.iceberg.orc.ORC;
3543
import org.apache.iceberg.parquet.Parquet;
3644
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3745
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
3848

39-
public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
49+
public class GenericFileWriterFactory extends RegistryBasedFileWriterFactory<Record, Schema> {
50+
private static final Logger LOG = LoggerFactory.getLogger(GenericFileWriterFactory.class);
51+
52+
private Table table;
53+
private FileFormat format;
54+
private Schema positionDeleteRowSchema;
55+
private Map<String, String> writerProperties;
4056

4157
GenericFileWriterFactory(
4258
Table table,
@@ -50,13 +66,16 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
5066
super(
5167
table,
5268
dataFileFormat,
69+
Record.class,
5370
dataSchema,
5471
dataSortOrder,
5572
deleteFileFormat,
5673
equalityFieldIds,
5774
equalityDeleteRowSchema,
5875
equalityDeleteSortOrder,
59-
ImmutableMap.of());
76+
ImmutableMap.of(),
77+
null,
78+
null);
6079
}
6180

6281
/**
@@ -80,14 +99,20 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
8099
super(
81100
table,
82101
dataFileFormat,
102+
Record.class,
83103
dataSchema,
84104
dataSortOrder,
85105
deleteFileFormat,
86106
equalityFieldIds,
87107
equalityDeleteRowSchema,
88108
equalityDeleteSortOrder,
89-
positionDeleteRowSchema,
90-
writerProperties);
109+
writerProperties,
110+
null,
111+
null);
112+
this.table = table;
113+
this.format = dataFileFormat;
114+
this.positionDeleteRowSchema = positionDeleteRowSchema;
115+
this.writerProperties = writerProperties != null ? writerProperties : ImmutableMap.of();
91116
}
92117

93118
/**
@@ -107,64 +132,168 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
107132
super(
108133
table,
109134
dataFileFormat,
135+
Record.class,
110136
dataSchema,
111137
dataSortOrder,
112138
deleteFileFormat,
113139
equalityFieldIds,
114140
equalityDeleteRowSchema,
115141
equalityDeleteSortOrder,
116-
positionDeleteRowSchema);
142+
ImmutableMap.of(),
143+
dataSchema,
144+
equalityDeleteRowSchema);
145+
this.table = table;
146+
this.format = dataFileFormat;
147+
this.positionDeleteRowSchema = positionDeleteRowSchema;
117148
}
118149

119150
static Builder builderFor(Table table) {
120151
return new Builder(table);
121152
}
122153

123-
@Override
154+
/**
155+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
156+
* the configuration is done by the {@link FormatModelRegistry}.
157+
*/
158+
@Deprecated
124159
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
125160
builder.createWriterFunc(DataWriter::create);
126161
}
127162

128-
@Override
163+
/**
164+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
165+
* the configuration is done by the {@link FormatModelRegistry}.
166+
*/
167+
@Deprecated
129168
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
130169
builder.createWriterFunc(DataWriter::create);
131170
}
132171

133-
@Override
172+
/**
173+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
174+
* the configuration is done by the {@link FormatModelRegistry}.
175+
*/
176+
@Deprecated
134177
protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
135178
builder.createWriterFunc(DataWriter::create);
136179
}
137180

138-
@Override
181+
/**
182+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
183+
* the configuration is done by the {@link FormatModelRegistry}.
184+
*/
185+
@Deprecated
139186
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
140187
builder.createWriterFunc(GenericParquetWriter::create);
141188
}
142189

143-
@Override
190+
/**
191+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
192+
* the configuration is done by the {@link FormatModelRegistry}.
193+
*/
194+
@Deprecated
144195
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
145196
builder.createWriterFunc(GenericParquetWriter::create);
146197
}
147198

148-
@Override
199+
/**
200+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
201+
* the configuration is done by the {@link FormatModelRegistry}.
202+
*/
203+
@Deprecated
149204
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
150205
builder.createWriterFunc(GenericParquetWriter::create);
151206
}
152207

153-
@Override
208+
/**
209+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
210+
* the configuration is done by the {@link FormatModelRegistry}.
211+
*/
212+
@Deprecated
154213
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
155214
builder.createWriterFunc(GenericOrcWriter::buildWriter);
156215
}
157216

158-
@Override
217+
/**
218+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
219+
* the configuration is done by the {@link FormatModelRegistry}.
220+
*/
221+
@Deprecated
159222
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
160223
builder.createWriterFunc(GenericOrcWriter::buildWriter);
161224
}
162225

163-
@Override
226+
/**
227+
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
228+
* the configuration is done by the {@link FormatModelRegistry}.
229+
*/
230+
@Deprecated
164231
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
165232
builder.createWriterFunc(GenericOrcWriter::buildWriter);
166233
}
167234

235+
@Override
236+
public PositionDeleteWriter<Record> newPositionDeleteWriter(
237+
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
238+
if (positionDeleteRowSchema == null) {
239+
return super.newPositionDeleteWriter(file, spec, partition);
240+
} else {
241+
LOG.warn(
242+
"Deprecated feature used. Position delete row schema is used to create the position delete writer.");
243+
Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties();
244+
MetricsConfig metricsConfig =
245+
table == null
246+
? MetricsConfig.forPositionDelete()
247+
: MetricsConfig.forPositionDelete(table);
248+
249+
try {
250+
return switch (format) {
251+
case AVRO ->
252+
Avro.writeDeletes(file)
253+
.setAll(properties)
254+
.setAll(writerProperties)
255+
.metricsConfig(metricsConfig)
256+
.createWriterFunc(DataWriter::create)
257+
.withPartition(partition)
258+
.overwrite()
259+
.rowSchema(positionDeleteRowSchema)
260+
.withSpec(spec)
261+
.withKeyMetadata(file.keyMetadata())
262+
.buildPositionWriter();
263+
case ORC ->
264+
ORC.writeDeletes(file)
265+
.setAll(properties)
266+
.setAll(writerProperties)
267+
.metricsConfig(metricsConfig)
268+
.createWriterFunc(GenericOrcWriter::buildWriter)
269+
.withPartition(partition)
270+
.overwrite()
271+
.rowSchema(positionDeleteRowSchema)
272+
.withSpec(spec)
273+
.withKeyMetadata(file.keyMetadata())
274+
.buildPositionWriter();
275+
case PARQUET ->
276+
Parquet.writeDeletes(file)
277+
.setAll(properties)
278+
.setAll(writerProperties)
279+
.metricsConfig(metricsConfig)
280+
.createWriterFunc(GenericParquetWriter::create)
281+
.withPartition(partition)
282+
.overwrite()
283+
.rowSchema(positionDeleteRowSchema)
284+
.withSpec(spec)
285+
.withKeyMetadata(file.keyMetadata())
286+
.buildPositionWriter();
287+
default ->
288+
throw new UnsupportedOperationException(
289+
"Cannot write pos-deletes for unsupported file format: " + format);
290+
};
291+
} catch (IOException e) {
292+
throw new UncheckedIOException("Failed to create new position delete writer", e);
293+
}
294+
}
295+
}
296+
168297
public static class Builder {
169298
private final Table table;
170299
private FileFormat dataFileFormat;

0 commit comments

Comments
 (0)