Skip to content

Commit 00df493

Browse files
authored
Core, Spark 4.1: Fix querying equality deletes with schema evolution (#15268)
1 parent d95d9f0 commit 00df493

32 files changed

Lines changed: 305 additions & 86 deletions

File tree

api/src/main/java/org/apache/iceberg/Schema.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Arrays;
2323
import java.util.Collection;
2424
import java.util.Collections;
25+
import java.util.Comparator;
2526
import java.util.Deque;
2627
import java.util.List;
2728
import java.util.Locale;
@@ -35,6 +36,7 @@
3536
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
3637
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3738
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
39+
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
3840
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3941
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
4042
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -629,4 +631,33 @@ public static void checkCompatibility(Schema schema, int formatVersion) {
629631
formatVersion, Joiner.on("\n- ").join(problems.values())));
630632
}
631633
}
634+
635+
/**
636+
* Indexes all fields from schemas.
637+
*
638+
* <p>This method favors field definitions from higher schema IDs to handle type promotions.
639+
*
640+
* @param schemas the collection of schemas to index
641+
* @return a map of field IDs to fields
642+
*/
643+
public static Map<Integer, NestedField> indexFields(Collection<Schema> schemas) {
644+
if (schemas.size() == 1) {
645+
Schema schema = Iterables.getOnlyElement(schemas);
646+
return schema.lazyIdToField();
647+
}
648+
649+
Map<Integer, NestedField> fields = Maps.newHashMap();
650+
651+
for (Schema schema : sortAndDeduplicate(schemas)) {
652+
fields.putAll(schema.lazyIdToField());
653+
}
654+
655+
return fields;
656+
}
657+
658+
private static Set<Schema> sortAndDeduplicate(Collection<Schema> schemas) {
659+
Set<Schema> sortedSchemas = Sets.newTreeSet(Comparator.comparingInt(Schema::schemaId));
660+
sortedSchemas.addAll(schemas);
661+
return sortedSchemas;
662+
}
632663
}

api/src/test/java/org/apache/iceberg/TestSchema.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import static org.apache.iceberg.Schema.DEFAULT_VALUES_MIN_FORMAT_VERSION;
2222
import static org.apache.iceberg.Schema.MIN_FORMAT_VERSIONS;
2323
import static org.apache.iceberg.TestHelpers.MAX_FORMAT_VERSION;
24+
import static org.assertj.core.api.Assertions.assertThat;
2425
import static org.assertj.core.api.Assertions.assertThatCode;
2526
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2627

