Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 75 additions & 70 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
Expand All @@ -72,6 +73,7 @@ enum TransactionType {
Sets.newHashSet(); // keep track of files deleted in the most recent commit
private final Consumer<String> enqueueDelete = deletedFiles::add;
private final TransactionType type;
private final TableMetadata start;
private TableMetadata base;
private TableMetadata current;
private boolean hasLastOpCommitted;
Expand All @@ -91,6 +93,7 @@ enum TransactionType {
this.tableName = tableName;
this.ops = ops;
this.transactionTable = new TransactionTable();
this.start = start;
this.current = start;
this.transactionOps = new TransactionTableOperations();
this.updates = Lists.newArrayList();
Expand Down Expand Up @@ -258,15 +261,9 @@ public void commitTransaction() {
break;

case REPLACE_TABLE:
commitReplaceTransaction(false);
break;

case CREATE_OR_REPLACE_TABLE:
commitReplaceTransaction(true);
break;

case SIMPLE:
commitSimpleTransaction();
commitWithRetry();
break;
}
}
Expand Down Expand Up @@ -295,9 +292,18 @@ private void commitCreateTransaction() {
}
}

private void commitReplaceTransaction(boolean orCreate) {
private void commitWithRetry() {
if (base == current) {
return;
}

Map<String, String> props = base != null ? base.properties() : current.properties();

Set<Long> startingSnapshots =
base != null
? base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet())
: ImmutableSet.of();

try {
Tasks.foreach(ops)
.retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
Expand All @@ -310,65 +316,9 @@ private void commitReplaceTransaction(boolean orCreate) {
props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(
underlyingOps -> {
try {
underlyingOps.refresh();
} catch (NoSuchTableException e) {
if (!orCreate) {
throw e;
}
}

// because this is a replace table, it will always completely replace the table
// metadata. even if it was just updated.
if (base != underlyingOps.current()) {
this.base = underlyingOps.current(); // just refreshed
}

underlyingOps.commit(base, current);
});

} catch (CommitStateUnknownException e) {
throw e;

} catch (RuntimeException e) {
// the commit failed and no files were committed. clean up each update.
if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
cleanAllUpdates();
}

throw e;

} finally {
// Replace table never needs to retry because the table state is completely replaced.
// Because retries are not a concern, it is safe to delete all of the files that were
// deleted by individual operations.
deleteUncommittedFiles(deletedFiles);
}
}

