From d2ae19a2de9b82c23f5fee69b97e8f7099ce9f43 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Tue, 7 Apr 2026 03:08:49 +0100 Subject: [PATCH 1/6] Replace transactions rebase onto refreshed metadata --- .../org/apache/iceberg/BaseTransaction.java | 91 ++++++++++++------- .../iceberg/rest/RESTTableOperations.java | 5 +- .../apache/iceberg/catalog/CatalogTests.java | 62 ++++++++----- 3 files changed, 98 insertions(+), 60 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 9884ac297079..a4268fd84d43 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -72,6 +72,7 @@ enum TransactionType { Sets.newHashSet(); // keep track of files deleted in the most recent commit private final Consumer enqueueDelete = deletedFiles::add; private final TransactionType type; + private final TableMetadata start; private TableMetadata base; private TableMetadata current; private boolean hasLastOpCommitted; @@ -91,6 +92,7 @@ enum TransactionType { this.tableName = tableName; this.ops = ops; this.transactionTable = new TransactionTable(); + this.start = start; this.current = start; this.transactionOps = new TransactionTableOperations(); this.updates = Lists.newArrayList(); @@ -258,11 +260,8 @@ public void commitTransaction() { break; case REPLACE_TABLE: - commitReplaceTransaction(false); - break; - case CREATE_OR_REPLACE_TABLE: - commitReplaceTransaction(true); + commitReplaceTransaction(); break; case SIMPLE: @@ -295,9 +294,14 @@ private void commitCreateTransaction() { } } - private void commitReplaceTransaction(boolean orCreate) { + private void commitReplaceTransaction() { Map props = base != null ? base.properties() : current.properties(); + Set startingSnapshots = + base != null + ? base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()) + : ImmutableSet.of(); + try { Tasks.foreach(ops) .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) @@ -312,40 +316,26 @@ private void commitReplaceTransaction(boolean orCreate) { .onlyRetryOn(CommitFailedException.class) .run( underlyingOps -> { - try { - underlyingOps.refresh(); - } catch (NoSuchTableException e) { - if (!orCreate) { - throw e; - } - } - - // because this is a replace table, it will always completely replace the table - // metadata. even if it was just updated. - if (base != underlyingOps.current()) { - this.base = underlyingOps.current(); // just refreshed - } - + applyUpdates(underlyingOps); underlyingOps.commit(base, current); }); } catch (CommitStateUnknownException e) { throw e; + } catch (PendingUpdateFailedException e) { + cleanUp(); + throw e.wrapped(); + } catch (RuntimeException e) { - // the commit failed and no files were committed. clean up each update. if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) { - cleanAllUpdates(); + cleanUp(); } throw e; - - } finally { - // replace table never needs to retry because the table state is completely replaced. 
because - // retries are not - // a concern, it is safe to delete all the deleted files from individual operations - deleteUncommittedFiles(deletedFiles); } + + cleanUpAfterCommitSuccess(startingSnapshots); } private void commitSimpleTransaction() { @@ -378,6 +368,7 @@ private void commitSimpleTransaction() { } catch (PendingUpdateFailedException e) { cleanUp(); throw e.wrapped(); + } catch (RuntimeException e) { if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) { cleanUp(); @@ -386,8 +377,10 @@ private void commitSimpleTransaction() { throw e; } - // the commit succeeded + cleanUpAfterCommitSuccess(startingSnapshots); + } + private void cleanUpAfterCommitSuccess(Set startingSnapshots) { try { // clean up the data files that were deleted by each operation. first, get the list of // committed manifests to ensure that no committed manifest is deleted. @@ -441,12 +434,28 @@ private void deleteUncommittedFiles(Iterable paths) { } private void applyUpdates(TableOperations underlyingOps) { - if (base != underlyingOps.refresh()) { - // use refreshed the metadata + try { + underlyingOps.refresh(); + } catch (NoSuchTableException e) { + if (type == TransactionType.CREATE_OR_REPLACE_TABLE) { + return; + } + throw e; + } + + if (underlyingOps.current() == null) { + if (type == TransactionType.CREATE_OR_REPLACE_TABLE) { + this.base = null; + return; + } + throw new NoSuchTableException("Table metadata is not available after refresh"); + } + + if (base != underlyingOps.current()) { this.base = underlyingOps.current(); - this.current = underlyingOps.current(); + this.current = startingMetadataFor(underlyingOps.current()); + for (PendingUpdate update : updates) { - // re-commit each update in the chain to apply it and update current try { update.commit(); } catch (CommitFailedException e) { @@ -458,6 +467,24 @@ private void applyUpdates(TableOperations underlyingOps) { } } + private TableMetadata startingMetadataFor(TableMetadata refreshed) { + return switch (type) { + case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE -> + // TODO: Reconsider implementation here, relies on "start" being produced via replacement, + // and its immutability (not guaranteed - its the "properties()" hash map is exposed). 
+ // Even if we are dealing with a replace-style transaction, we need to re-apply + // updates on top of the refreshed metadata's replacement + refreshed.buildReplacement( + start.schema(), + start.spec(), + start.sortOrder(), + start.location(), + start.properties()); + case SIMPLE -> refreshed; + default -> throw new IllegalStateException("Unexpected transaction type for update: " + type); + }; + } + // committedFiles returns null whenever the set of committed files // cannot be determined from the provided snapshots private static Set committedFiles(TableOperations ops, Set snapshotIds) { diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index be763d30fef1..40e4b6c3da06 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -59,7 +59,6 @@ enum UpdateType { private final Supplier> mutationHeaders; private final FileIO io; private final List createChanges; - private final TableMetadata replaceBase; private final Set endpoints; private UpdateType updateType; private TableMetadata current; @@ -132,7 +131,6 @@ enum UpdateType { this.io = io; this.updateType = updateType; this.createChanges = createChanges; - this.replaceBase = current; if (updateType == UpdateType.CREATE) { this.current = null; } else { @@ -179,8 +177,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { .addAll(createChanges) .addAll(metadata.changes()) .build(); - // use the original replace base metadata because the transaction will refresh - requirements = UpdateRequirements.forReplaceTable(replaceBase, updates); + requirements = UpdateRequirements.forReplaceTable(base, updates); errorHandler = ErrorHandlers.tableCommitHandler(); break; diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 833b2fb0b46f..8b32973d9653 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -2655,15 +2655,20 @@ public void testReplaceTableKeepsSnapshotLog() { .containsExactly(snapshotBeforeReplace, snapshotAfterReplace); } - @Test - public void testConcurrentReplaceTransactions() { + @ParameterizedTest + @ValueSource(ints = {2, 3}) + public void testConcurrentReplaceTransactions(int formatVersion) { C catalog = catalog(); if (requiresNamespaceCreate()) { catalog.createNamespace(NS); } - Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + Transaction transaction = + catalog + .buildTable(TABLE, SCHEMA) + .withProperty("format-version", String.valueOf(formatVersion)) + .createTransaction(); transaction.newFastAppend().appendFile(FILE_A).commit(); transaction.commitTransaction(); @@ -2691,6 +2696,8 @@ public void testConcurrentReplaceTransactions() { secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); + // All three successfully committed snapshots should be present + assertThat(afterSecondReplace.snapshots()).hasSize(3); assertThat(afterSecondReplace.schema().asStruct()) .as("Table schema should match the original schema") .isEqualTo(original.schema().asStruct()); @@ -2734,9 +2741,14 @@ public void testConcurrentReplaceTransactionSchema() { secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); - 
assertThat(afterSecondReplace.schema().asStruct()) - .as("Table schema should match the original schema") - .isEqualTo(original.schema().asStruct()); + // The second replace is rebased on the first replace, so new field IDs will be assigned + assertThat(afterSecondReplace.schema().columns()) + .as("Table schema should have the same column names as the original") + .extracting("name") + .containsExactlyElementsOf( + original.schema().columns().stream() + .map(Types.NestedField::name) + .collect(java.util.stream.Collectors.toList())); assertUUIDsMatch(original, afterSecondReplace); assertFiles(afterSecondReplace, FILE_C); } @@ -2773,17 +2785,19 @@ public void testConcurrentReplaceTransactionSchema2() { secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); + // The second replace is rebased on the first replace, so new field IDs will be assigned assertThat(afterSecondReplace.schema().asStruct()) - .as("Table schema should match the new schema") - .isEqualTo(REPLACE_SCHEMA.asStruct()); + .as("Table schema should differ from the original schema") + .isNotEqualTo(original.schema().asStruct()); + assertThat(afterSecondReplace.schema().select("name", "type").asStruct()) + .as("Table schema should match the original schema in names and types") + .isEqualTo(original.schema().select("name", "type").asStruct()); assertUUIDsMatch(original, afterSecondReplace); assertFiles(afterSecondReplace, FILE_C); } @Test public void testConcurrentReplaceTransactionSchemaConflict() { - assumeThat(supportsServerSideRetry()).as("Schema conflicts are detected server-side").isTrue(); - C catalog = catalog(); if (requiresNamespaceCreate()) { @@ -2806,18 +2820,19 @@ public void testConcurrentReplaceTransactionSchemaConflict() { Table afterFirstReplace = catalog.loadTable(TABLE); assertThat(afterFirstReplace.schema().asStruct()) - .as("Table schema should match the original schema") + .as("Table schema should match the new schema") .isEqualTo(REPLACE_SCHEMA.asStruct()); assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // even though the new schema is identical, the assertion that the last assigned id has not - // changed will fail - assertThatThrownBy(secondReplace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith( - "Commit failed: Requirement failed: last assigned field id changed"); + // With rebase, the second replace succeeds by re-applying buildReplacement on refreshed + // metadata + secondReplace.commitTransaction(); + Table afterSecondReplace = catalog.loadTable(TABLE); + assertThat(afterSecondReplace.schema().asStruct()) + .as("Table schema should match the new schema") + .isEqualTo(REPLACE_SCHEMA.asStruct()); } @Test @@ -2902,7 +2917,6 @@ public void testConcurrentReplaceTransactionPartitionSpec2() { @Test public void testConcurrentReplaceTransactionPartitionSpecConflict() { - assumeThat(supportsServerSideRetry()).as("Spec conflicts are detected server-side").isTrue(); C catalog = catalog(); if (requiresNamespaceCreate()) { @@ -2932,12 +2946,12 @@ public void testConcurrentReplaceTransactionPartitionSpecConflict() { assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // even though the new spec is identical, the assertion that the last assigned id has not - // changed will fail - assertThatThrownBy(secondReplace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith( - "Commit failed: Requirement failed: last assigned partition 
id changed"); + // With rebase, the second replace succeeds by re-applying buildReplacement on refreshed + // metadata + secondReplace.commitTransaction(); + assertThat(catalog.loadTable(TABLE).spec().fields()) + .as("Table spec should match the new spec") + .isEqualTo(TABLE_SPEC.fields()); } @Test From 0fcacb75f85d6940a473aec333c69fc28e957ef6 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Tue, 7 Apr 2026 03:49:13 +0100 Subject: [PATCH 2/6] Fixes --- .../main/java/org/apache/iceberg/BaseTransaction.java | 5 +---- .../java/org/apache/iceberg/catalog/CatalogTests.java | 11 ++++++----- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index a4268fd84d43..b9535bf5123a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -438,6 +438,7 @@ private void applyUpdates(TableOperations underlyingOps) { underlyingOps.refresh(); } catch (NoSuchTableException e) { if (type == TransactionType.CREATE_OR_REPLACE_TABLE) { + this.base = null; return; } throw e; @@ -470,10 +471,6 @@ private void applyUpdates(TableOperations underlyingOps) { private TableMetadata startingMetadataFor(TableMetadata refreshed) { return switch (type) { case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE -> - // TODO: Reconsider implementation here, relies on "start" being produced via replacement, - // and its immutability (not guaranteed - its the "properties()" hash map is exposed). - // Even if we are dealing with a replace-style transaction, we need to re-apply - // updates on top of the refreshed metadata's replacement refreshed.buildReplacement( start.schema(), start.spec(), diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 8b32973d9653..098293da7bbf 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -2748,7 +2748,7 @@ public void testConcurrentReplaceTransactionSchema() { .containsExactlyElementsOf( original.schema().columns().stream() .map(Types.NestedField::name) - .collect(java.util.stream.Collectors.toList())); + .collect(Collectors.toList())); assertUUIDsMatch(original, afterSecondReplace); assertFiles(afterSecondReplace, FILE_C); } @@ -2787,11 +2787,12 @@ public void testConcurrentReplaceTransactionSchema2() { Table afterSecondReplace = catalog.loadTable(TABLE); // The second replace is rebased on the first replace, so new field IDs will be assigned assertThat(afterSecondReplace.schema().asStruct()) - .as("Table schema should differ from the original schema") + .as("Table schema should differ from the original schema due to rebased field IDs") .isNotEqualTo(original.schema().asStruct()); - assertThat(afterSecondReplace.schema().select("name", "type").asStruct()) - .as("Table schema should match the original schema in names and types") - .isEqualTo(original.schema().select("name", "type").asStruct()); + assertThat(afterSecondReplace.schema().columns()) + .as("Table schema should have the same column names as the replacement schema") + .extracting("name") + .containsExactly("id", "data"); assertUUIDsMatch(original, afterSecondReplace); assertFiles(afterSecondReplace, FILE_C); } From 8e54050d627e32c801acc59e81903847c9d18cd4 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Sun, 19 Apr 2026 00:41:21 -0700 
Subject: [PATCH 3/6] Fixes Co-Authored-By: Claude Opus 4.6 (1M context) --- .../org/apache/iceberg/BaseTransaction.java | 2 +- .../org/apache/iceberg/TableMetadata.java | 50 ++++ .../apache/iceberg/catalog/CatalogTests.java | 69 +++-- ...stReplaceTransactionFieldIdCorruption.java | 242 ++++++++++++++++++ .../hive/TestHiveCreateReplaceTable.java | 8 +- 5 files changed, 351 insertions(+), 20 deletions(-) create mode 100644 data/src/test/java/org/apache/iceberg/data/TestReplaceTransactionFieldIdCorruption.java diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index b9535bf5123a..459bff658dd5 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -471,7 +471,7 @@ private void applyUpdates(TableOperations underlyingOps) { private TableMetadata startingMetadataFor(TableMetadata refreshed) { return switch (type) { case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE -> - refreshed.buildReplacement( + refreshed.buildReplacementPreservingIds( start.schema(), start.spec(), start.sortOrder(), diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 43a67dd2bef2..ca89f54c2edd 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -745,6 +745,56 @@ public TableMetadata buildReplacement( .build(); } + /** + * Builds replacement metadata preserving the caller-provided schema's field IDs. + * + *
<p>
Unlike {@link #buildReplacement}, which re-derives field IDs by name-matching against the + * current table schema, this method uses the provided schema's field IDs directly. This is used + * during transaction rebase to avoid reassigning field IDs that data files already reference. + * + *
<p>
The {@code lastColumnId} is set to the maximum of the provided schema's highest field ID and + * this metadata's {@code lastColumnId}, preventing future ID collisions. + */ + public TableMetadata buildReplacementPreservingIds( + Schema schemaWithIds, + PartitionSpec updatedPartitionSpec, + SortOrder updatedSortOrder, + String newLocation, + Map updatedProperties) { + ValidationException.check( + formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec), + "Spec does not use sequential IDs that are required in v1: %s", + updatedPartitionSpec); + + int preservedLastColumnId = Math.max(schemaWithIds.highestFieldId(), lastColumnId); + + // rebuild the partition spec using the preserved column ids and reassign partition field ids + // to align with existing partition specs in the table + PartitionSpec freshSpec = + reassignPartitionIds( + freshSpec(INITIAL_SPEC_ID, schemaWithIds, updatedPartitionSpec), + new AtomicInteger(lastAssignedPartitionId)::incrementAndGet); + + // rebuild the sort order using preserved column ids + SortOrder freshSortOrder = + freshSortOrder(INITIAL_SORT_ORDER_ID, schemaWithIds, updatedSortOrder); + + // check if there is format version override + int newFormatVersion = + PropertyUtil.propertyAsInt( + updatedProperties, TableProperties.FORMAT_VERSION, formatVersion); + + return new Builder(this) + .upgradeFormatVersion(newFormatVersion) + .removeRef(SnapshotRef.MAIN_BRANCH) + .setCurrentSchema(schemaWithIds, preservedLastColumnId) + .setDefaultPartitionSpec(freshSpec) + .setDefaultSortOrder(freshSortOrder) + .setLocation(newLocation) + .setProperties(persistedProperties(updatedProperties)) + .build(); + } + public TableMetadata updateLocation(String newLocation) { return new Builder(this).setLocation(newLocation).build(); } diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 098293da7bbf..f4f4ddde1194 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -2741,14 +2741,10 @@ public void testConcurrentReplaceTransactionSchema() { secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); - // The second replace is rebased on the first replace, so new field IDs will be assigned - assertThat(afterSecondReplace.schema().columns()) - .as("Table schema should have the same column names as the original") - .extracting("name") - .containsExactlyElementsOf( - original.schema().columns().stream() - .map(Types.NestedField::name) - .collect(Collectors.toList())); + // Field IDs from the second replace's start schema are preserved during rebase + assertThat(afterSecondReplace.schema().asStruct()) + .as("Table schema struct should match the original — field IDs are preserved on rebase") + .isEqualTo(original.schema().asStruct()); assertUUIDsMatch(original, afterSecondReplace); assertFiles(afterSecondReplace, FILE_C); } @@ -2785,14 +2781,10 @@ public void testConcurrentReplaceTransactionSchema2() { secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); - // The second replace is rebased on the first replace, so new field IDs will be assigned + // Field IDs from the second replace's start schema are preserved during rebase assertThat(afterSecondReplace.schema().asStruct()) - .as("Table schema should differ from the original schema due to rebased field IDs") - .isNotEqualTo(original.schema().asStruct()); - 
assertThat(afterSecondReplace.schema().columns()) - .as("Table schema should have the same column names as the replacement schema") - .extracting("name") - .containsExactly("id", "data"); + .as("Table schema should match the replacement schema — field IDs are preserved on rebase") + .isEqualTo(REPLACE_SCHEMA.asStruct()); assertUUIDsMatch(original, afterSecondReplace); assertFiles(afterSecondReplace, FILE_C); } @@ -3036,6 +3028,53 @@ public void testConcurrentReplaceTransactionSortOrderConflict() { assertFiles(afterSecondReplace, FILE_C); } + @Test + public void testConcurrentReplaceTransactionFieldIdsPreserved() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + // Create a table with OTHER_SCHEMA = {1: some_id} + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Table original = catalog.loadTable(TABLE); + assertThat(original.schema().columns()).hasSize(1); + assertThat(original.schema().findField("some_id").fieldId()).isEqualTo(1); + + // Start a replace with SCHEMA = {id, data} — this gets fresh IDs {2: id, 3: data} + Transaction replace = catalog.buildTable(TABLE, SCHEMA).replaceTransaction(); + replace.newFastAppend().appendFile(FILE_A).commit(); + + // Capture the field IDs that the replace transaction's schema was assigned + Table replaceView = replace.table(); + int idFieldId = replaceView.schema().findField("id").fieldId(); + int dataFieldId = replaceView.schema().findField("data").fieldId(); + + // Concurrently, another replace changes the schema to something different + Transaction concurrentReplace = catalog.buildTable(TABLE, SCHEMA).replaceTransaction(); + concurrentReplace.commitTransaction(); + + // The concurrent replace changed the table schema and lastColumnId. + // Now commit the first replace — it must rebase and preserve field IDs. + replace.commitTransaction(); + + Table afterReplace = catalog.loadTable(TABLE); + + // Field IDs must match what the replace transaction originally assigned, because + // data files appended during the transaction reference those IDs in their Parquet metadata. + assertThat(afterReplace.schema().findField("id").fieldId()) + .as("id field ID must be preserved after rebase — data files reference this ID") + .isEqualTo(idFieldId); + assertThat(afterReplace.schema().findField("data").fieldId()) + .as("data field ID must be preserved after rebase — data files reference this ID") + .isEqualTo(dataFieldId); + + assertUUIDsMatch(original, afterReplace); + assertFiles(afterReplace, FILE_A); + } + @ParameterizedTest @ValueSource(ints = {1, 2, 3}) public void createTableTransaction(int formatVersion) { diff --git a/data/src/test/java/org/apache/iceberg/data/TestReplaceTransactionFieldIdCorruption.java b/data/src/test/java/org/apache/iceberg/data/TestReplaceTransactionFieldIdCorruption.java new file mode 100644 index 000000000000..e4a2c88d1099 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/TestReplaceTransactionFieldIdCorruption.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Tests that verify field IDs are preserved during replace transaction rebase with concurrent + * schema-changing replaces. Uses {@code buildReplacementPreservingIds} to ensure data files written + * during the transaction remain readable after rebase, even when the concurrent replace uses a + * completely different schema. + */ +public class TestReplaceTransactionFieldIdCorruption { + + private static final Configuration CONF = new Configuration(); + private static final Namespace NS = Namespace.of("db"); + + @TempDir private File tempDir; + + private HadoopCatalog catalog; + + @BeforeEach + public void before() { + catalog = new HadoopCatalog(); + catalog.setConf(CONF); + catalog.initialize( + "test-catalog", + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, tempDir.getAbsolutePath())); + } + + /** + * Verifies that required fields remain readable after rebase. With buildReplacementPreservingIds, + * the field IDs from the original replace transaction are preserved, so the Parquet file's field + * IDs still match the table schema. 
+ */ + @Test + public void testRequiredFieldReadableAfterRebase() throws IOException { + TableIdentifier tableId = TableIdentifier.of(NS, "required_field_test"); + + Schema originalSchema = + new Schema( + required(1, "id", Types.IntegerType.get()), + required(2, "data", Types.StringType.get())); + + Schema differentSchema = + new Schema( + required(1, "id", Types.IntegerType.get()), + required(2, "category", Types.StringType.get())); + + catalog.createTable(tableId, originalSchema); + + Transaction txnA = catalog.buildTable(tableId, originalSchema).replaceTransaction(); + Table txnTable = txnA.table(); + DataFile dataFile = writeDataFile(txnTable, originalSchema, 42, "hello"); + txnA.newFastAppend().appendFile(dataFile).commit(); + + // Concurrently replace with a different schema + Transaction concurrentReplace = + catalog.buildTable(tableId, differentSchema).replaceTransaction(); + concurrentReplace.commitTransaction(); + + // Commit the first replace — rebase preserves field IDs + txnA.commitTransaction(); + + Table finalTable = catalog.loadTable(tableId); + Types.NestedField dataField = finalTable.schema().findField("data"); + assertThat(dataField).as("data field should exist in final schema").isNotNull(); + + // Field ID must be preserved — the rebase uses buildReplacementPreservingIds + assertThat(dataField.fieldId()) + .as("data field should retain its original ID after rebase") + .isEqualTo(2); + + // Data should be fully readable since field IDs match the Parquet file + List records = Lists.newArrayList(IcebergGenerics.read(finalTable).build()); + assertThat(records).hasSize(1); + assertThat(records.get(0).getField("id")).isEqualTo(42); + assertThat(records.get(0).getField("data")).isEqualTo("hello"); + } + + /** + * Verifies that optional fields are readable (not null) after rebase with a concurrent + * schema-changing replace. 
+ */ + @Test + public void testOptionalFieldReadableAfterRebase() throws IOException { + TableIdentifier tableId = TableIdentifier.of(NS, "optional_field_test"); + + Schema originalSchema = + new Schema( + required(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get())); + + Schema differentSchema = + new Schema( + required(1, "id", Types.IntegerType.get()), + optional(2, "category", Types.StringType.get())); + + catalog.createTable(tableId, originalSchema); + + Transaction txnA = catalog.buildTable(tableId, originalSchema).replaceTransaction(); + Table txnTable = txnA.table(); + DataFile dataFile = writeDataFile(txnTable, originalSchema, 42, "hello"); + txnA.newFastAppend().appendFile(dataFile).commit(); + + Transaction concurrentReplace = + catalog.buildTable(tableId, differentSchema).replaceTransaction(); + concurrentReplace.commitTransaction(); + + txnA.commitTransaction(); + + Table finalTable = catalog.loadTable(tableId); + Types.NestedField dataField = finalTable.schema().findField("data"); + assertThat(dataField).as("data field should exist in final schema").isNotNull(); + assertThat(dataField.fieldId()) + .as("data field should retain its original ID after rebase") + .isEqualTo(2); + + // Data should be readable — not null + List records = Lists.newArrayList(IcebergGenerics.read(finalTable).build()); + assertThat(records).hasSize(1); + assertThat(records.get(0).getField("id")).isEqualTo(42); + assertThat(records.get(0).getField("data")) + .as("data should be 'hello', not null — field IDs are preserved") + .isEqualTo("hello"); + } + + /** + * Verifies that data remains readable after rebase when the concurrent replace uses a schema with + * completely different column names at the same positions. + */ + @Test + public void testDataReadableAfterConcurrentColumnRename() throws IOException { + TableIdentifier tableId = TableIdentifier.of(NS, "column_addition_test"); + + Schema originalSchema = + new Schema( + required(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get())); + + Schema schemaWithExtraColumn = + new Schema( + required(1, "id", Types.IntegerType.get()), + optional(2, "extra_col", Types.LongType.get())); + + catalog.createTable(tableId, originalSchema); + + Transaction txnA = catalog.buildTable(tableId, originalSchema).replaceTransaction(); + Table txnTable = txnA.table(); + DataFile dataFile = writeDataFile(txnTable, originalSchema, 99, "world"); + txnA.newFastAppend().appendFile(dataFile).commit(); + + Transaction concurrentReplace = + catalog.buildTable(tableId, schemaWithExtraColumn).replaceTransaction(); + concurrentReplace.commitTransaction(); + + txnA.commitTransaction(); + + Table finalTable = catalog.loadTable(tableId); + Types.NestedField dataField = finalTable.schema().findField("data"); + assertThat(dataField).as("data field should exist").isNotNull(); + assertThat(dataField.fieldId()) + .as("data field should retain its original ID after rebase") + .isEqualTo(2); + + // Data should be readable + List records = Lists.newArrayList(IcebergGenerics.read(finalTable).build()); + assertThat(records).hasSize(1); + assertThat(records.get(0).getField("id")).isEqualTo(99); + assertThat(records.get(0).getField("data")) + .as("data should be 'world', not null — field IDs are preserved") + .isEqualTo("world"); + } + + private DataFile writeDataFile(Table table, Schema writeSchema, int id, String data) + throws IOException { + Record record = GenericRecord.create(writeSchema); + record.setField("id", id); + 
record.setField("data", data); + + String filename = FileFormat.PARQUET.addExtension("test-" + UUID.randomUUID()); + File dataDir = new File(tempDir, "data"); + dataDir.mkdirs(); + File file = new File(dataDir, filename); + + GenericFileWriterFactory writerFactory = + new GenericFileWriterFactory.Builder(table).dataFileFormat(FileFormat.PARQUET).build(); + + DataWriter writer = + writerFactory.newDataWriter( + FileHelpers.encrypt(org.apache.iceberg.Files.localOutput(file)), + PartitionSpec.unpartitioned(), + null); + + try (writer) { + writer.write(record); + } + + return writer.toDataFile(); + } +} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java index c3f66c0286d0..a7b7de74e7cd 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java @@ -239,7 +239,7 @@ public void testReplaceTableTxnTableModifiedConcurrently() { table = catalog.loadTable(TABLE_IDENTIFIER); assertThat(table.properties()) .as("Table props should be updated") - .doesNotContainKey("another-prop") + .containsEntry("another-prop", "another-value") .containsEntry("prop", "value"); } @@ -329,9 +329,9 @@ public void testCreateOrReplaceTableTxnTableCreatedConcurrently() { txn.commitTransaction(); Table table = catalog.loadTable(TABLE_IDENTIFIER); - assertThat(table.spec()) - .as("Partition spec should match") - .isEqualTo(PartitionSpec.unpartitioned()); + assertThat(table.spec().isUnpartitioned()) + .as("Partition spec should be unpartitioned") + .isTrue(); assertThat(table.properties()).as("Table props should match").containsEntry("prop", "value"); } From 009f2a78e147783b6d1f7d6e24759ab19e72aea9 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Wed, 6 May 2026 00:16:51 +0100 Subject: [PATCH 4/6] Nits --- .../org/apache/iceberg/BaseTransaction.java | 40 ++++++++++++++----- .../apache/iceberg/catalog/CatalogTests.java | 40 +++++++++++++++++-- 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 459bff658dd5..8cb4664c787a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -48,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -453,8 +454,15 @@ private void applyUpdates(TableOperations underlyingOps) { } if (base != underlyingOps.current()) { - this.base = underlyingOps.current(); - this.current = startingMetadataFor(underlyingOps.current()); + TableMetadata refreshed = underlyingOps.current(); + + try { + this.current = startingMetadataFor(refreshed); + } catch (CommitFailedException e) { + throw new PendingUpdateFailedException(e); + } + + this.base = refreshed; for (PendingUpdate update : updates) { try { @@ -470,18 +478,32 @@ private void applyUpdates(TableOperations underlyingOps) { private TableMetadata startingMetadataFor(TableMetadata refreshed) { return switch (type) { - case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE -> 
- refreshed.buildReplacementPreservingIds( - start.schema(), - start.spec(), - start.sortOrder(), - start.location(), - start.properties()); + case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE -> { + validateFieldIds(start.schema(), refreshed.schema()); + yield refreshed.buildReplacementPreservingIds( + start.schema(), start.spec(), start.sortOrder(), start.location(), start.properties()); + } case SIMPLE -> refreshed; default -> throw new IllegalStateException("Unexpected transaction type for update: " + type); }; } + /** + * Validates that field IDs in the start schema don't conflict with the refreshed schema. A + * conflict exists when both schemas use the same field ID for columns with different names. + */ + private static void validateFieldIds(Schema startSchema, Schema refreshedSchema) { + for (Types.NestedField startField : startSchema.columns()) { + Types.NestedField refreshedField = refreshedSchema.findField(startField.fieldId()); + if (refreshedField != null && !refreshedField.name().equals(startField.name())) { + throw new CommitFailedException( + "Cannot commit replace transaction: field ID %d is '%s' in the replace schema " + + "but '%s' in the current table schema", + startField.fieldId(), startField.name(), refreshedField.name()); + } + } + } + // committedFiles returns null whenever the set of committed files // cannot be determined from the provided snapshots private static Set committedFiles(TableOperations ops, Set snapshotIds) { diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index f4f4ddde1194..dcd339798163 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -2741,9 +2741,8 @@ public void testConcurrentReplaceTransactionSchema() { secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); - // Field IDs from the second replace's start schema are preserved during rebase assertThat(afterSecondReplace.schema().asStruct()) - .as("Table schema struct should match the original — field IDs are preserved on rebase") + .as("Table schema should match the original schema") .isEqualTo(original.schema().asStruct()); assertUUIDsMatch(original, afterSecondReplace); assertFiles(afterSecondReplace, FILE_C); @@ -2781,9 +2780,8 @@ public void testConcurrentReplaceTransactionSchema2() { secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); - // Field IDs from the second replace's start schema are preserved during rebase assertThat(afterSecondReplace.schema().asStruct()) - .as("Table schema should match the replacement schema — field IDs are preserved on rebase") + .as("Table schema should match the new schema") .isEqualTo(REPLACE_SCHEMA.asStruct()); assertUUIDsMatch(original, afterSecondReplace); assertFiles(afterSecondReplace, FILE_C); @@ -3028,6 +3026,40 @@ public void testConcurrentReplaceTransactionSortOrderConflict() { assertFiles(afterSecondReplace, FILE_C); } + @Test + public void testConcurrentReplaceTransactionFieldIdConflict() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + // Two schemas with completely different column names. Both buildReplacement calls assign + // the same fresh IDs (starting from lastColumnId=1) to different columns. 
+ Schema schemaA = + new Schema( + required(10, "col_a1", Types.IntegerType.get()), + required(11, "col_a2", Types.StringType.get())); + Schema schemaB = + new Schema( + required(20, "col_b1", Types.IntegerType.get()), + required(21, "col_b2", Types.StringType.get())); + + Transaction replaceA = catalog.buildTable(TABLE, schemaA).replaceTransaction(); + replaceA.newFastAppend().appendFile(FILE_A).commit(); + + Transaction replaceB = catalog.buildTable(TABLE, schemaB).replaceTransaction(); + replaceB.newFastAppend().appendFile(FILE_B).commit(); + replaceB.commitTransaction(); + + // A's field IDs conflict with B's — same IDs, different columns + assertThatThrownBy(replaceA::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Cannot commit replace transaction"); + } + @Test public void testConcurrentReplaceTransactionFieldIdsPreserved() { C catalog = catalog(); From a02e8ef1b7f98f88e63a3d53e1410ee237598ecd Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Wed, 6 May 2026 00:27:55 +0100 Subject: [PATCH 5/6] Improvements --- .../org/apache/iceberg/BaseTransaction.java | 53 +--- .../org/apache/iceberg/TableMetadata.java | 3 +- .../apache/iceberg/catalog/CatalogTests.java | 12 +- ...stReplaceTransactionFieldIdCorruption.java | 242 ------------------ 4 files changed, 13 insertions(+), 297 deletions(-) delete mode 100644 data/src/test/java/org/apache/iceberg/data/TestReplaceTransactionFieldIdCorruption.java diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 8cb4664c787a..3184b92822a7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -262,11 +262,8 @@ public void commitTransaction() { case REPLACE_TABLE: case CREATE_OR_REPLACE_TABLE: - commitReplaceTransaction(); - break; - case SIMPLE: - commitSimpleTransaction(); + commitWithRetry(); break; } } @@ -295,7 +292,11 @@ private void commitCreateTransaction() { } } - private void commitReplaceTransaction() { + private void commitWithRetry() { + if (base == current) { + return; + } + Map props = base != null ? 
base.properties() : current.properties(); Set startingSnapshots = @@ -339,48 +340,6 @@ private void commitReplaceTransaction() { cleanUpAfterCommitSuccess(startingSnapshots); } - private void commitSimpleTransaction() { - // if there were no changes, don't try to commit - if (base == current) { - return; - } - - Set startingSnapshots = - base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); - try { - Tasks.foreach(ops) - .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .onlyRetryOn(CommitFailedException.class) - .run( - underlyingOps -> { - applyUpdates(underlyingOps); - - underlyingOps.commit(base, current); - }); - - } catch (CommitStateUnknownException e) { - throw e; - - } catch (PendingUpdateFailedException e) { - cleanUp(); - throw e.wrapped(); - - } catch (RuntimeException e) { - if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) { - cleanUp(); - } - - throw e; - } - - cleanUpAfterCommitSuccess(startingSnapshots); - } - private void cleanUpAfterCommitSuccess(Set startingSnapshots) { try { // clean up the data files that were deleted by each operation. first, get the list of diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index ca89f54c2edd..f75a4e3070fc 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -750,7 +750,8 @@ public TableMetadata buildReplacement( * *
<p>
Unlike {@link #buildReplacement}, which re-derives field IDs by name-matching against the * current table schema, this method uses the provided schema's field IDs directly. This is used - * during transaction rebase to avoid reassigning field IDs that data files already reference. + * when refreshing replace transactions to avoid reassigning field IDs that data files already + * reference. * *
<p>
The {@code lastColumnId} is set to the maximum of the provided schema's highest field ID and * this metadata's {@code lastColumnId}, preventing future ID collisions. diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index dcd339798163..b68d525d7074 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -2817,8 +2817,7 @@ public void testConcurrentReplaceTransactionSchemaConflict() { assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // With rebase, the second replace succeeds by re-applying buildReplacement on refreshed - // metadata + // Both replaces use the same schema, so field IDs match after refresh — succeeds secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); assertThat(afterSecondReplace.schema().asStruct()) @@ -2937,8 +2936,7 @@ public void testConcurrentReplaceTransactionPartitionSpecConflict() { assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // With rebase, the second replace succeeds by re-applying buildReplacement on refreshed - // metadata + // Both replaces use the same spec, so the refresh succeeds secondReplace.commitTransaction(); assertThat(catalog.loadTable(TABLE).spec().fields()) .as("Table spec should match the new spec") @@ -3089,7 +3087,7 @@ public void testConcurrentReplaceTransactionFieldIdsPreserved() { concurrentReplace.commitTransaction(); // The concurrent replace changed the table schema and lastColumnId. - // Now commit the first replace — it must rebase and preserve field IDs. + // Concurrent replace with the same schema — field IDs will match after refresh replace.commitTransaction(); Table afterReplace = catalog.loadTable(TABLE); @@ -3097,10 +3095,10 @@ public void testConcurrentReplaceTransactionFieldIdsPreserved() { // Field IDs must match what the replace transaction originally assigned, because // data files appended during the transaction reference those IDs in their Parquet metadata. assertThat(afterReplace.schema().findField("id").fieldId()) - .as("id field ID must be preserved after rebase — data files reference this ID") + .as("id field ID must be preserved after refresh") .isEqualTo(idFieldId); assertThat(afterReplace.schema().findField("data").fieldId()) - .as("data field ID must be preserved after rebase — data files reference this ID") + .as("data field ID must be preserved after refresh") .isEqualTo(dataFieldId); assertUUIDsMatch(original, afterReplace); diff --git a/data/src/test/java/org/apache/iceberg/data/TestReplaceTransactionFieldIdCorruption.java b/data/src/test/java/org/apache/iceberg/data/TestReplaceTransactionFieldIdCorruption.java deleted file mode 100644 index e4a2c88d1099..000000000000 --- a/data/src/test/java/org/apache/iceberg/data/TestReplaceTransactionFieldIdCorruption.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.data; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.UUID; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.Transaction; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -/** - * Tests that verify field IDs are preserved during replace transaction rebase with concurrent - * schema-changing replaces. Uses {@code buildReplacementPreservingIds} to ensure data files written - * during the transaction remain readable after rebase, even when the concurrent replace uses a - * completely different schema. - */ -public class TestReplaceTransactionFieldIdCorruption { - - private static final Configuration CONF = new Configuration(); - private static final Namespace NS = Namespace.of("db"); - - @TempDir private File tempDir; - - private HadoopCatalog catalog; - - @BeforeEach - public void before() { - catalog = new HadoopCatalog(); - catalog.setConf(CONF); - catalog.initialize( - "test-catalog", - ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, tempDir.getAbsolutePath())); - } - - /** - * Verifies that required fields remain readable after rebase. With buildReplacementPreservingIds, - * the field IDs from the original replace transaction are preserved, so the Parquet file's field - * IDs still match the table schema. 
- */ - @Test - public void testRequiredFieldReadableAfterRebase() throws IOException { - TableIdentifier tableId = TableIdentifier.of(NS, "required_field_test"); - - Schema originalSchema = - new Schema( - required(1, "id", Types.IntegerType.get()), - required(2, "data", Types.StringType.get())); - - Schema differentSchema = - new Schema( - required(1, "id", Types.IntegerType.get()), - required(2, "category", Types.StringType.get())); - - catalog.createTable(tableId, originalSchema); - - Transaction txnA = catalog.buildTable(tableId, originalSchema).replaceTransaction(); - Table txnTable = txnA.table(); - DataFile dataFile = writeDataFile(txnTable, originalSchema, 42, "hello"); - txnA.newFastAppend().appendFile(dataFile).commit(); - - // Concurrently replace with a different schema - Transaction concurrentReplace = - catalog.buildTable(tableId, differentSchema).replaceTransaction(); - concurrentReplace.commitTransaction(); - - // Commit the first replace — rebase preserves field IDs - txnA.commitTransaction(); - - Table finalTable = catalog.loadTable(tableId); - Types.NestedField dataField = finalTable.schema().findField("data"); - assertThat(dataField).as("data field should exist in final schema").isNotNull(); - - // Field ID must be preserved — the rebase uses buildReplacementPreservingIds - assertThat(dataField.fieldId()) - .as("data field should retain its original ID after rebase") - .isEqualTo(2); - - // Data should be fully readable since field IDs match the Parquet file - List records = Lists.newArrayList(IcebergGenerics.read(finalTable).build()); - assertThat(records).hasSize(1); - assertThat(records.get(0).getField("id")).isEqualTo(42); - assertThat(records.get(0).getField("data")).isEqualTo("hello"); - } - - /** - * Verifies that optional fields are readable (not null) after rebase with a concurrent - * schema-changing replace. 
- */ - @Test - public void testOptionalFieldReadableAfterRebase() throws IOException { - TableIdentifier tableId = TableIdentifier.of(NS, "optional_field_test"); - - Schema originalSchema = - new Schema( - required(1, "id", Types.IntegerType.get()), - optional(2, "data", Types.StringType.get())); - - Schema differentSchema = - new Schema( - required(1, "id", Types.IntegerType.get()), - optional(2, "category", Types.StringType.get())); - - catalog.createTable(tableId, originalSchema); - - Transaction txnA = catalog.buildTable(tableId, originalSchema).replaceTransaction(); - Table txnTable = txnA.table(); - DataFile dataFile = writeDataFile(txnTable, originalSchema, 42, "hello"); - txnA.newFastAppend().appendFile(dataFile).commit(); - - Transaction concurrentReplace = - catalog.buildTable(tableId, differentSchema).replaceTransaction(); - concurrentReplace.commitTransaction(); - - txnA.commitTransaction(); - - Table finalTable = catalog.loadTable(tableId); - Types.NestedField dataField = finalTable.schema().findField("data"); - assertThat(dataField).as("data field should exist in final schema").isNotNull(); - assertThat(dataField.fieldId()) - .as("data field should retain its original ID after rebase") - .isEqualTo(2); - - // Data should be readable — not null - List records = Lists.newArrayList(IcebergGenerics.read(finalTable).build()); - assertThat(records).hasSize(1); - assertThat(records.get(0).getField("id")).isEqualTo(42); - assertThat(records.get(0).getField("data")) - .as("data should be 'hello', not null — field IDs are preserved") - .isEqualTo("hello"); - } - - /** - * Verifies that data remains readable after rebase when the concurrent replace uses a schema with - * completely different column names at the same positions. - */ - @Test - public void testDataReadableAfterConcurrentColumnRename() throws IOException { - TableIdentifier tableId = TableIdentifier.of(NS, "column_addition_test"); - - Schema originalSchema = - new Schema( - required(1, "id", Types.IntegerType.get()), - optional(2, "data", Types.StringType.get())); - - Schema schemaWithExtraColumn = - new Schema( - required(1, "id", Types.IntegerType.get()), - optional(2, "extra_col", Types.LongType.get())); - - catalog.createTable(tableId, originalSchema); - - Transaction txnA = catalog.buildTable(tableId, originalSchema).replaceTransaction(); - Table txnTable = txnA.table(); - DataFile dataFile = writeDataFile(txnTable, originalSchema, 99, "world"); - txnA.newFastAppend().appendFile(dataFile).commit(); - - Transaction concurrentReplace = - catalog.buildTable(tableId, schemaWithExtraColumn).replaceTransaction(); - concurrentReplace.commitTransaction(); - - txnA.commitTransaction(); - - Table finalTable = catalog.loadTable(tableId); - Types.NestedField dataField = finalTable.schema().findField("data"); - assertThat(dataField).as("data field should exist").isNotNull(); - assertThat(dataField.fieldId()) - .as("data field should retain its original ID after rebase") - .isEqualTo(2); - - // Data should be readable - List records = Lists.newArrayList(IcebergGenerics.read(finalTable).build()); - assertThat(records).hasSize(1); - assertThat(records.get(0).getField("id")).isEqualTo(99); - assertThat(records.get(0).getField("data")) - .as("data should be 'world', not null — field IDs are preserved") - .isEqualTo("world"); - } - - private DataFile writeDataFile(Table table, Schema writeSchema, int id, String data) - throws IOException { - Record record = GenericRecord.create(writeSchema); - record.setField("id", id); - 
record.setField("data", data); - - String filename = FileFormat.PARQUET.addExtension("test-" + UUID.randomUUID()); - File dataDir = new File(tempDir, "data"); - dataDir.mkdirs(); - File file = new File(dataDir, filename); - - GenericFileWriterFactory writerFactory = - new GenericFileWriterFactory.Builder(table).dataFileFormat(FileFormat.PARQUET).build(); - - DataWriter writer = - writerFactory.newDataWriter( - FileHelpers.encrypt(org.apache.iceberg.Files.localOutput(file)), - PartitionSpec.unpartitioned(), - null); - - try (writer) { - writer.write(record); - } - - return writer.toDataFile(); - } -} From 5a30e5a1fb83413606e5aa8abe68e5b07806925b Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Wed, 6 May 2026 00:31:16 +0100 Subject: [PATCH 6/6] Improvements --- core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index b68d525d7074..1878f98d9f1f 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -3093,7 +3093,7 @@ public void testConcurrentReplaceTransactionFieldIdsPreserved() { Table afterReplace = catalog.loadTable(TABLE); // Field IDs must match what the replace transaction originally assigned, because - // data files appended during the transaction reference those IDs in their Parquet metadata. + // files added during the transaction reference those IDs assertThat(afterReplace.schema().findField("id").fieldId()) .as("id field ID must be preserved after refresh") .isEqualTo(idFieldId);
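
A minimal usage sketch of the behavior this series establishes (not part of the patches; the InMemoryCatalog setup, table name, and printed check are illustrative assumptions, not code from the series). Two replace transactions race on the same table; with the rebase logic in BaseTransaction, the losing transaction refreshes, re-applies its replacement on top of the winner's metadata via buildReplacementPreservingIds, and keeps the field IDs it assigned at start:

    import static org.apache.iceberg.types.Types.NestedField.required;

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.Transaction;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.inmemory.InMemoryCatalog;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.types.Types;

    public class ReplaceRebaseSketch {
      public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        catalog.initialize("sketch", ImmutableMap.of());
        catalog.createNamespace(Namespace.of("db"));

        TableIdentifier id = TableIdentifier.of("db", "t");
        Schema schema =
            new Schema(
                required(1, "id", Types.IntegerType.get()),
                required(2, "data", Types.StringType.get()));
        catalog.createTable(id, schema);

        // Both transactions capture their replacement schema up front.
        Transaction first = catalog.buildTable(id, schema).replaceTransaction();
        int dataFieldId = first.table().schema().findField("data").fieldId();

        Transaction second = catalog.buildTable(id, schema).replaceTransaction();
        second.commitTransaction(); // wins the race

        // The first transaction now rebases onto the refreshed metadata. Both
        // replacements use the same column names, so validateFieldIds passes and
        // the start schema's field IDs are preserved rather than reassigned.
        first.commitTransaction();

        Table table = catalog.loadTable(id);
        System.out.println(table.schema().findField("data").fieldId() == dataFieldId); // true
      }
    }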