diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 9884ac297079..3184b92822a7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -48,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -72,6 +73,7 @@ enum TransactionType { Sets.newHashSet(); // keep track of files deleted in the most recent commit private final Consumer enqueueDelete = deletedFiles::add; private final TransactionType type; + private final TableMetadata start; private TableMetadata base; private TableMetadata current; private boolean hasLastOpCommitted; @@ -91,6 +93,7 @@ enum TransactionType { this.tableName = tableName; this.ops = ops; this.transactionTable = new TransactionTable(); + this.start = start; this.current = start; this.transactionOps = new TransactionTableOperations(); this.updates = Lists.newArrayList(); @@ -258,15 +261,9 @@ public void commitTransaction() { break; case REPLACE_TABLE: - commitReplaceTransaction(false); - break; - case CREATE_OR_REPLACE_TABLE: - commitReplaceTransaction(true); - break; - case SIMPLE: - commitSimpleTransaction(); + commitWithRetry(); break; } } @@ -295,9 +292,18 @@ private void commitCreateTransaction() { } } - private void commitReplaceTransaction(boolean orCreate) { + private void commitWithRetry() { + if (base == current) { + return; + } + Map props = base != null ? base.properties() : current.properties(); + Set startingSnapshots = + base != null + ? 
base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()) + : ImmutableSet.of(); + try { Tasks.foreach(ops) .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) @@ -310,65 +316,9 @@ private void commitReplaceTransaction(boolean orCreate) { props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .run( - underlyingOps -> { - try { - underlyingOps.refresh(); - } catch (NoSuchTableException e) { - if (!orCreate) { - throw e; - } - } - - // because this is a replace table, it will always completely replace the table - // metadata. even if it was just updated. - if (base != underlyingOps.current()) { - this.base = underlyingOps.current(); // just refreshed - } - - underlyingOps.commit(base, current); - }); - - } catch (CommitStateUnknownException e) { - throw e; - - } catch (RuntimeException e) { - // the commit failed and no files were committed. clean up each update. - if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) { - cleanAllUpdates(); - } - - throw e; - - } finally { - // replace table never needs to retry because the table state is completely replaced. 
because - // retries are not - // a concern, it is safe to delete all the deleted files from individual operations - deleteUncommittedFiles(deletedFiles); - } - } - - private void commitSimpleTransaction() { - // if there were no changes, don't try to commit - if (base == current) { - return; - } - - Set startingSnapshots = - base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); - try { - Tasks.foreach(ops) - .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .onlyRetryOn(CommitFailedException.class) .run( underlyingOps -> { applyUpdates(underlyingOps); - underlyingOps.commit(base, current); }); @@ -378,6 +328,7 @@ private void commitSimpleTransaction() { } catch (PendingUpdateFailedException e) { cleanUp(); throw e.wrapped(); + } catch (RuntimeException e) { if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) { cleanUp(); @@ -386,8 +337,10 @@ private void commitSimpleTransaction() { throw e; } - // the commit succeeded + cleanUpAfterCommitSuccess(startingSnapshots); + } + private void cleanUpAfterCommitSuccess(Set startingSnapshots) { try { // clean up the data files that were deleted by each operation. first, get the list of // committed manifests to ensure that no committed manifest is deleted. 
@@ -441,12 +394,36 @@ private void deleteUncommittedFiles(Iterable paths) { } private void applyUpdates(TableOperations underlyingOps) { - if (base != underlyingOps.refresh()) { - // use refreshed the metadata - this.base = underlyingOps.current(); - this.current = underlyingOps.current(); + try { + underlyingOps.refresh(); + } catch (NoSuchTableException e) { + if (type == TransactionType.CREATE_OR_REPLACE_TABLE) { + this.base = null; + return; + } + throw e; + } + + if (underlyingOps.current() == null) { + if (type == TransactionType.CREATE_OR_REPLACE_TABLE) { + this.base = null; + return; + } + throw new NoSuchTableException("Table metadata is not available after refresh"); + } + + if (base != underlyingOps.current()) { + TableMetadata refreshed = underlyingOps.current(); + + try { + this.current = startingMetadataFor(refreshed); + } catch (CommitFailedException e) { + throw new PendingUpdateFailedException(e); + } + + this.base = refreshed; + for (PendingUpdate update : updates) { - // re-commit each update in the chain to apply it and update current try { update.commit(); } catch (CommitFailedException e) { @@ -458,6 +435,34 @@ private void applyUpdates(TableOperations underlyingOps) { } } + private TableMetadata startingMetadataFor(TableMetadata refreshed) { + return switch (type) { + case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE -> { + validateFieldIds(start.schema(), refreshed.schema()); + yield refreshed.buildReplacementPreservingIds( + start.schema(), start.spec(), start.sortOrder(), start.location(), start.properties()); + } + case SIMPLE -> refreshed; + default -> throw new IllegalStateException("Unexpected transaction type for update: " + type); + }; + } + + /** + * Validates that field IDs in the start schema don't conflict with the refreshed schema. A + * conflict exists when both schemas use the same field ID for columns with different names. 
+ */ + private static void validateFieldIds(Schema startSchema, Schema refreshedSchema) { + for (Types.NestedField startField : startSchema.columns()) { + Types.NestedField refreshedField = refreshedSchema.findField(startField.fieldId()); + if (refreshedField != null && !refreshedField.name().equals(startField.name())) { + throw new CommitFailedException( + "Cannot commit replace transaction: field ID %d is '%s' in the replace schema " + + "but '%s' in the current table schema", + startField.fieldId(), startField.name(), refreshedField.name()); + } + } + } + // committedFiles returns null whenever the set of committed files // cannot be determined from the provided snapshots private static Set committedFiles(TableOperations ops, Set snapshotIds) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 43a67dd2bef2..f75a4e3070fc 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -745,6 +745,57 @@ public TableMetadata buildReplacement( .build(); } + /** + * Builds replacement metadata preserving the caller-provided schema's field IDs. + * + *

<p>Unlike {@link #buildReplacement}, which re-derives field IDs by name-matching against the + * current table schema, this method uses the provided schema's field IDs directly. This is used + * when refreshing replace transactions to avoid reassigning field IDs that data files already + * reference. + * + * <p>
The {@code lastColumnId} is set to the maximum of the provided schema's highest field ID and + * this metadata's {@code lastColumnId}, preventing future ID collisions. + */ + public TableMetadata buildReplacementPreservingIds( + Schema schemaWithIds, + PartitionSpec updatedPartitionSpec, + SortOrder updatedSortOrder, + String newLocation, + Map updatedProperties) { + ValidationException.check( + formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec), + "Spec does not use sequential IDs that are required in v1: %s", + updatedPartitionSpec); + + int preservedLastColumnId = Math.max(schemaWithIds.highestFieldId(), lastColumnId); + + // rebuild the partition spec using the preserved column ids and reassign partition field ids + // to align with existing partition specs in the table + PartitionSpec freshSpec = + reassignPartitionIds( + freshSpec(INITIAL_SPEC_ID, schemaWithIds, updatedPartitionSpec), + new AtomicInteger(lastAssignedPartitionId)::incrementAndGet); + + // rebuild the sort order using preserved column ids + SortOrder freshSortOrder = + freshSortOrder(INITIAL_SORT_ORDER_ID, schemaWithIds, updatedSortOrder); + + // check if there is format version override + int newFormatVersion = + PropertyUtil.propertyAsInt( + updatedProperties, TableProperties.FORMAT_VERSION, formatVersion); + + return new Builder(this) + .upgradeFormatVersion(newFormatVersion) + .removeRef(SnapshotRef.MAIN_BRANCH) + .setCurrentSchema(schemaWithIds, preservedLastColumnId) + .setDefaultPartitionSpec(freshSpec) + .setDefaultSortOrder(freshSortOrder) + .setLocation(newLocation) + .setProperties(persistedProperties(updatedProperties)) + .build(); + } + public TableMetadata updateLocation(String newLocation) { return new Builder(this).setLocation(newLocation).build(); } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index be763d30fef1..40e4b6c3da06 100644 --- 
a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -59,7 +59,6 @@ enum UpdateType { private final Supplier> mutationHeaders; private final FileIO io; private final List createChanges; - private final TableMetadata replaceBase; private final Set endpoints; private UpdateType updateType; private TableMetadata current; @@ -132,7 +131,6 @@ enum UpdateType { this.io = io; this.updateType = updateType; this.createChanges = createChanges; - this.replaceBase = current; if (updateType == UpdateType.CREATE) { this.current = null; } else { @@ -179,8 +177,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { .addAll(createChanges) .addAll(metadata.changes()) .build(); - // use the original replace base metadata because the transaction will refresh - requirements = UpdateRequirements.forReplaceTable(replaceBase, updates); + requirements = UpdateRequirements.forReplaceTable(base, updates); errorHandler = ErrorHandlers.tableCommitHandler(); break; diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 833b2fb0b46f..1878f98d9f1f 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -2655,15 +2655,20 @@ public void testReplaceTableKeepsSnapshotLog() { .containsExactly(snapshotBeforeReplace, snapshotAfterReplace); } - @Test - public void testConcurrentReplaceTransactions() { + @ParameterizedTest + @ValueSource(ints = {2, 3}) + public void testConcurrentReplaceTransactions(int formatVersion) { C catalog = catalog(); if (requiresNamespaceCreate()) { catalog.createNamespace(NS); } - Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction(); + Transaction transaction = + catalog + .buildTable(TABLE, SCHEMA) + .withProperty("format-version", 
String.valueOf(formatVersion)) + .createTransaction(); transaction.newFastAppend().appendFile(FILE_A).commit(); transaction.commitTransaction(); @@ -2691,6 +2696,8 @@ public void testConcurrentReplaceTransactions() { secondReplace.commitTransaction(); Table afterSecondReplace = catalog.loadTable(TABLE); + // All three successfully committed snapshots should be present + assertThat(afterSecondReplace.snapshots()).hasSize(3); assertThat(afterSecondReplace.schema().asStruct()) .as("Table schema should match the original schema") .isEqualTo(original.schema().asStruct()); @@ -2782,8 +2789,6 @@ public void testConcurrentReplaceTransactionSchema2() { @Test public void testConcurrentReplaceTransactionSchemaConflict() { - assumeThat(supportsServerSideRetry()).as("Schema conflicts are detected server-side").isTrue(); - C catalog = catalog(); if (requiresNamespaceCreate()) { @@ -2806,18 +2811,18 @@ public void testConcurrentReplaceTransactionSchemaConflict() { Table afterFirstReplace = catalog.loadTable(TABLE); assertThat(afterFirstReplace.schema().asStruct()) - .as("Table schema should match the original schema") + .as("Table schema should match the new schema") .isEqualTo(REPLACE_SCHEMA.asStruct()); assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // even though the new schema is identical, the assertion that the last assigned id has not - // changed will fail - assertThatThrownBy(secondReplace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith( - "Commit failed: Requirement failed: last assigned field id changed"); + // Both replaces use the same schema, so field IDs match after refresh — succeeds + secondReplace.commitTransaction(); + Table afterSecondReplace = catalog.loadTable(TABLE); + assertThat(afterSecondReplace.schema().asStruct()) + .as("Table schema should match the new schema") + .isEqualTo(REPLACE_SCHEMA.asStruct()); } @Test @@ -2902,7 +2907,6 @@ public void 
testConcurrentReplaceTransactionPartitionSpec2() { @Test public void testConcurrentReplaceTransactionPartitionSpecConflict() { - assumeThat(supportsServerSideRetry()).as("Spec conflicts are detected server-side").isTrue(); C catalog = catalog(); if (requiresNamespaceCreate()) { @@ -2932,12 +2936,11 @@ public void testConcurrentReplaceTransactionPartitionSpecConflict() { assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // even though the new spec is identical, the assertion that the last assigned id has not - // changed will fail - assertThatThrownBy(secondReplace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith( - "Commit failed: Requirement failed: last assigned partition id changed"); + // Both replaces use the same spec, so the refresh succeeds + secondReplace.commitTransaction(); + assertThat(catalog.loadTable(TABLE).spec().fields()) + .as("Table spec should match the new spec") + .isEqualTo(TABLE_SPEC.fields()); } @Test @@ -3021,6 +3024,87 @@ public void testConcurrentReplaceTransactionSortOrderConflict() { assertFiles(afterSecondReplace, FILE_C); } + @Test + public void testConcurrentReplaceTransactionFieldIdConflict() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + // Two schemas with completely different column names. Both buildReplacement calls assign + // the same fresh IDs (starting from lastColumnId=1) to different columns. 
+ Schema schemaA = + new Schema( + required(10, "col_a1", Types.IntegerType.get()), + required(11, "col_a2", Types.StringType.get())); + Schema schemaB = + new Schema( + required(20, "col_b1", Types.IntegerType.get()), + required(21, "col_b2", Types.StringType.get())); + + Transaction replaceA = catalog.buildTable(TABLE, schemaA).replaceTransaction(); + replaceA.newFastAppend().appendFile(FILE_A).commit(); + + Transaction replaceB = catalog.buildTable(TABLE, schemaB).replaceTransaction(); + replaceB.newFastAppend().appendFile(FILE_B).commit(); + replaceB.commitTransaction(); + + // A's field IDs conflict with B's — same IDs, different columns + assertThatThrownBy(replaceA::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Cannot commit replace transaction"); + } + + @Test + public void testConcurrentReplaceTransactionFieldIdsPreserved() { + C catalog = catalog(); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(NS); + } + + // Create a table with OTHER_SCHEMA = {1: some_id} + catalog.buildTable(TABLE, OTHER_SCHEMA).create(); + + Table original = catalog.loadTable(TABLE); + assertThat(original.schema().columns()).hasSize(1); + assertThat(original.schema().findField("some_id").fieldId()).isEqualTo(1); + + // Start a replace with SCHEMA = {id, data} — this gets fresh IDs {2: id, 3: data} + Transaction replace = catalog.buildTable(TABLE, SCHEMA).replaceTransaction(); + replace.newFastAppend().appendFile(FILE_A).commit(); + + // Capture the field IDs that the replace transaction's schema was assigned + Table replaceView = replace.table(); + int idFieldId = replaceView.schema().findField("id").fieldId(); + int dataFieldId = replaceView.schema().findField("data").fieldId(); + + // Concurrently, another replace changes the schema to something different + Transaction concurrentReplace = catalog.buildTable(TABLE, SCHEMA).replaceTransaction(); + concurrentReplace.commitTransaction(); + + // The concurrent replace changed 
the table schema and lastColumnId, + // but it used the same schema as this replace, so field IDs still match after refresh + replace.commitTransaction(); + + Table afterReplace = catalog.loadTable(TABLE); + + // Field IDs must match what the replace transaction originally assigned, because + // files added during the transaction reference those IDs + assertThat(afterReplace.schema().findField("id").fieldId()) + .as("id field ID must be preserved after refresh") + .isEqualTo(idFieldId); + assertThat(afterReplace.schema().findField("data").fieldId()) + .as("data field ID must be preserved after refresh") + .isEqualTo(dataFieldId); + + assertUUIDsMatch(original, afterReplace); + assertFiles(afterReplace, FILE_A); + } + @ParameterizedTest @ValueSource(ints = {1, 2, 3}) public void createTableTransaction(int formatVersion) { diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java index c3f66c0286d0..a7b7de74e7cd 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java @@ -239,7 +239,7 @@ public void testReplaceTableTxnTableModifiedConcurrently() { table = catalog.loadTable(TABLE_IDENTIFIER); assertThat(table.properties()) .as("Table props should be updated") - .doesNotContainKey("another-prop") + .containsEntry("another-prop", "another-value") .containsEntry("prop", "value"); } @@ -329,9 +329,9 @@ public void testCreateOrReplaceTableTxnTableCreatedConcurrently() { txn.commitTransaction(); Table table = catalog.loadTable(TABLE_IDENTIFIER); - assertThat(table.spec()) - .as("Partition spec should match") - .isEqualTo(PartitionSpec.unpartitioned()); + assertThat(table.spec().isUnpartitioned()) + .as("Partition spec should be unpartitioned") + .isTrue(); assertThat(table.properties()).as("Table props should 
match").containsEntry("prop", "value"); }