Skip to content

Commit cecf8c3

Browse files
committed
FileWriterBuilderImpl restructuring
1 parent 8a8a67e commit cecf8c3

7 files changed

Lines changed: 217 additions & 162 deletions

File tree

core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
7979
private static class WriteBuilderWrapper<D, S> implements ModelWriteBuilder<D, S> {
8080
private final Avro.WriteBuilder internal;
8181
private final WriterFunction<DatumWriter<D>, S, Schema> writerFunction;
82-
private S inputSchema;
82+
private org.apache.iceberg.Schema schema;
83+
private S engineSchema;
8384
private FileContent content;
8485

8586
private WriteBuilderWrapper(
@@ -89,14 +90,15 @@ private WriteBuilderWrapper(
8990
}
9091

9192
@Override
92-
public ModelWriteBuilder<D, S> schema(org.apache.iceberg.Schema schema) {
93-
internal.schema(schema);
93+
public ModelWriteBuilder<D, S> schema(org.apache.iceberg.Schema newSchema) {
94+
this.schema = newSchema;
95+
internal.schema(newSchema);
9496
return this;
9597
}
9698

9799
@Override
98-
public ModelWriteBuilder<D, S> engineSchema(S schema) {
99-
this.inputSchema = schema;
100+
public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
101+
this.engineSchema = newSchema;
100102
return this;
101103
}
102104

@@ -158,12 +160,12 @@ public org.apache.iceberg.io.FileAppender<D> build() throws IOException {
158160
case DATA:
159161
internal.createContextFunc(Avro.WriteBuilder.Context::dataContext);
160162
internal.createWriterFunc(
161-
avroSchema -> writerFunction.write(null, avroSchema, inputSchema));
163+
avroSchema -> writerFunction.write(schema, avroSchema, engineSchema));
162164
break;
163165
case EQUALITY_DELETES:
164166
internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext);
165167
internal.createWriterFunc(
166-
avroSchema -> writerFunction.write(null, avroSchema, inputSchema));
168+
avroSchema -> writerFunction.write(schema, avroSchema, engineSchema));
167169
break;
168170
case POSITION_DELETES:
169171
internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext);
@@ -253,6 +255,8 @@ public ReadBuilder<D, S> withNameMapping(org.apache.iceberg.mapping.NameMapping
253255

254256
@Override
255257
public CloseableIterable<D> build() {
258+
// The file schema is passed directly to the DatumReader by the Avro read path, so null is
259+
// passed here
256260
return internal
257261
.createResolvingReader(
258262
icebergSchema -> readerFunction.read(icebergSchema, null, engineSchema, idToConstant))

0 commit comments

Comments
 (0)