
Commit cb9377a

Core, Data, Flink: Moving Flink to use the new FormatModel API
1 parent: 8c2ca1d

9 files changed: 399 additions & 203 deletions

core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java

Lines changed: 3 additions & 1 deletion
@@ -55,7 +55,9 @@ private FormatModelRegistry() {}
   private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class);
   // The list of classes which are used for registering the reader and writer builders
   private static final List<String> CLASSES_TO_REGISTER =
-      ImmutableList.of("org.apache.iceberg.data.GenericFormatModels");
+      ImmutableList.of(
+          "org.apache.iceberg.data.GenericFormatModels",
+          "org.apache.iceberg.flink.data.FlinkFormatModels");

   // Format models indexed by file format and object model class
   private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
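
The registry refers to FlinkFormatModels only by its fully qualified name, so iceberg-core takes no compile-time dependency on Flink. A minimal sketch of how such a class list is typically consumed, assuming each listed class exposes a static no-arg register() method and using Iceberg's DynMethods reflection helper (this loop is illustrative, not copied from the commit):

  private static void registerSupportedFormats() {
    for (String className : CLASSES_TO_REGISTER) {
      try {
        // Invoke <className>.register() reflectively so engine-specific
        // classes are only loaded when they are on the classpath.
        DynMethods.builder("register").impl(className).buildStaticChecked().invoke();
      } catch (NoSuchMethodException e) {
        // The class (e.g. the Flink module) is not available in this runtime; skip it.
        LOG.info("Skipping format model registration for {}", className, e);
      }
    }
  }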
…/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java (new file; path inferred from the package and class declarations below)

Lines changed: 182 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.data;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.formats.FileWriterBuilder;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

/**
 * A base writer factory to be extended by query engine integrations.
 *
 * @param <T> row type
 * @param <S> engine schema type
 */
public abstract class RegistryBasedFileWriterFactory<T, S>
    implements FileWriterFactory<T>, Serializable {
  private final Table table;
  private final FileFormat dataFileFormat;
  private final Class<T> inputType;
  private final Schema dataSchema;
  private final SortOrder dataSortOrder;
  private final FileFormat deleteFileFormat;
  private final int[] equalityFieldIds;
  private final Schema equalityDeleteRowSchema;
  private final SortOrder equalityDeleteSortOrder;
  private final Map<String, String> writerProperties;
  private final S inputSchema;
  private final S equalityDeleteInputSchema;

  protected RegistryBasedFileWriterFactory(
      Table table,
      FileFormat dataFileFormat,
      Class<T> inputType,
      Schema dataSchema,
      SortOrder dataSortOrder,
      FileFormat deleteFileFormat,
      int[] equalityFieldIds,
      Schema equalityDeleteRowSchema,
      SortOrder equalityDeleteSortOrder,
      Map<String, String> writerProperties,
      S inputSchema,
      S equalityDeleteInputSchema) {
    this.table = table;
    this.dataFileFormat = dataFileFormat;
    this.inputType = inputType;
    this.dataSchema = dataSchema;
    this.dataSortOrder = dataSortOrder;
    this.deleteFileFormat = deleteFileFormat;
    this.equalityFieldIds = equalityFieldIds;
    this.equalityDeleteRowSchema = equalityDeleteRowSchema;
    this.equalityDeleteSortOrder = equalityDeleteSortOrder;
    this.writerProperties = writerProperties != null ? writerProperties : ImmutableMap.of();
    this.inputSchema = inputSchema;
    this.equalityDeleteInputSchema = equalityDeleteInputSchema;
  }

  protected S inputSchema() {
    return inputSchema;
  }

  protected S equalityDeleteInputSchema() {
    return equalityDeleteInputSchema;
  }

  @Override
  public DataWriter<T> newDataWriter(
      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
    Preconditions.checkNotNull(dataSchema, "Data schema must not be null");
    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
    Map<String, String> properties = table != null ? table.properties() : ImmutableMap.of();
    MetricsConfig metricsConfig =
        table != null ? MetricsConfig.forTable(table) : MetricsConfig.getDefault();

    try {
      FileWriterBuilder<DataWriter<T>, S> builder =
          FormatModelRegistry.dataWriteBuilder(dataFileFormat, inputType, file);
      return builder
          .schema(dataSchema)
          .engineSchema(inputSchema())
          .setAll(properties)
          .setAll(writerProperties)
          .metricsConfig(metricsConfig)
          .spec(spec)
          .partition(partition)
          .keyMetadata(keyMetadata)
          .sortOrder(dataSortOrder)
          .overwrite()
          .build();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create new data writer", e);
    }
  }

  @Override
  public EqualityDeleteWriter<T> newEqualityDeleteWriter(
      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
    Preconditions.checkNotNull(equalityDeleteRowSchema, "Equality delete schema must not be null");

    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
    Map<String, String> properties = table != null ? table.properties() : ImmutableMap.of();
    MetricsConfig metricsConfig =
        table != null ? MetricsConfig.forTable(table) : MetricsConfig.getDefault();

    try {
      FileWriterBuilder<EqualityDeleteWriter<T>, S> builder =
          FormatModelRegistry.equalityDeleteWriteBuilder(deleteFileFormat, inputType, file);
      return builder
          .setAll(properties)
          .setAll(writerProperties)
          .metricsConfig(metricsConfig)
          .schema(equalityDeleteRowSchema)
          .engineSchema(equalityDeleteInputSchema())
          .equalityFieldIds(equalityFieldIds)
          .spec(spec)
          .partition(partition)
          .keyMetadata(keyMetadata)
          .sortOrder(equalityDeleteSortOrder)
          .overwrite()
          .build();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create new equality delete writer", e);
    }
  }

  @Override
  public PositionDeleteWriter<T> newPositionDeleteWriter(
      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
    Map<String, String> properties = table != null ? table.properties() : ImmutableMap.of();
    MetricsConfig metricsConfig =
        table != null ? MetricsConfig.forPositionDelete(table) : MetricsConfig.forPositionDelete();

    try {
      FileWriterBuilder<PositionDeleteWriter<T>, ?> builder =
          FormatModelRegistry.positionDeleteWriteBuilder(deleteFileFormat, file);
      return builder
          .setAll(properties)
          .setAll(writerProperties)
          .metricsConfig(metricsConfig)
          .spec(spec)
          .partition(partition)
          .keyMetadata(keyMetadata)
          .overwrite()
          .build();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create new position delete writer", e);
    }
  }
}
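
Engine integrations extend this factory with their native row and schema types and inherit all three writer methods. A hypothetical minimal subclass for Flink-style types (a sketch only; the argument values are illustrative, the concrete Flink subclass is not shown here, and imports are as in the file above plus Flink's RowData and RowType):

class ExampleRowDataWriterFactory extends RegistryBasedFileWriterFactory<RowData, RowType> {
  ExampleRowDataWriterFactory(Table table, RowType engineSchema) {
    super(
        table,
        FileFormat.PARQUET, // data file format
        RowData.class,      // object model class used as the registry key
        table.schema(),     // Iceberg schema for data files
        null,               // no data sort order
        FileFormat.PARQUET, // delete file format
        null,               // no equality field ids
        null,               // no equality delete row schema
        null,               // no equality delete sort order
        ImmutableMap.of(),  // no extra writer properties
        engineSchema,       // engine schema for data files
        null);              // no engine schema for equality deletes
  }
}
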
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java (new file; path inferred from the package declaration and the class name registered above)

Lines changed: 58 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.flink.data;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.avro.AvroFormatModel;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.orc.ORCFormatModel;
import org.apache.iceberg.parquet.ParquetFormatModel;

public class FlinkFormatModels {
  public static void register() {
    FormatModelRegistry.register(
        ParquetFormatModel.create(
            RowData.class,
            RowType.class,
            (icebergSchema, fileSchema, engineSchema) ->
                FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));

    FormatModelRegistry.register(
        AvroFormatModel.create(
            RowData.class,
            RowType.class,
            (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));

    FormatModelRegistry.register(
        ORCFormatModel.create(
            RowData.class,
            RowType.class,
            (icebergSchema, fileSchema, engineSchema) ->
                FlinkOrcWriter.buildWriter(engineSchema, icebergSchema),
            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                new FlinkOrcReader(icebergSchema, fileSchema, idToConstant)));
  }

  private FlinkFormatModels() {}
}
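
Once register() has run, engine code resolves format-specific builders from the registry by file format and object model class. A sketch of the write-side lookup, mirroring the dataWriteBuilder call in RegistryBasedFileWriterFactory above (outputFile, icebergSchema, and flinkType are assumed to be in scope, and build() may throw IOException):

// Resolve the Parquet writer builder registered for Flink's RowData model.
FileWriterBuilder<DataWriter<RowData>, RowType> builder =
    FormatModelRegistry.dataWriteBuilder(FileFormat.PARQUET, RowData.class, outputFile);
DataWriter<RowData> writer =
    builder.schema(icebergSchema).engineSchema(flinkType).overwrite().build();
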

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java

Lines changed: 25 additions & 14 deletions
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.data;
 
 import java.util.List;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
@@ -29,9 +30,10 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+@Internal
 abstract class FlinkSchemaVisitor<T> {
 
-  static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
+  public static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
     return visit(flinkType, schema.asStruct(), visitor);
   }
 
@@ -94,24 +96,29 @@ private static <T> T visitRecord(
     List<LogicalType> fieldTypes = Lists.newArrayListWithExpectedSize(fieldSize);
     List<Types.NestedField> nestedFields = struct.fields();
 
-    for (int i = 0; i < fieldSize; i++) {
-      Types.NestedField iField = nestedFields.get(i);
-      int fieldIndex = rowType.getFieldIndex(iField.name());
-      Preconditions.checkArgument(
-          fieldIndex >= 0, "NestedField: %s is not found in flink RowType: %s", iField, rowType);
+    visitor.beforeStruct(struct.asStructType());
 
-      LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
+    try {
+      for (int i = 0; i < fieldSize; i++) {
+        Types.NestedField iField = nestedFields.get(i);
+        int fieldIndex = rowType.getFieldIndex(iField.name());
+        Preconditions.checkArgument(
+            fieldIndex >= 0, "NestedField: %s is not found in flink RowType: %s", iField, rowType);
 
-      fieldTypes.add(fieldFlinkType);
+        LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
+        fieldTypes.add(fieldFlinkType);
 
-      visitor.beforeField(iField);
-      try {
-        if (iField.type() != Types.UnknownType.get()) {
-          results.add(visit(fieldFlinkType, iField.type(), visitor));
+        visitor.beforeField(iField);
+        try {
+          if (iField.type() != Types.UnknownType.get()) {
+            results.add(visit(fieldFlinkType, iField.type(), visitor));
+          }
+        } finally {
+          visitor.afterField(iField);
         }
-      } finally {
-        visitor.afterField(iField);
       }
+    } finally {
+      visitor.afterStruct(struct.asStructType());
     }
 
     return visitor.record(struct, results, fieldTypes);
@@ -137,6 +144,10 @@ public void beforeField(Types.NestedField field) {}
 
   public void afterField(Types.NestedField field) {}
 
+  public void beforeStruct(Types.StructType type) {}
+
+  public void afterStruct(Types.StructType type) {}
+
   public void beforeListElement(Types.NestedField elementField) {
     beforeField(elementField);
   }
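
Because the field loop is wrapped in try/finally, beforeStruct and afterStruct stay balanced even when visiting a field throws, which makes the hooks safe for tracking visitor state. A hypothetical visitor using the new hooks to measure struct nesting depth (illustrative only; the remaining FlinkSchemaVisitor callbacks are omitted):

// Tracks how deeply structs are nested while the schema is visited.
class MaxDepthVisitor extends FlinkSchemaVisitor<Integer> {
  private int depth = 0;
  private int maxDepth = 0;

  @Override
  public void beforeStruct(Types.StructType type) {
    depth++;
    maxDepth = Math.max(maxDepth, depth);
  }

  @Override
  public void afterStruct(Types.StructType type) {
    depth--;
  }

  // record(...) and the list/map/primitive callbacks are omitted here;
  // a real subclass must still implement the class's abstract methods.
}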
