Skip to content

Commit eeb9191

Browse files
committed
Ryan's comments
1 parent 83eba9e commit eeb9191

6 files changed

Lines changed: 72 additions & 65 deletions

File tree

core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public static <T> PositionDelete<T> create() {
3131

3232
private PositionDelete() {}
3333

34+
@SuppressWarnings("unchecked")
35+
public static <T> Class<PositionDelete<T>> deleteClass() {
36+
return (Class<PositionDelete<T>>) (Class<?>) PositionDelete.class;
37+
}
38+
3439
public PositionDelete<R> set(CharSequence newPath, long newPos) {
3540
this.path = newPath;
3641
this.pos = newPos;

core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,10 @@ public static <D, S> FileWriterBuilder<EqualityDeleteWriter<D>, S> equalityDelet
170170
* @param outputFile destination for the written data
171171
* @return a configured delete write builder for creating a {@link PositionDeleteWriter}
172172
*/
173-
@SuppressWarnings("unchecked")
174173
public static <D> FileWriterBuilder<PositionDeleteWriter<D>, ?> positionDeleteWriteBuilder(
175174
FileFormat format, EncryptedOutputFile outputFile) {
176-
Class<PositionDelete<D>> deleteClass =
177-
(Class<PositionDelete<D>>) (Class<?>) PositionDelete.class;
178-
FormatModel<PositionDelete<D>, ?> model = FormatModelRegistry.modelFor(format, deleteClass);
175+
FormatModel<PositionDelete<D>, ?> model =
176+
FormatModelRegistry.modelFor(format, PositionDelete.deleteClass());
179177
return FileWriterBuilderImpl.forPositionDelete(model, outputFile);
180178
}
181179

data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.iceberg.data;
2020

21-
import org.apache.iceberg.Schema;
2221
import org.apache.iceberg.data.parquet.GenericParquetReaders;
2322
import org.apache.iceberg.data.parquet.GenericParquetWriter;
2423
import org.apache.iceberg.formats.FormatModelRegistry;
@@ -29,13 +28,13 @@ public static void register() {
2928
FormatModelRegistry.register(
3029
ParquetFormatModel.create(
3130
Record.class,
32-
Schema.class,
31+
Void.class,
3332
(icebergSchema, fileSchema, engineSchema) ->
3433
GenericParquetWriter.create(icebergSchema, fileSchema),
3534
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
3635
GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
3736

38-
FormatModelRegistry.register(ParquetFormatModel.forDelete());
37+
FormatModelRegistry.register(ParquetFormatModel.forPositionDeletes());
3938
}
4039

4140
private GenericFormatModels() {}

data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@
3131
public class DataTestHelpers {
3232
private DataTestHelpers() {}
3333

34+
public static void assertEquals(
35+
Types.StructType struct, List<Record> expected, List<Record> actual) {
36+
assertThat(actual).hasSize(expected.size());
37+
for (int i = 0; i < expected.size(); i += 1) {
38+
assertEquals(struct, expected.get(i), actual.get(i), null, -1);
39+
}
40+
}
41+
3442
public static void assertEquals(Types.StructType struct, Record expected, Record actual) {
3543
assertEquals(struct, expected, actual, null, -1);
3644
}

data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java

Lines changed: 33 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
import org.apache.iceberg.DataFile;
2929
import org.apache.iceberg.DeleteFile;
3030
import org.apache.iceberg.FileFormat;
31-
import org.apache.iceberg.Parameter;
32-
import org.apache.iceberg.ParameterizedTestExtension;
33-
import org.apache.iceberg.Parameters;
3431
import org.apache.iceberg.PartitionSpec;
3532
import org.apache.iceberg.Schema;
3633
import org.apache.iceberg.TestBase;
@@ -46,28 +43,18 @@
4643
import org.apache.iceberg.io.CloseableIterable;
4744
import org.apache.iceberg.io.DataWriter;
4845
import org.apache.iceberg.io.InputFile;
49-
import org.apache.iceberg.io.OutputFile;
5046
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
5147
import org.junit.jupiter.api.AfterEach;
5248
import org.junit.jupiter.api.BeforeEach;
53-
import org.junit.jupiter.api.TestTemplate;
54-
import org.junit.jupiter.api.extension.ExtendWith;
5549
import org.junit.jupiter.api.io.TempDir;
50+
import org.junit.jupiter.params.ParameterizedTest;
51+
import org.junit.jupiter.params.provider.FieldSource;
5652

57-
@ExtendWith(ParameterizedTestExtension.class)
5853
public class TestGenericFormatModels {
59-
@Parameters(name = "fileFormat = {0}")
60-
protected static List<Object> parameters() {
61-
return List.of(FileFormat.PARQUET);
62-
}
63-
6454
private static final List<Record> TEST_RECORDS =
65-
ImmutableList.of(
66-
GenericRecord.create(TestBase.SCHEMA).copy("id", 1, "data", "hello"),
67-
GenericRecord.create(TestBase.SCHEMA).copy("id", 2, "data", "world"));
55+
RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
6856

69-
@Parameter(index = 0)
70-
private FileFormat fileFormat;
57+
private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.PARQUET};
7158

7259
@TempDir protected Path temp;
7360

@@ -77,8 +64,9 @@ protected static List<Object> parameters() {
7764
@BeforeEach
7865
public void before() {
7966
this.fileIO = new InMemoryFileIO();
80-
OutputFile outputFile = fileIO.newOutputFile("test-file." + fileFormat.name().toLowerCase());
81-
this.encryptedFile = EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY);
67+
this.encryptedFile =
68+
EncryptedFiles.encryptedOutput(
69+
fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY);
8270
}
8371

8472
@AfterEach
@@ -90,8 +78,9 @@ public void after() throws IOException {
9078
}
9179
}
9280

93-
@TestTemplate
94-
public void testDataWriter() throws IOException {
81+
@ParameterizedTest
82+
@FieldSource("FILE_FORMATS")
83+
public void testDataWriter(FileFormat fileFormat) throws IOException {
9584
FileWriterBuilder<DataWriter<Record>, Schema> writerBuilder =
9685
FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
9786

@@ -107,11 +96,11 @@ public void testDataWriter() throws IOException {
10796
dataFile = writer.toDataFile();
10897

10998
assertThat(dataFile).isNotNull();
110-
assertThat(dataFile.recordCount()).isEqualTo(2);
99+
assertThat(dataFile.recordCount()).isEqualTo(TEST_RECORDS.size());
111100
assertThat(dataFile.format()).isEqualTo(fileFormat);
112101

113102
// Verify the file content by reading it back
114-
InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location());
103+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
115104
List<Record> readRecords;
116105
try (CloseableIterable<Record> reader =
117106
FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
@@ -120,15 +109,12 @@ public void testDataWriter() throws IOException {
120109
readRecords = ImmutableList.copyOf(reader);
121110
}
122111

123-
assertThat(readRecords).hasSize(2);
124-
DataTestHelpers.assertEquals(
125-
TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(0), readRecords.get(0));
126-
DataTestHelpers.assertEquals(
127-
TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(1), readRecords.get(1));
112+
DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords);
128113
}
129114

130-
@TestTemplate
131-
public void testEqualityDeleteWriter() throws IOException {
115+
@ParameterizedTest
116+
@FieldSource("FILE_FORMATS")
117+
public void testEqualityDeleteWriter(FileFormat fileFormat) throws IOException {
132118
FileWriterBuilder<EqualityDeleteWriter<Record>, Schema> writerBuilder =
133119
FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile);
134120

@@ -148,12 +134,12 @@ public void testEqualityDeleteWriter() throws IOException {
148134
deleteFile = writer.toDeleteFile();
149135

150136
assertThat(deleteFile).isNotNull();
151-
assertThat(deleteFile.recordCount()).isEqualTo(2);
137+
assertThat(deleteFile.recordCount()).isEqualTo(TEST_RECORDS.size());
152138
assertThat(deleteFile.format()).isEqualTo(fileFormat);
153139
assertThat(deleteFile.equalityFieldIds()).containsExactly(3);
154140

155141
// Verify the file content by reading it back
156-
InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location());
142+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
157143
List<Record> readRecords;
158144
try (CloseableIterable<Record> reader =
159145
FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
@@ -162,25 +148,22 @@ public void testEqualityDeleteWriter() throws IOException {
162148
readRecords = ImmutableList.copyOf(reader);
163149
}
164150

165-
assertThat(readRecords).hasSize(2);
166-
DataTestHelpers.assertEquals(
167-
TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(0), readRecords.get(0));
168-
DataTestHelpers.assertEquals(
169-
TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(1), readRecords.get(1));
151+
DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords);
170152
}
171153

172-
@TestTemplate
173-
public void testPositionDeleteWriter() throws IOException {
154+
@ParameterizedTest
155+
@FieldSource("FILE_FORMATS")
156+
public void testPositionDeleteWriter(FileFormat fileFormat) throws IOException {
174157
Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS);
175158

176159
FileWriterBuilder<PositionDeleteWriter<Record>, ?> writerBuilder =
177160
FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile);
178161

179162
PositionDelete<Record> delete1 = PositionDelete.create();
180-
delete1.set("data-file-1.parquet", 0L, null);
163+
delete1.set("data-file-1.parquet", 0L);
181164

182165
PositionDelete<Record> delete2 = PositionDelete.create();
183-
delete2.set("data-file-1.parquet", 1L, null);
166+
delete2.set("data-file-1.parquet", 1L);
184167

185168
List<PositionDelete<Record>> positionDeletes = ImmutableList.of(delete1, delete2);
186169

@@ -199,7 +182,7 @@ public void testPositionDeleteWriter() throws IOException {
199182
assertThat(deleteFile.format()).isEqualTo(fileFormat);
200183

201184
// Verify the file content by reading it back
202-
InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location());
185+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
203186
List<Record> readRecords;
204187
try (CloseableIterable<Record> reader =
205188
FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
@@ -208,10 +191,13 @@ public void testPositionDeleteWriter() throws IOException {
208191
readRecords = ImmutableList.copyOf(reader);
209192
}
210193

211-
assertThat(readRecords).hasSize(2);
212-
assertThat(readRecords.get(0).getField("file_path")).isEqualTo("data-file-1.parquet");
213-
assertThat(readRecords.get(0).getField("pos")).isEqualTo(0L);
214-
assertThat(readRecords.get(1).getField("file_path")).isEqualTo("data-file-1.parquet");
215-
assertThat(readRecords.get(1).getField("pos")).isEqualTo(1L);
194+
List<Record> expected =
195+
ImmutableList.of(
196+
GenericRecord.create(positionDeleteSchema)
197+
.copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 0L),
198+
GenericRecord.create(positionDeleteSchema)
199+
.copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 1L));
200+
201+
DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), expected, readRecords);
216202
}
217203
}

parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,18 @@
3838
import org.apache.iceberg.io.FileAppender;
3939
import org.apache.iceberg.io.InputFile;
4040
import org.apache.iceberg.mapping.NameMapping;
41+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
4142
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4243
import org.apache.parquet.column.ParquetProperties;
4344
import org.apache.parquet.schema.MessageType;
4445

4546
public class ParquetFormatModel<D, S, R>
4647
extends BaseFormatModel<D, S, ParquetValueWriter<?>, R, MessageType> {
4748
public static final String WRITER_VERSION_KEY = "parquet.writer.version";
48-
private final boolean batchReader;
49+
private final boolean isBatchReader;
4950

50-
@SuppressWarnings("rawtypes")
51-
public static ParquetFormatModel<PositionDelete, Object, Object> forDelete() {
52-
return new ParquetFormatModel<>(PositionDelete.class, null, null, null, false);
51+
public static <D> ParquetFormatModel<PositionDelete<D>, Object, Object> forPositionDeletes() {
52+
return new ParquetFormatModel<>(PositionDelete.deleteClass(), null, null, null, false);
5353
}
5454

5555
public static <D, S> ParquetFormatModel<D, S, ParquetValueReader<?>> create(
@@ -72,9 +72,9 @@ private ParquetFormatModel(
7272
Class<S> schemaType,
7373
WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
7474
ReaderFunction<R, S, MessageType> readerFunction,
75-
boolean batchReader) {
75+
boolean isBatchReader) {
7676
super(type, schemaType, writerFunction, readerFunction);
77-
this.batchReader = batchReader;
77+
this.isBatchReader = isBatchReader;
7878
}
7979

8080
@Override
@@ -89,12 +89,13 @@ public ModelWriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile) {
8989

9090
@Override
9191
public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
92-
return new ReadBuilderWrapper<>(inputFile, readerFunction(), batchReader);
92+
return new ReadBuilderWrapper<>(inputFile, readerFunction(), isBatchReader);
9393
}
9494

9595
private static class WriteBuilderWrapper<D, S> implements ModelWriteBuilder<D, S> {
9696
private final Parquet.WriteBuilder internal;
9797
private final WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction;
98+
private Schema schema;
9899
private S engineSchema;
99100
private FileContent content;
100101

@@ -106,14 +107,15 @@ private WriteBuilderWrapper(
106107
}
107108

108109
@Override
109-
public ModelWriteBuilder<D, S> schema(Schema schema) {
110-
internal.schema(schema);
110+
public ModelWriteBuilder<D, S> schema(Schema newSchema) {
111+
this.schema = newSchema;
112+
internal.schema(newSchema);
111113
return this;
112114
}
113115

114116
@Override
115-
public ModelWriteBuilder<D, S> engineSchema(S schema) {
116-
this.engineSchema = schema;
117+
public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
118+
this.engineSchema = newSchema;
117119
return this;
118120
}
119121

@@ -191,6 +193,15 @@ public FileAppender<D> build() throws IOException {
191193
writerFunction.write(icebergSchema, messageType, engineSchema));
192194
break;
193195
case POSITION_DELETES:
196+
Preconditions.checkState(
197+
schema == null,
198+
"Invalid schema: %s. Position deletes with schema are not supported by the API.",
199+
schema);
200+
Preconditions.checkState(
201+
engineSchema == null,
202+
"Invalid engineSchema: %s. Position deletes with schema are not supported by the API.",
203+
engineSchema);
204+
194205
internal.createContextFunc(Parquet.WriteBuilder.Context::deleteContext);
195206
internal.createWriterFunc(
196207
(icebergSchema, messageType) ->

0 commit comments

Comments (0)