2728
import java.util.List;
29+
import java.util.Map;
2830
import java.util.stream.IntStream;
2931
import java.util.stream.Stream;
3032
import org.apache.iceberg.expressions.Literal;
@@ -217,4 +219,97 @@ public void testSupportedWriteDefault(int formatVersion) {
217219
assertThatCode(() -> Schema.checkCompatibility(WRITE_DEFAULT_SCHEMA, formatVersion))
218220
.doesNotThrowAnyException();
219221
}
222+
223+
@Test
224+
public void testIndexFieldsSingleSchema() {
225+
Schema schema =
226+
new Schema(
227+
1,
228+
Types.NestedField.required(1, "id", Types.LongType.get()),
229+
Types.NestedField.optional(2, "data", Types.StringType.get()));
230+
231+
Map<Integer, Types.NestedField> fields = Schema.indexFields(ImmutableList.of(schema));
232+
233+
assertThat(fields).hasSize(2);
234+
assertThat(fields.get(1).name()).isEqualTo("id");
235+
assertThat(fields.get(2).name()).isEqualTo("data");
236+
}
237+
238+
@Test
239+
public void testIndexFieldsHigherSchemaIdTakesPrecedence() {
240+
Schema schema1 =
241+
new Schema(
242+
1,
243+
Types.NestedField.required(1, "id", Types.LongType.get()),
244+
Types.NestedField.optional(2, "data", Types.StringType.get()));
245+
246+
Schema schema2 =
247+
new Schema(
248+
2,
249+
Types.NestedField.required(1, "id", Types.LongType.get()),
250+
Types.NestedField.required(2, "data", Types.IntegerType.get()));
251+
252+
Map<Integer, Types.NestedField> fields = Schema.indexFields(ImmutableList.of(schema2, schema1));
253+
254+
assertThat(fields).hasSize(2);
255+
assertThat(fields.get(2).type()).isEqualTo(Types.IntegerType.get());
256+
assertThat(fields.get(2).isOptional()).isFalse();
257+
}
258+
259+
@Test
260+
public void testIndexFieldsDuplicateSchemaIds() {
261+
Schema schema1 =
262+
new Schema(
263+
1,
264+
Types.NestedField.required(1, "id", Types.LongType.get()),
265+
Types.NestedField.optional(2, "data", Types.StringType.get()));
266+
267+
Schema schema1Duplicate =
268+
new Schema(
269+
1,
270+
Types.NestedField.required(1, "id", Types.LongType.get()),
271+
Types.NestedField.optional(2, "different", Types.IntegerType.get()));
272+
273+
Map<Integer, Types.NestedField> fields =
274+
Schema.indexFields(ImmutableList.of(schema1, schema1Duplicate));
275+
276+
assertThat(fields).hasSize(2);
277+
assertThat(fields.get(2).name()).isEqualTo("data");
278+
}
279+
280+
@Test
281+
public void testIndexFieldsNestedSchema() {
282+
Schema schema1 =
283+
new Schema(
284+
1,
285+
Types.NestedField.required(1, "id", Types.LongType.get()),
286+
Types.NestedField.optional(
287+
2,
288+
"person",
289+
Types.StructType.of(
290+
Types.NestedField.optional(3, "name", Types.StringType.get()),
291+
Types.NestedField.optional(4, "age", Types.IntegerType.get()))));
292+
293+
Schema schema2 =
294+
new Schema(
295+
2,
296+
Types.NestedField.required(1, "id", Types.LongType.get()),
297+
Types.NestedField.optional(
298+
2,
299+
"person",
300+
Types.StructType.of(
301+
Types.NestedField.optional(3, "name", Types.StringType.get()),
302+
Types.NestedField.optional(4, "age", Types.IntegerType.get()),
303+
Types.NestedField.optional(5, "email", Types.StringType.get()))));
304+
305+
Map<Integer, Types.NestedField> fields = Schema.indexFields(ImmutableList.of(schema1, schema2));
306+
307+
assertThat(fields).hasSize(5);
308+
assertThat(fields.get(1).name()).isEqualTo("id");
309+
assertThat(fields.get(2).name()).isEqualTo("person");
310+
assertThat(fields.get(3).name()).isEqualTo("name");
311+
assertThat(fields.get(4).name()).isEqualTo("age");
312+
assertThat(fields.get(5).name()).isEqualTo("email");
313+
assertThat(((Types.StructType) fields.get(2).type()).fields()).hasSize(3);
314+
}
220315
}

core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ private DeleteFileIndex planDeletesLocally(List<ManifestFile> deleteManifests) {
304304
}
305305

306306
return builder
307+
.schemasById(schemas())
307308
.specsById(specs())
308309
.filterData(filter())
309310
.caseSensitive(isCaseSensitive())

core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot>
8282
manifestEntry ->
8383
snapshotIds.contains(manifestEntry.snapshotId())
8484
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
85+
.schemasById(schemas())
8586
.specsById(table().specs())
8687
.ignoreDeleted()
8788
.columnsToKeepStats(columnsToKeepStats());

core/src/main/java/org/apache/iceberg/BaseScan.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ protected Schema tableSchema() {
110110
return schema;
111111
}
112112

113+
protected Map<Integer, Schema> schemas() {
114+
return table.schemas();
115+
}
116+
113117
protected TableScanContext context() {
114118
return context;
115119
}

