Skip to content

Commit 235ab0f

Browse files
authored
Parquet, Data: Implementation of ParquetFormatModel (#15253)
1 parent 113e71a commit 235ab0f

4 files changed

Lines changed: 373 additions & 8 deletions

File tree

data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import org.apache.iceberg.avro.AvroFormatModel;
2222
import org.apache.iceberg.data.avro.DataWriter;
2323
import org.apache.iceberg.data.avro.PlannedDataReader;
24+
import org.apache.iceberg.data.parquet.GenericParquetReaders;
25+
import org.apache.iceberg.data.parquet.GenericParquetWriter;
2426
import org.apache.iceberg.formats.FormatModelRegistry;
27+
import org.apache.iceberg.parquet.ParquetFormatModel;
2528

2629
public class GenericFormatModels {
2730
public static void register() {
@@ -34,6 +37,17 @@ public static void register() {
3437
PlannedDataReader.create(icebergSchema, idToConstant)));
3538

3639
FormatModelRegistry.register(AvroFormatModel.forPositionDeletes());
40+
41+
FormatModelRegistry.register(
42+
ParquetFormatModel.create(
43+
Record.class,
44+
Void.class,
45+
(icebergSchema, fileSchema, engineSchema) ->
46+
GenericParquetWriter.create(icebergSchema, fileSchema),
47+
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
48+
GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
49+
50+
FormatModelRegistry.register(ParquetFormatModel.forPositionDeletes());
3751
}
3852

3953
private GenericFormatModels() {}

data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public class TestGenericFormatModels {
5454
private static final List<Record> TEST_RECORDS =
5555
RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
5656

57-
private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO};
57+
private static final FileFormat[] FILE_FORMATS =
58+
new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET};
5859

5960
@TempDir protected Path temp;
6061

parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,7 @@ WriteBuilder withWriterVersion(WriterVersion version) {
302302
}
303303

304304
// not part of the public API: package-private, and meant to be used strictly by data and delete write builders
305-
private WriteBuilder createContextFunc(
306-
Function<Map<String, String>, Context> newCreateContextFunc) {
305+
WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
307306
this.createContextFunc = newCreateContextFunc;
308307
return this;
309308
}
@@ -498,7 +497,7 @@ public <D> FileAppender<D> build() throws IOException {
498497
}
499498
}
500499

501-
private static class Context {
500+
static class Context {
502501
private final int rowGroupSize;
503502
private final int pageSize;
504503
private final int pageRowLimit;
@@ -1176,6 +1175,7 @@ public static class ReadBuilder implements InternalData.ReadBuilder {
11761175
private Expression filter = null;
11771176
private ReadSupport<?> readSupport = null;
11781177
private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
1178+
private BiFunction<Schema, MessageType, VectorizedReader<?>> batchedReaderFuncWithSchema = null;
11791179
private ReaderFunction readerFunction = null;
11801180
private boolean filterRecords = true;
11811181
private boolean caseSensitive = true;
@@ -1298,6 +1298,9 @@ public ReadBuilder createReaderFunc(
12981298
Preconditions.checkArgument(
12991299
this.batchedReaderFunc == null,
13001300
"Cannot set reader function: batched reader function already set");
1301+
Preconditions.checkArgument(
1302+
this.batchedReaderFuncWithSchema == null,
1303+
"Cannot set reader function: batched reader function with schema already set");
13011304
Preconditions.checkArgument(
13021305
this.readerFunction == null, "Cannot set reader function: reader function already set");
13031306
this.readerFunction = new UnaryReaderFunction(newReaderFunction);
@@ -1309,6 +1312,9 @@ public ReadBuilder createReaderFunc(
13091312
Preconditions.checkArgument(
13101313
this.batchedReaderFunc == null,
13111314
"Cannot set reader function: batched reader function already set");
1315+
Preconditions.checkArgument(
1316+
this.batchedReaderFuncWithSchema == null,
1317+
"Cannot set reader function: batched reader function with schema already set");
13121318
Preconditions.checkArgument(
13131319
this.readerFunction == null, "Cannot set reader function: reader function already set");
13141320
this.readerFunction = new BinaryReaderFunction(newReaderFunction);
@@ -1319,17 +1325,38 @@ public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReade
13191325
Preconditions.checkArgument(
13201326
this.batchedReaderFunc == null,
13211327
"Cannot set batched reader function: batched reader function already set");
1328+
Preconditions.checkArgument(
1329+
this.batchedReaderFuncWithSchema == null,
1330+
"Cannot set reader function: batched reader function with schema already set");
13221331
Preconditions.checkArgument(
13231332
this.readerFunction == null,
13241333
"Cannot set batched reader function: ReaderFunction already set");
13251334
this.batchedReaderFunc = func;
13261335
return this;
13271336
}
13281337

1338+
public ReadBuilder createBatchedReaderFunc(
1339+
BiFunction<Schema, MessageType, VectorizedReader<?>> func) {
1340+
Preconditions.checkArgument(
1341+
this.batchedReaderFunc == null,
1342+
"Cannot set batched reader function: batched reader function already set");
1343+
Preconditions.checkArgument(
1344+
this.batchedReaderFuncWithSchema == null,
1345+
"Cannot set reader function: batched reader function with schema already set");
1346+
Preconditions.checkArgument(
1347+
this.readerFunction == null,
1348+
"Cannot set batched reader function: ReaderFunction already set");
1349+
this.batchedReaderFuncWithSchema = func;
1350+
return this;
1351+
}
1352+
13291353
public ReadBuilder createReaderFunc(ReaderFunction reader) {
13301354
Preconditions.checkArgument(
13311355
this.batchedReaderFunc == null,
13321356
"Cannot set reader function: batched reader function already set");
1357+
Preconditions.checkArgument(
1358+
this.batchedReaderFuncWithSchema == null,
1359+
"Cannot set reader function: batched reader function with schema already set");
13331360
Preconditions.checkArgument(
13341361
this.readerFunction == null, "Cannot set reader function: reader function already set");
13351362
this.readerFunction = reader;
@@ -1389,7 +1416,7 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
13891416
}
13901417

13911418
@Override
1392-
@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
1419+
@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "checkstyle:MethodLength"})
13931420
public <D> CloseableIterable<D> build() {
13941421
FileDecryptionProperties fileDecryptionProperties = null;
13951422
if (fileEncryptionKey != null) {
@@ -1404,7 +1431,9 @@ public <D> CloseableIterable<D> build() {
14041431
Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
14051432
}
14061433

1407-
if (batchedReaderFunc != null || readerFunction != null) {
1434+
if (batchedReaderFunc != null
1435+
|| batchedReaderFuncWithSchema != null
1436+
|| readerFunction != null) {
14081437
ParquetReadOptions.Builder optionsBuilder;
14091438
if (file instanceof HadoopInputFile) {
14101439
// remove read properties already set that may conflict with this read
@@ -1441,12 +1470,16 @@ public <D> CloseableIterable<D> build() {
14411470
mapping = NameMapping.empty();
14421471
}
14431472

1444-
if (batchedReaderFunc != null) {
1473+
Function<MessageType, VectorizedReader<?>> batchedFunc =
1474+
batchedReaderFuncWithSchema != null
1475+
? messageType -> batchedReaderFuncWithSchema.apply(schema, messageType)
1476+
: batchedReaderFunc;
1477+
if (batchedFunc != null) {
14451478
return new VectorizedParquetReader<>(
14461479
file,
14471480
schema,
14481481
options,
1449-
batchedReaderFunc,
1482+
batchedFunc,
14501483
mapping,
14511484
filter,
14521485
reuseContainers,

0 commit comments

Comments
 (0)