diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md index 31601354f1a8..325ce57235ed 100644 --- a/docs/docs/flink/procedures.md +++ b/docs/docs/flink/procedures.md @@ -498,6 +498,34 @@ All available procedures are listed below. CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10) + + restore_as_latest + + -- for Flink 1.18
+ -- restore a snapshot as the latest snapshot
+ CALL [catalog.]sys.restore_as_latest('identifier', cast(null as string), snapshotId)

+ -- restore a tag as the latest snapshot
+ CALL [catalog.]sys.restore_as_latest('identifier', 'tagName', cast(null as bigint))

+ -- for Flink 1.19 and later
+ -- restore a snapshot as the latest snapshot
+ CALL [catalog.]sys.restore_as_latest(`table` => 'identifier', snapshot_id => snapshotId)

+ -- restore a tag as the latest snapshot
+ CALL [catalog.]sys.restore_as_latest(`table` => 'identifier', tag => 'tagName') + + + To restore a specific version of the target table as the latest snapshot without deleting later snapshots or tags. + Argument: +
  • table: the target table identifier. Cannot be empty.
  • +
  • snapshotId (Long): id of the snapshot to restore from. Exactly one of snapshotId and tagName must be specified.
  • +
  • tagName: name of the tag to restore from.
  • + + + -- for Flink 1.18
    + CALL sys.restore_as_latest('default.T', cast(null as string), 10)

    + -- for Flink 1.19 and later
    + CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 10) + + rollback_to_timestamp diff --git a/docs/docs/maintenance/manage-snapshots.mdx b/docs/docs/maintenance/manage-snapshots.mdx index 821aa0b7559f..edbbf0ca0ddf 100644 --- a/docs/docs/maintenance/manage-snapshots.mdx +++ b/docs/docs/maintenance/manage-snapshots.mdx @@ -353,6 +353,25 @@ CALL sys.rollback(table => 'database_name.table_name', snapshot => snasphot_id); +## Restore Snapshot as Latest + +Restore a table to the state of a specific snapshot ID by creating a new latest snapshot. Unlike rollback, this operation +does not delete snapshots or tags whose snapshot id is larger than the restored snapshot. + + + + + +Run the following command: + +```sql +CALL sys.restore_as_latest(`table` => 'database_name.table_name', snapshot_id => <snapshot-id>); +``` + + + + + ## Remove Orphan Files Paimon files are deleted physically only when expiring snapshots. However, it is possible that some unexpected errors occurred @@ -402,4 +421,4 @@ The table can be `*` to clean all tables in the database. - \ No newline at end of file + diff --git a/docs/docs/maintenance/manage-tags.mdx b/docs/docs/maintenance/manage-tags.mdx index 78e588e5f53f..173dbe5e7ab2 100644 --- a/docs/docs/maintenance/manage-tags.mdx +++ b/docs/docs/maintenance/manage-tags.mdx @@ -300,3 +300,22 @@ CALL sys.rollback(table => 'test.t', version => '2'); + +## Restore Tag as Latest + +Restore a table to the state of a specific tag by creating a new latest snapshot. Unlike rollback, this operation does not +delete snapshots or tags whose snapshot id is larger than the restored tag. 
