diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md
index 31601354f1a8..325ce57235ed 100644
--- a/docs/docs/flink/procedures.md
+++ b/docs/docs/flink/procedures.md
@@ -498,6 +498,34 @@ All available procedures are listed below.
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
+
| rollback_to_timestamp |
diff --git a/docs/docs/maintenance/manage-snapshots.mdx b/docs/docs/maintenance/manage-snapshots.mdx
index 821aa0b7559f..edbbf0ca0ddf 100644
--- a/docs/docs/maintenance/manage-snapshots.mdx
+++ b/docs/docs/maintenance/manage-snapshots.mdx
@@ -353,6 +353,25 @@ CALL sys.rollback(table => 'database_name.table_name', snapshot => snasphot_id);
+## Restore Snapshot as Latest
+
+Restore a table to the state of a specific snapshot ID by creating a new latest snapshot. Unlike rollback, this operation
+does not delete snapshots or tags whose snapshot id is larger than the restored snapshot.
+
+
+
+
+
+Run the following command:
+
+```sql
+CALL sys.restore_as_latest(`table` => 'database_name.table_name', snapshot_id => );
+```
+
+
+
+
+
## Remove Orphan Files
Paimon files are deleted physically only when expiring snapshots. However, it is possible that some unexpected errors occurred
@@ -402,4 +421,4 @@ The table can be `*` to clean all tables in the database.
-
\ No newline at end of file
+
diff --git a/docs/docs/maintenance/manage-tags.mdx b/docs/docs/maintenance/manage-tags.mdx
index 78e588e5f53f..173dbe5e7ab2 100644
--- a/docs/docs/maintenance/manage-tags.mdx
+++ b/docs/docs/maintenance/manage-tags.mdx
@@ -300,3 +300,22 @@ CALL sys.rollback(table => 'test.t', version => '2');
+
+## Restore Tag as Latest
+
+Restore a table to the state of a specific tag by creating a new latest snapshot. Unlike rollback, this operation does not
+delete snapshots or tags whose snapshot id is larger than the restored tag.
+
+
+
+
+
+Run the following command:
+
+```sql
+CALL sys.restore_as_latest(`table` => 'database_name.table_name', tag => 'tag_name');
+```
+
+
+
+
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 31fb3c52cab6..a283360ca3e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -74,6 +74,9 @@ int overwritePartition(
/** Compact the manifest entries only. */
void compactManifest();
+ /** Restore the target snapshot as the latest snapshot. */
+ boolean restoreAsLatest(Snapshot targetSnapshot);
+
/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List commitMessages);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7cca259cbf9f..455efb1fcf1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -107,6 +107,7 @@
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
* Default implementation of {@link FileStoreCommit}.
@@ -1164,6 +1165,85 @@ public boolean replaceManifestList(
return commitSnapshotImpl(newSnapshot, emptyList());
}
+ @Override
+ public boolean restoreAsLatest(Snapshot targetSnapshot) {
+ Snapshot latest =
+ checkNotNull(
+ snapshotManager.latestSnapshot(),
+ "Latest snapshot is null, can not restore.");
+
+ Map latestEntries = new HashMap<>();
+ FileEntry.mergeEntries(
+ manifestFile,
+ manifestList.readDataManifests(latest),
+ latestEntries,
+ options.scanManifestParallelism());
+
+ latestEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);
+
+ Map targetEntries = new HashMap<>();
+ FileEntry.mergeEntries(
+ manifestFile,
+ manifestList.readDataManifests(targetSnapshot),
+ targetEntries,
+ options.scanManifestParallelism());
+ targetEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD);
+
+ List deltaFiles = new ArrayList<>();
+ for (Map.Entry entry : latestEntries.entrySet()) {
+ if (!targetEntries.containsKey(entry.getKey())) {
+ ManifestEntry manifestEntry = entry.getValue();
+ deltaFiles.add(
+ ManifestEntry.create(
+ FileKind.DELETE,
+ manifestEntry.partition(),
+ manifestEntry.bucket(),
+ manifestEntry.totalBuckets(),
+ manifestEntry.file()));
+ }
+ }
+ for (Map.Entry entry : targetEntries.entrySet()) {
+ if (!latestEntries.containsKey(entry.getKey())) {
+ ManifestEntry manifestEntry = entry.getValue();
+ deltaFiles.add(
+ ManifestEntry.create(
+ FileKind.ADD,
+ manifestEntry.partition(),
+ manifestEntry.bucket(),
+ manifestEntry.totalBuckets(),
+ manifestEntry.file()));
+ }
+ }
+
+ Pair baseManifestList =
+ manifestList.write(manifestFile.write(new ArrayList<>(latestEntries.values())));
+ Pair deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));
+ Snapshot newSnapshot =
+ new Snapshot(
+ latest.id() + 1,
+ targetSnapshot.schemaId(),
+ baseManifestList.getKey(),
+ baseManifestList.getRight(),
+ deltaManifestList.getKey(),
+ deltaManifestList.getRight(),
+ null,
+ null,
+ targetSnapshot.indexManifest(),
+ commitUser,
+ Long.MAX_VALUE,
+ CommitKind.OVERWRITE,
+ System.currentTimeMillis(),
+ targetSnapshot.totalRecordCount(),
+ recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles),
+ null,
+ targetSnapshot.watermark(),
+ targetSnapshot.statistics(),
+ targetSnapshot.properties(),
+ targetSnapshot.nextRowId());
+
+ return commitSnapshotImpl(newSnapshot, new ArrayList<>(PartitionEntry.merge(deltaFiles)));
+ }
+
public void compactManifest() {
int retryCount = 0;
long startMillis = System.currentTimeMillis();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 0311c9bbe4d8..167394f18b20 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.Path;
@@ -203,6 +204,15 @@ public void compactManifests() {
commit.compactManifest();
}
+ public boolean restoreAsLatest(Snapshot targetSnapshot) {
+ checkCommitted();
+ boolean success = commit.restoreAsLatest(targetSnapshot);
+ if (success) {
+ maintain(COMMIT_IDENTIFIER, maintainExecutor, true);
+ }
+ return success;
+ }
+
private void checkCommitted() {
checkState(!batchCommitted, "BatchTableCommit only support one-time committing.");
batchCommitted = true;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java
new file mode 100644
index 000000000000..8aeb8576d70a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+
+/**
+ * Restore as latest procedure. Usage:
+ *
+ *
+ * -- restore a snapshot as the latest snapshot
+ * CALL sys.restore_as_latest(`table` => 'tableId', snapshot_id => snapshotId)
+ *
+ * -- restore a tag as the latest snapshot
+ * CALL sys.restore_as_latest(`table` => 'tableId', tag => 'tagName')
+ *
+ */
+public class RestoreAsLatestProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "restore_as_latest";
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(
+ name = "snapshot_id",
+ type = @DataTypeHint("BIGINT"),
+ isOptional = true)
+ })
+ public @DataTypeHint(
+ "ROW")
+ Row[] call(ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+ FileStore> store = fileStoreTable.store();
+ Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
+ Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null, can not restore.");
+
+ boolean hasTag = !StringUtils.isNullOrWhitespaceOnly(tagName);
+ boolean hasSnapshot = snapshotId != null;
+ Preconditions.checkArgument(
+ hasTag != hasSnapshot, "Must specify exactly one of tag and snapshot_id.");
+
+ Snapshot targetSnapshot;
+ if (hasTag) {
+ targetSnapshot = store.newTagManager().getOrThrow(tagName).trimToSnapshot();
+ } else {
+ targetSnapshot = findSnapshot(store, snapshotId);
+ }
+
+ try (TableCommitImpl commit =
+ fileStoreTable.newCommit("restore-as-latest-" + UUID.randomUUID().toString())) {
+ Preconditions.checkState(
+ commit.restoreAsLatest(targetSnapshot),
+ "Failed to restore snapshot %s as latest.",
+ targetSnapshot.id());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to restore snapshot %s as latest.", targetSnapshot.id()),
+ e);
+ }
+
+ return new Row[] {
+ Row.of(
+ latestSnapshot.id(),
+ targetSnapshot.id(),
+ store.snapshotManager().latestSnapshotId())
+ };
+ }
+
+ private Snapshot findSnapshot(FileStore> store, long snapshotId) {
+ SnapshotManager snapshotManager = store.snapshotManager();
+ if (snapshotManager.snapshotExists(snapshotId)) {
+ return snapshotManager.snapshot(snapshotId);
+ }
+
+ SortedMap> tags = store.newTagManager().tags();
+ for (Map.Entry> entry : tags.entrySet()) {
+ if (entry.getKey().id() == snapshotId) {
+ return entry.getKey();
+ } else if (entry.getKey().id() > snapshotId) {
+ break;
+ }
+ }
+
+ throw new IllegalArgumentException(
+ String.format("Restore snapshot '%s' doesn't exist.", snapshotId));
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index db2777a3a0fc..b999168f2cbb 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
+org.apache.paimon.flink.procedure.RestoreAsLatestProcedure
org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
@@ -106,4 +107,4 @@ org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
org.apache.paimon.flink.procedure.ReassignRowIdProcedure
org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure
org.apache.paimon.flink.procedure.VectorSearchProcedure
-org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java
new file mode 100644
index 000000000000..e2bf7fcb661f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** IT cases for restore_as_latest procedure. */
+public class RestoreAsLatestProcedureITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testRestoreSnapshotAsLatest() throws Exception {
+ sql("CREATE TABLE T (id INT, name STRING)");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ commitRow(table, 1, "a");
+ commitRow(table, 2, "b");
+ commitRow(table, 3, "c");
+ assertEquals(3, snapshotManager.latestSnapshotId());
+
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 1)"))
+ .containsExactly(Row.of(3L, 1L, 4L));
+
+ assertEquals(4, snapshotManager.latestSnapshotId());
+ assertRestoreDelta(table, 4, 0, 2, -2L);
+ assertTrue(snapshotManager.snapshotExists(2));
+ assertTrue(snapshotManager.snapshotExists(3));
+ assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, "a"));
+
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 3)"))
+ .containsExactly(Row.of(4L, 3L, 5L));
+
+ assertEquals(5, snapshotManager.latestSnapshotId());
+ assertRestoreDelta(table, 5, 2, 0, 2L);
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c"));
+
+ commitRow(table, 4, "d");
+ assertEquals(6, snapshotManager.latestSnapshotId());
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c"), Row.of(4, "d"));
+ }
+
+ @Test
+ public void testRestoreTagAsLatest() throws Exception {
+ sql("CREATE TABLE T (id INT, name STRING)");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ commitRow(table, 1, "a");
+ commitRow(table, 2, "b");
+ commitRow(table, 3, "c");
+ assertEquals(3, snapshotManager.latestSnapshotId());
+
+ sql("CALL sys.create_tag(`table` => 'default.T', tag => 'tag-1', snapshot_id => 1)");
+
+ assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', tag => 'tag-1')"))
+ .containsExactly(Row.of(3L, 1L, 4L));
+
+ assertEquals(4, snapshotManager.latestSnapshotId());
+ assertRestoreDelta(table, 4, 0, 2, -2L);
+ assertTrue(snapshotManager.snapshotExists(2));
+ assertTrue(snapshotManager.snapshotExists(3));
+ assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, "a"));
+ }
+
+ private void assertRestoreDelta(
+ FileStoreTable table,
+ long snapshotId,
+ long expectedNumAddedFiles,
+ long expectedNumDeletedFiles,
+ long expectedDeltaRecordCount) {
+ Snapshot snapshot = table.snapshot(snapshotId);
+ ManifestList manifestList = table.store().manifestListFactory().create();
+ List deltaManifests = manifestList.readDeltaManifests(snapshot);
+
+ assertThat(deltaManifests).hasSize(1);
+ assertThat(deltaManifests.get(0).numAddedFiles()).isEqualTo(expectedNumAddedFiles);
+ assertThat(deltaManifests.get(0).numDeletedFiles()).isEqualTo(expectedNumDeletedFiles);
+ assertThat(snapshot.deltaRecordCount()).isEqualTo(expectedDeltaRecordCount);
+ }
+
+ private void commitRow(FileStoreTable table, int id, String name) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(id, BinaryString.fromString(name)));
+ commit.commit(write.prepareCommit());
+ }
+ }
+}
|