private void commitSimpleTransaction() {
// if there were no changes, don't try to commit
if (base == current) {
return;
}

Set<Long> startingSnapshots =
base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
try {
Tasks.foreach(ops)
.retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
.exponentialBackoff(
base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(
underlyingOps -> {
applyUpdates(underlyingOps);

underlyingOps.commit(base, current);
});

Expand All @@ -378,6 +328,7 @@ private void commitSimpleTransaction() {
} catch (PendingUpdateFailedException e) {
cleanUp();
throw e.wrapped();

} catch (RuntimeException e) {
if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
cleanUp();
Expand All @@ -386,8 +337,10 @@ private void commitSimpleTransaction() {
throw e;
}

// the commit succeeded
cleanUpAfterCommitSuccess(startingSnapshots);
}

private void cleanUpAfterCommitSuccess(Set<Long> startingSnapshots) {
try {
// clean up the data files that were deleted by each operation. first, get the list of
// committed manifests to ensure that no committed manifest is deleted.
Expand Down Expand Up @@ -441,12 +394,36 @@ private void deleteUncommittedFiles(Iterable<String> paths) {
}

private void applyUpdates(TableOperations underlyingOps) {
if (base != underlyingOps.refresh()) {
// use the refreshed metadata
this.base = underlyingOps.current();
this.current = underlyingOps.current();
try {
underlyingOps.refresh();
} catch (NoSuchTableException e) {
if (type == TransactionType.CREATE_OR_REPLACE_TABLE) {
this.base = null;
return;
}
throw e;
}

if (underlyingOps.current() == null) {
if (type == TransactionType.CREATE_OR_REPLACE_TABLE) {
this.base = null;
return;
}
throw new NoSuchTableException("Table metadata is not available after refresh");
}

if (base != underlyingOps.current()) {
TableMetadata refreshed = underlyingOps.current();

try {
this.current = startingMetadataFor(refreshed);
} catch (CommitFailedException e) {
throw new PendingUpdateFailedException(e);
}

this.base = refreshed;

for (PendingUpdate update : updates) {
// re-commit each update in the chain to apply it and update current
try {
update.commit();
} catch (CommitFailedException e) {
Expand All @@ -458,6 +435,34 @@ private void applyUpdates(TableOperations underlyingOps) {
}
}

/**
 * Picks the metadata that pending updates are re-applied on top of after a refresh.
 *
 * <p>For the two replace transaction types, the field IDs of the transaction's starting schema
 * are first validated against the refreshed schema, and then replacement metadata is built from
 * the refreshed metadata using the starting schema, spec, sort order, location and properties.
 * For a simple transaction the refreshed metadata is used unchanged.
 *
 * @param refreshed the table metadata that was just loaded from the underlying operations
 * @return the metadata to use as the new {@code current} before updates are re-committed
 * @throws IllegalStateException if the transaction type is none of the types handled here
 */
private TableMetadata startingMetadataFor(TableMetadata refreshed) {
  switch (type) {
    case SIMPLE:
      return refreshed;

    case REPLACE_TABLE:
    case CREATE_OR_REPLACE_TABLE:
      // fail before building anything if the replacement schema reuses an ID for another column
      validateFieldIds(start.schema(), refreshed.schema());
      return refreshed.buildReplacementPreservingIds(
          start.schema(), start.spec(), start.sortOrder(), start.location(), start.properties());

    default:
      throw new IllegalStateException("Unexpected transaction type for update: " + type);
  }
}

/**
 * Validates that field IDs in the start schema don't conflict with the refreshed schema.
 *
 * <p>A conflict exists when both schemas use the same field ID for fields with different
 * qualified names. Every field ID in the start schema is checked, including IDs of nested
 * fields, and names are compared in their fully qualified form so that a top-level column and a
 * nested field that merely share a short name are not mistaken for the same field.
 *
 * @param startSchema the schema the replace transaction was started with
 * @param refreshedSchema the schema of the table metadata loaded by the latest refresh
 * @throws CommitFailedException if an ID maps to different qualified names in the two schemas
 */
private static void validateFieldIds(Schema startSchema, Schema refreshedSchema) {
  for (Map.Entry<Integer, String> startEntry : startSchema.idToName().entrySet()) {
    int fieldId = startEntry.getKey();

    Types.NestedField refreshedField = refreshedSchema.findField(fieldId);
    if (refreshedField == null) {
      // the ID is not used by the refreshed schema, so it cannot collide
      continue;
    }

    String startName = startEntry.getValue();
    String refreshedName = refreshedSchema.findColumnName(fieldId);
    if (!startName.equals(refreshedName)) {
      throw new CommitFailedException(
          "Cannot commit replace transaction: field ID %d is '%s' in the replace schema "
              + "but '%s' in the current table schema",
          fieldId, startName, refreshedName);
    }
  }
}

// committedFiles returns null whenever the set of committed files
// cannot be determined from the provided snapshots
private static Set<String> committedFiles(TableOperations ops, Set<Long> snapshotIds) {
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,57 @@ public TableMetadata buildReplacement(
.build();
}

/**
 * Builds replacement metadata that keeps the field IDs of the caller-provided schema.
 *
 * <p>Unlike {@link #buildReplacement}, which re-derives field IDs by name-matching against the
 * current table schema, this method uses the provided schema's field IDs as they are. It is used
 * when a replace transaction is refreshed, so that field IDs already referenced by data files
 * are not reassigned.
 *
 * <p>The resulting {@code lastColumnId} is the larger of the provided schema's highest field ID
 * and this metadata's {@code lastColumnId}, which prevents future ID collisions.
 *
 * @param schemaWithIds the replacement schema whose field IDs are kept
 * @param updatedPartitionSpec the replacement partition spec, rebound to {@code schemaWithIds}
 * @param updatedSortOrder the replacement sort order, rebound to {@code schemaWithIds}
 * @param newLocation the table location to set
 * @param updatedProperties the replacement table properties; may override the format version
 * @return the replacement table metadata
 */
public TableMetadata buildReplacementPreservingIds(
    Schema schemaWithIds,
    PartitionSpec updatedPartitionSpec,
    SortOrder updatedSortOrder,
    String newLocation,
    Map<String, String> updatedProperties) {
  boolean specIdsAcceptable =
      formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec);
  ValidationException.check(
      specIdsAcceptable,
      "Spec does not use sequential IDs that are required in v1: %s",
      updatedPartitionSpec);

  int highestColumnId = Math.max(schemaWithIds.highestFieldId(), lastColumnId);

  // bind the spec to the preserved column ids first, then hand out partition field ids that
  // continue after the ones already assigned in this table
  PartitionSpec reboundSpec = freshSpec(INITIAL_SPEC_ID, schemaWithIds, updatedPartitionSpec);
  AtomicInteger partitionIdCounter = new AtomicInteger(lastAssignedPartitionId);
  PartitionSpec replacementSpec =
      reassignPartitionIds(reboundSpec, partitionIdCounter::incrementAndGet);

  // bind the sort order to the preserved column ids
  SortOrder replacementSortOrder =
      freshSortOrder(INITIAL_SORT_ORDER_ID, schemaWithIds, updatedSortOrder);

  // the new properties may request a different format version; otherwise keep the current one
  int targetFormatVersion =
      PropertyUtil.propertyAsInt(
          updatedProperties, TableProperties.FORMAT_VERSION, formatVersion);

  return new Builder(this)
      .upgradeFormatVersion(targetFormatVersion)
      .removeRef(SnapshotRef.MAIN_BRANCH)
      .setCurrentSchema(schemaWithIds, highestColumnId)
      .setDefaultPartitionSpec(replacementSpec)
      .setDefaultSortOrder(replacementSortOrder)
      .setLocation(newLocation)
      .setProperties(persistedProperties(updatedProperties))
      .build();
}

/**
 * Returns metadata built from this metadata with the table location set to the given value.
 *
 * @param newLocation the location to set
 * @return the metadata produced by the builder after the location change
 */
public TableMetadata updateLocation(String newLocation) {
  TableMetadata relocated = new Builder(this).setLocation(newLocation).build();
  return relocated;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ enum UpdateType {
private final Supplier<Map<String, String>> mutationHeaders;
private final FileIO io;
private final List<MetadataUpdate> createChanges;
private final TableMetadata replaceBase;
private final Set<Endpoint> endpoints;
private UpdateType updateType;
private TableMetadata current;
Expand Down Expand Up @@ -132,7 +131,6 @@ enum UpdateType {
this.io = io;
this.updateType = updateType;
this.createChanges = createChanges;
this.replaceBase = current;
if (updateType == UpdateType.CREATE) {
this.current = null;
} else {
Expand Down Expand Up @@ -179,8 +177,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
.addAll(createChanges)
.addAll(metadata.changes())
.build();
// use the original replace base metadata because the transaction will refresh
requirements = UpdateRequirements.forReplaceTable(replaceBase, updates);
requirements = UpdateRequirements.forReplaceTable(base, updates);
errorHandler = ErrorHandlers.tableCommitHandler();
break;

Expand Down
Loading