+ + + + + +Run the following command: + +```sql +CALL sys.restore_as_latest(`table` => 'database_name.table_name', tag => 'tag_name'); +``` + + + + diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index 31fb3c52cab6..a283360ca3e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -74,6 +74,9 @@ int overwritePartition( /** Compact the manifest entries only. */ void compactManifest(); + /** Restore the target snapshot as the latest snapshot. */ + boolean restoreAsLatest(Snapshot targetSnapshot); + /** Abort an unsuccessful commit. The data files will be deleted. */ void abort(List commitMessages); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 7cca259cbf9f..455efb1fcf1e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -107,6 +107,7 @@ import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** * Default implementation of {@link FileStoreCommit}. 
@@ -1164,6 +1165,85 @@ public boolean replaceManifestList( return commitSnapshotImpl(newSnapshot, emptyList()); } + @Override + public boolean restoreAsLatest(Snapshot targetSnapshot) { + Snapshot latest = + checkNotNull( + snapshotManager.latestSnapshot(), + "Latest snapshot is null, can not restore."); + + Map latestEntries = new HashMap<>(); + FileEntry.mergeEntries( + manifestFile, + manifestList.readDataManifests(latest), + latestEntries, + options.scanManifestParallelism()); + + latestEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD); + + Map targetEntries = new HashMap<>(); + FileEntry.mergeEntries( + manifestFile, + manifestList.readDataManifests(targetSnapshot), + targetEntries, + options.scanManifestParallelism()); + targetEntries.entrySet().removeIf(entry -> entry.getValue().kind() != FileKind.ADD); + + List deltaFiles = new ArrayList<>(); + for (Map.Entry entry : latestEntries.entrySet()) { + if (!targetEntries.containsKey(entry.getKey())) { + ManifestEntry manifestEntry = entry.getValue(); + deltaFiles.add( + ManifestEntry.create( + FileKind.DELETE, + manifestEntry.partition(), + manifestEntry.bucket(), + manifestEntry.totalBuckets(), + manifestEntry.file())); + } + } + for (Map.Entry entry : targetEntries.entrySet()) { + if (!latestEntries.containsKey(entry.getKey())) { + ManifestEntry manifestEntry = entry.getValue(); + deltaFiles.add( + ManifestEntry.create( + FileKind.ADD, + manifestEntry.partition(), + manifestEntry.bucket(), + manifestEntry.totalBuckets(), + manifestEntry.file())); + } + } + + Pair baseManifestList = + manifestList.write(manifestFile.write(new ArrayList<>(latestEntries.values()))); + Pair deltaManifestList = manifestList.write(manifestFile.write(deltaFiles)); + Snapshot newSnapshot = + new Snapshot( + latest.id() + 1, + targetSnapshot.schemaId(), + baseManifestList.getKey(), + baseManifestList.getRight(), + deltaManifestList.getKey(), + deltaManifestList.getRight(), + null, + null, + 
targetSnapshot.indexManifest(), + commitUser, + Long.MAX_VALUE, + CommitKind.OVERWRITE, + System.currentTimeMillis(), + targetSnapshot.totalRecordCount(), + recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles), + null, + targetSnapshot.watermark(), + targetSnapshot.statistics(), + targetSnapshot.properties(), + targetSnapshot.nextRowId()); + + return commitSnapshotImpl(newSnapshot, new ArrayList<>(PartitionEntry.merge(deltaFiles))); + } + public void compactManifest() { int retryCount = 0; long startMillis = System.currentTimeMillis(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 0311c9bbe4d8..167394f18b20 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.fs.Path; @@ -203,6 +204,15 @@ public void compactManifests() { commit.compactManifest(); } + public boolean restoreAsLatest(Snapshot targetSnapshot) { + checkCommitted(); + boolean success = commit.restoreAsLatest(targetSnapshot); + if (success) { + maintain(COMMIT_IDENTIFIER, maintainExecutor, true); + } + return success; + } + private void checkCommitted() { checkState(!batchCommitted, "BatchTableCommit only support one-time committing."); batchCommitted = true; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java new file mode 100644 index 000000000000..8aeb8576d70a --- /dev/null +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedure.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.StringUtils; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.UUID; + +/** + * Restore as latest procedure. Usage: + * + *
    
    + *  -- restore a snapshot as the latest snapshot
    + *  CALL sys.restore_as_latest(`table` => 'tableId', snapshot_id => snapshotId)
    + *
    + *  -- restore a tag as the latest snapshot
    + *  CALL sys.restore_as_latest(`table` => 'tableId', tag => 'tagName')
    + * 
    + */ +public class RestoreAsLatestProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "restore_as_latest"; + + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "snapshot_id", + type = @DataTypeHint("BIGINT"), + isOptional = true) + }) + public @DataTypeHint( + "ROW") + Row[] call(ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + + FileStore store = fileStoreTable.store(); + Snapshot latestSnapshot = store.snapshotManager().latestSnapshot(); + Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null, can not restore."); + + boolean hasTag = !StringUtils.isNullOrWhitespaceOnly(tagName); + boolean hasSnapshot = snapshotId != null; + Preconditions.checkArgument( + hasTag != hasSnapshot, "Must specify exactly one of tag and snapshot_id."); + + Snapshot targetSnapshot; + if (hasTag) { + targetSnapshot = store.newTagManager().getOrThrow(tagName).trimToSnapshot(); + } else { + targetSnapshot = findSnapshot(store, snapshotId); + } + + try (TableCommitImpl commit = + fileStoreTable.newCommit("restore-as-latest-" + UUID.randomUUID().toString())) { + Preconditions.checkState( + commit.restoreAsLatest(targetSnapshot), + "Failed to restore snapshot %s as latest.", + targetSnapshot.id()); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to restore snapshot %s as latest.", targetSnapshot.id()), + e); + } + + return new Row[] { + Row.of( + latestSnapshot.id(), + targetSnapshot.id(), + store.snapshotManager().latestSnapshotId()) + }; + } + + private Snapshot findSnapshot(FileStore store, long snapshotId) { + SnapshotManager snapshotManager = 
store.snapshotManager(); + if (snapshotManager.snapshotExists(snapshotId)) { + return snapshotManager.snapshot(snapshotId); + } + + SortedMap> tags = store.newTagManager().tags(); + for (Map.Entry> entry : tags.entrySet()) { + if (entry.getKey().id() == snapshotId) { + return entry.getKey(); + } else if (entry.getKey().id() > snapshotId) { + break; + } + } + + throw new IllegalArgumentException( + String.format("Restore snapshot '%s' doesn't exist.", snapshotId)); + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index db2777a3a0fc..b999168f2cbb 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure org.apache.paimon.flink.procedure.MergeIntoProcedure org.apache.paimon.flink.procedure.ResetConsumerProcedure org.apache.paimon.flink.procedure.RollbackToProcedure +org.apache.paimon.flink.procedure.RestoreAsLatestProcedure org.apache.paimon.flink.procedure.RollbackToTimestampProcedure org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure org.apache.paimon.flink.procedure.MigrateTableProcedure @@ -106,4 +107,4 @@ org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure org.apache.paimon.flink.procedure.ReassignRowIdProcedure org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure org.apache.paimon.flink.procedure.VectorSearchProcedure -org.apache.paimon.flink.procedure.DropGlobalIndexProcedure \ No newline at end of file +org.apache.paimon.flink.procedure.DropGlobalIndexProcedure diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java new file mode 100644 index 000000000000..e2bf7fcb661f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RestoreAsLatestProcedureITCase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.CatalogITCaseBase; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.utils.SnapshotManager; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** IT cases for restore_as_latest procedure. */ +public class RestoreAsLatestProcedureITCase extends CatalogITCaseBase { + + @Test + public void testRestoreSnapshotAsLatest() throws Exception { + sql("CREATE TABLE T (id INT, name STRING)"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + commitRow(table, 1, "a"); + commitRow(table, 2, "b"); + commitRow(table, 3, "c"); + assertEquals(3, snapshotManager.latestSnapshotId()); + + assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 1)")) + .containsExactly(Row.of(3L, 1L, 4L)); + + assertEquals(4, snapshotManager.latestSnapshotId()); + assertRestoreDelta(table, 4, 0, 2, -2L); + assertTrue(snapshotManager.snapshotExists(2)); + assertTrue(snapshotManager.snapshotExists(3)); + assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, "a")); + + assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 3)")) + .containsExactly(Row.of(4L, 3L, 5L)); + + assertEquals(5, snapshotManager.latestSnapshotId()); + assertRestoreDelta(table, 5, 2, 0, 2L); + 
assertThat(sql("SELECT * FROM T")) + .containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c")); + + commitRow(table, 4, "d"); + assertEquals(6, snapshotManager.latestSnapshotId()); + assertThat(sql("SELECT * FROM T")) + .containsExactlyInAnyOrder( + Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c"), Row.of(4, "d")); + } + + @Test + public void testRestoreTagAsLatest() throws Exception { + sql("CREATE TABLE T (id INT, name STRING)"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + commitRow(table, 1, "a"); + commitRow(table, 2, "b"); + commitRow(table, 3, "c"); + assertEquals(3, snapshotManager.latestSnapshotId()); + + sql("CALL sys.create_tag(`table` => 'default.T', tag => 'tag-1', snapshot_id => 1)"); + + assertThat(sql("CALL sys.restore_as_latest(`table` => 'default.T', tag => 'tag-1')")) + .containsExactly(Row.of(3L, 1L, 4L)); + + assertEquals(4, snapshotManager.latestSnapshotId()); + assertRestoreDelta(table, 4, 0, 2, -2L); + assertTrue(snapshotManager.snapshotExists(2)); + assertTrue(snapshotManager.snapshotExists(3)); + assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, "a")); + } + + private void assertRestoreDelta( + FileStoreTable table, + long snapshotId, + long expectedNumAddedFiles, + long expectedNumDeletedFiles, + long expectedDeltaRecordCount) { + Snapshot snapshot = table.snapshot(snapshotId); + ManifestList manifestList = table.store().manifestListFactory().create(); + List deltaManifests = manifestList.readDeltaManifests(snapshot); + + assertThat(deltaManifests).hasSize(1); + assertThat(deltaManifests.get(0).numAddedFiles()).isEqualTo(expectedNumAddedFiles); + assertThat(deltaManifests.get(0).numDeletedFiles()).isEqualTo(expectedNumDeletedFiles); + assertThat(snapshot.deltaRecordCount()).isEqualTo(expectedDeltaRecordCount); + } + + private void commitRow(FileStoreTable table, int id, String name) throws Exception { + BatchWriteBuilder writeBuilder = 
table.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write(GenericRow.of(id, BinaryString.fromString(name))); + commit.commit(write.prepareCommit()); + } + } +}