core/src/main/java/org/apache/iceberg/DataScan.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ protected ManifestGroup newManifestGroup(
5353
.caseSensitive(isCaseSensitive())
5454
.select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
5555
.filterData(filter())
56+
.schemasById(schemas())
5657
.specsById(specs())
5758
.scanMetrics(scanMetrics())
5859
.ignoreDeleted()

core/src/main/java/org/apache/iceberg/DataTableScan.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
7474
.caseSensitive(isCaseSensitive())
7575
.select(scanColumns())
7676
.filterData(filter())
77+
.schemasById(schemas())
7778
.specsById(specs())
7879
.scanMetrics(scanMetrics())
7980
.ignoreDeleted()

core/src/main/java/org/apache/iceberg/DeleteFileIndex.java

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import java.util.Set;
3333
import java.util.concurrent.ConcurrentLinkedQueue;
3434
import java.util.concurrent.ExecutorService;
35+
import java.util.function.Function;
36+
import java.util.function.Supplier;
37+
import java.util.stream.Collectors;
3538
import org.apache.iceberg.exceptions.RuntimeIOException;
3639
import org.apache.iceberg.exceptions.ValidationException;
3740
import org.apache.iceberg.expressions.Expression;
@@ -366,6 +369,7 @@ static class Builder {
366369
private final Iterable<DeleteFile> deleteFiles;
367370
private long minSequenceNumber = 0L;
368371
private Map<Integer, PartitionSpec> specsById = null;
372+
private Map<Integer, Schema> schemasById = null;
369373
private Expression dataFilter = Expressions.alwaysTrue();
370374
private Expression partitionFilter = Expressions.alwaysTrue();
371375
private PartitionSet partitionSet = null;
@@ -391,6 +395,11 @@ Builder afterSequenceNumber(long seq) {
391395
return this;
392396
}
393397

398+
Builder schemasById(Map<Integer, Schema> newSchemasById) {
399+
this.schemasById = newSchemasById;
400+
return this;
401+
}
402+
394403
Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
395404
this.specsById = newSpecsById;
396405
return this;
@@ -471,10 +480,20 @@ private Collection<DeleteFile> loadDeleteFiles() {
471480
return files;
472481
}
473482

483+
private Collection<Schema> schemas() {
484+
if (schemasById != null) {
485+
return schemasById.values();
486+
} else {
487+
return specsById.values().stream().map(PartitionSpec::schema).collect(Collectors.toList());
488+
}
489+
}
490+
474491
DeleteFileIndex build() {
492+
Map<Integer, Types.NestedField> fieldsById = Schema.indexFields(schemas());
493+
Function<Integer, Types.NestedField> fieldLookup = fieldsById::get;
475494
Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
476495

477-
EqualityDeletes globalDeletes = new EqualityDeletes();
496+
EqualityDeletes globalDeletes = new EqualityDeletes(fieldLookup);
478497
PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById);
479498
PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById);
480499
Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap();
@@ -490,7 +509,7 @@ DeleteFileIndex build() {
490509
}
491510
break;
492511
case EQUALITY_DELETES:
493-
add(globalDeletes, eqDeletesByPartition, file);
512+
add(globalDeletes, eqDeletesByPartition, file, fieldLookup);
494513
break;
495514
default:
496515
throw new UnsupportedOperationException("Unsupported content: " + file.content());
@@ -537,7 +556,8 @@ private void add(
537556
private void add(
538557
EqualityDeletes globalDeletes,
539558
PartitionMap<EqualityDeletes> deletesByPartition,
540-
DeleteFile file) {
559+
DeleteFile file,
560+
Function<Integer, Types.NestedField> fieldLookup) {
541561
PartitionSpec spec = specsById.get(file.specId());
542562

543563
EqualityDeletes deletes;
@@ -546,10 +566,11 @@ private void add(
546566
} else {
547567
int specId = spec.specId();
548568
StructLike partition = file.partition();
549-
deletes = deletesByPartition.computeIfAbsent(specId, partition, EqualityDeletes::new);
569+
Supplier<EqualityDeletes> initEqDeletes = () -> new EqualityDeletes(fieldLookup);
570+
deletes = deletesByPartition.computeIfAbsent(specId, partition, initEqDeletes);
550571
}
551572

552-
deletes.add(spec, file);
573+
deletes.add(file);
553574
}
554575

555576
private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
@@ -726,16 +747,22 @@ static class EqualityDeletes {
726747
Comparator.comparingLong(EqualityDeleteFile::applySequenceNumber);
727748
private static final EqualityDeleteFile[] EMPTY_EQUALITY_DELETES = new EqualityDeleteFile[0];
728749

750+
private final Function<Integer, Types.NestedField> fieldLookup;
751+
729752
// indexed state
730753
private long[] seqs = null;
731754
private EqualityDeleteFile[] files = null;
732755

733756
// a buffer that is used to hold files before indexing
734757
private volatile List<EqualityDeleteFile> buffer = Lists.newArrayList();
735758

736-
public void add(PartitionSpec spec, DeleteFile file) {
759+
// fieldLookup resolves a field ID to its field definition; it is handed to every
// EqualityDeleteFile created by add()
EqualityDeletes(Function<Integer, Types.NestedField> fieldLookup) {
  this.fieldLookup = fieldLookup;
}
762+
763+
public void add(DeleteFile file) {
737764
Preconditions.checkState(buffer != null, "Can't add files upon indexing");
738-
buffer.add(new EqualityDeleteFile(spec, file));
765+
buffer.add(new EqualityDeleteFile(fieldLookup, file));
739766
}
740767

741768
public DeleteFile[] filter(long seq, DataFile dataFile) {
@@ -801,15 +828,15 @@ private static long[] indexSeqs(EqualityDeleteFile[] files) {
801828
// an equality delete file wrapper that caches the converted boundaries for faster boundary checks
802829
// this class is not meant to be exposed beyond the delete file index
803830
private static class EqualityDeleteFile {
804-
private final PartitionSpec spec;
831+
private final Function<Integer, Types.NestedField> fieldLookup;
805832
private final DeleteFile wrapped;
806833
private final long applySequenceNumber;
807834
private volatile List<Types.NestedField> equalityFields = null;
808835
private volatile Map<Integer, Object> convertedLowerBounds = null;
809836
private volatile Map<Integer, Object> convertedUpperBounds = null;
810837

811-
EqualityDeleteFile(PartitionSpec spec, DeleteFile file) {
812-
this.spec = spec;
838+
// Wraps a delete file together with the lookup used to resolve its equality field IDs.
EqualityDeleteFile(Function<Integer, Types.NestedField> fieldLookup, DeleteFile file) {
  this.fieldLookup = fieldLookup;
  this.wrapped = file;
  // the apply sequence number is one less than the file's data sequence number
  this.applySequenceNumber = file.dataSequenceNumber() - 1;
}
@@ -828,7 +855,8 @@ public List<Types.NestedField> equalityFields() {
828855
if (equalityFields == null) {
829856
List<Types.NestedField> fields = Lists.newArrayList();
830857
for (int id : wrapped.equalityFieldIds()) {
831-
Types.NestedField field = spec.schema().findField(id);
858+
Types.NestedField field = fieldLookup.apply(id);
859+
Preconditions.checkArgument(field != null, "Cannot find field for ID %s", id);
832860
fields.add(field);
833861
}
834862
this.equalityFields = fields;
@@ -891,7 +919,7 @@ private Map<Integer, Object> convertBounds(Map<Integer, ByteBuffer> bounds) {
891919
if (bounds != null) {
892920
for (Types.NestedField field : equalityFields()) {
893921
int id = field.fieldId();
894-
Type type = spec.schema().findField(id).type();
922+
Type type = field.type();
895923
if (type.isPrimitiveType()) {
896924
ByteBuffer bound = bounds.get(id);
897925
if (bound != null) {

core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public CloseableIterable<FileScanTask> planFiles() {
101101
manifestEntry ->
102102
snapshotIds.contains(manifestEntry.snapshotId())
103103
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
104+
.schemasById(schemas())
104105
.specsById(table().specs())
105106
.ignoreDeleted()
106107
.columnsToKeepStats(columnsToKeepStats());

core/src/main/java/org/apache/iceberg/ManifestGroup.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ class ManifestGroup {
8989
this.scanMetrics = ScanMetrics.noop();
9090
}
9191

92+
ManifestGroup schemasById(Map<Integer, Schema> newSchemasById) {
93+
deleteIndexBuilder.schemasById(newSchemasById);
94+
return this;
95+
}
96+
9297
ManifestGroup specsById(Map<Integer, PartitionSpec> newSpecsById) {
9398
this.specsById = newSpecsById;
9499
deleteIndexBuilder.specsById(newSpecsById);

0 commit comments

Comments
 (0)