Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/docs/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,34 @@ All available procedures are listed below.
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
</td>
</tr>
<tr>
<td>restore_as_latest</td>
<td>
-- for Flink 1.18<br/>
-- restore a snapshot as the latest snapshot<br/>
CALL [catalog.]sys.restore_as_latest('identifier', cast(null as string), snapshotId)<br/><br/>
-- restore a tag as the latest snapshot<br/>
CALL [catalog.]sys.restore_as_latest('identifier', 'tagName', cast(null as bigint))<br/><br/>
-- for Flink 1.19 and later<br/>
-- restore a snapshot as the latest snapshot<br/>
CALL [catalog.]sys.restore_as_latest(`table` => 'identifier', snapshot_id => snapshotId)<br/><br/>
-- restore a tag as the latest snapshot<br/>
CALL [catalog.]sys.restore_as_latest(`table` => 'identifier', tag => 'tagName')
</td>
<td>
Restores a specific version of the target table as a new latest snapshot, without deleting later snapshots or tags.
Arguments (exactly one of snapshotId and tagName must be specified):
<li>table: the target table identifier. Cannot be empty.</li>
<li>snapshotId (Long): id of the snapshot to restore from.</li>
<li>tagName: name of the tag to restore from.</li>
</td>
<td>
-- for Flink 1.18<br/>
CALL sys.restore_as_latest('default.T', cast(null as string), 10)<br/><br/>
-- for Flink 1.19 and later<br/>
CALL sys.restore_as_latest(`table` => 'default.T', snapshot_id => 10)
</td>
</tr>
<tr>
<td>rollback_to_timestamp</td>
<td>
Expand Down
21 changes: 20 additions & 1 deletion docs/docs/maintenance/manage-snapshots.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,25 @@ CALL sys.rollback(table => 'database_name.table_name', snapshot => snasphot_id);

</Tabs>

## Restore Snapshot as Latest

Restore a table to the state of a specific snapshot by creating a new latest snapshot. Unlike rollback, this operation
does not delete snapshots or tags whose snapshot id is larger than that of the restored snapshot.

<Tabs groupId="restore-as-latest">

<TabItem value="flink-sql" label="Flink SQL">

Run the following command:

```sql
CALL sys.restore_as_latest(`table` => 'database_name.table_name', snapshot_id => <snapshot-id>);
```

</TabItem>

</Tabs>

## Remove Orphan Files

Paimon files are deleted physically only when expiring snapshots. However, it is possible that some unexpected errors occurred
Expand Down Expand Up @@ -402,4 +421,4 @@ The table can be `*` to clean all tables in the database.

</TabItem>

</Tabs>
</Tabs>
19 changes: 19 additions & 0 deletions docs/docs/maintenance/manage-tags.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,22 @@ CALL sys.rollback(table => 'test.t', version => '2');
</TabItem>

</Tabs>

## Restore Tag as Latest

Restore a table to the state of a specific tag by creating a new latest snapshot. Unlike rollback, this operation does not
delete snapshots or tags whose snapshot id is larger than that of the restored tag.

<Tabs groupId="restore-as-latest">

<TabItem value="flink-sql" label="Flink SQL">

Run the following command:

```sql
CALL sys.restore_as_latest(`table` => 'database_name.table_name', tag => 'tag_name');
```

</TabItem>

</Tabs>
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ int overwritePartition(
/** Compact the manifest entries only. */
void compactManifest();

/**
 * Restore the target snapshot as the latest snapshot.
 *
 * @param targetSnapshot the snapshot whose content should become the new latest snapshot
 * @return {@code true} if the restore was committed; NOTE(review): presumably {@code false}
 *     means the commit did not succeed - confirm against the implementation's commit path
 */
boolean restoreAsLatest(Snapshot targetSnapshot);

/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/**
* Default implementation of {@link FileStoreCommit}.
Expand Down Expand Up @@ -1164,6 +1165,85 @@ public boolean replaceManifestList(
return commitSnapshotImpl(newSnapshot, emptyList());
}

/**
 * Commits a new snapshot, numbered directly after the current latest one, that brings the table
 * content back to that of {@code targetSnapshot}.
 *
 * <p>The data manifests of both the latest and the target snapshot are merged and reduced to
 * their {@code ADD} entries. Files present only in the latest snapshot are recorded as {@code
 * DELETE}, files present only in the target snapshot are recorded as {@code ADD}. The base
 * manifest list of the new snapshot is written from the latest snapshot's entries and the delta
 * manifest list from the computed differences.
 *
 * @param targetSnapshot snapshot to restore; its schema id, index manifest, total record count,
 *     watermark, statistics, properties and next row id are copied into the new snapshot
 * @return the result of {@code commitSnapshotImpl} for the new snapshot
 */
@Override
public boolean restoreAsLatest(Snapshot targetSnapshot) {
    Snapshot current =
            checkNotNull(
                    snapshotManager.latestSnapshot(),
                    "Latest snapshot is null, can not restore.");

    // Files that are still ADD after merging the manifests of the current latest snapshot.
    Map<FileEntry.Identifier, ManifestEntry> currentFiles = new HashMap<>();
    FileEntry.mergeEntries(
            manifestFile,
            manifestList.readDataManifests(current),
            currentFiles,
            options.scanManifestParallelism());
    currentFiles.values().removeIf(file -> file.kind() != FileKind.ADD);

    // Same computation for the snapshot that is being restored.
    Map<FileEntry.Identifier, ManifestEntry> wantedFiles = new HashMap<>();
    FileEntry.mergeEntries(
            manifestFile,
            manifestList.readDataManifests(targetSnapshot),
            wantedFiles,
            options.scanManifestParallelism());
    wantedFiles.values().removeIf(file -> file.kind() != FileKind.ADD);

    List<ManifestEntry> changes = new ArrayList<>();

    // Present now but not in the target: must be deleted.
    currentFiles.forEach(
            (id, file) -> {
                if (!wantedFiles.containsKey(id)) {
                    changes.add(
                            ManifestEntry.create(
                                    FileKind.DELETE,
                                    file.partition(),
                                    file.bucket(),
                                    file.totalBuckets(),
                                    file.file()));
                }
            });

    // Present in the target but not now: must be added back.
    wantedFiles.forEach(
            (id, file) -> {
                if (!currentFiles.containsKey(id)) {
                    changes.add(
                            ManifestEntry.create(
                                    FileKind.ADD,
                                    file.partition(),
                                    file.bucket(),
                                    file.totalBuckets(),
                                    file.file()));
                }
            });

    List<ManifestEntry> baseEntries = new ArrayList<>(currentFiles.values());
    Pair<String, Long> base = manifestList.write(manifestFile.write(baseEntries));
    Pair<String, Long> delta = manifestList.write(manifestFile.write(changes));

    // NOTE(review): the three null arguments and Long.MAX_VALUE are passed positionally;
    // presumably changelog-related fields and the commit identifier - confirm against the
    // Snapshot constructor.
    Snapshot restored =
            new Snapshot(
                    current.id() + 1,
                    targetSnapshot.schemaId(),
                    base.getKey(),
                    base.getRight(),
                    delta.getKey(),
                    delta.getRight(),
                    null,
                    null,
                    targetSnapshot.indexManifest(),
                    commitUser,
                    Long.MAX_VALUE,
                    CommitKind.OVERWRITE,
                    System.currentTimeMillis(),
                    targetSnapshot.totalRecordCount(),
                    recordCountAdd(changes) - recordCountDelete(changes),
                    null,
                    targetSnapshot.watermark(),
                    targetSnapshot.statistics(),
                    targetSnapshot.properties(),
                    targetSnapshot.nextRowId());

    List<PartitionEntry> touchedPartitions = new ArrayList<>(PartitionEntry.merge(changes));
    return commitSnapshotImpl(restored, touchedPartitions);
}

public void compactManifest() {
int retryCount = 0;
long startMillis = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.table.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -203,6 +204,15 @@ public void compactManifests() {
commit.compactManifest();
}

/**
 * Restores {@code targetSnapshot} as the latest snapshot through the underlying commit and, when
 * that succeeds, runs maintenance for {@code COMMIT_IDENTIFIER}.
 *
 * <p>Calls {@code checkCommitted()} first, so this counts as the one-time commit of this
 * instance.
 *
 * @param targetSnapshot the snapshot to restore
 * @return whether the underlying commit reported success
 */
public boolean restoreAsLatest(Snapshot targetSnapshot) {
    checkCommitted();
    if (!commit.restoreAsLatest(targetSnapshot)) {
        // Nothing was committed, so there is nothing to maintain.
        return false;
    }
    maintain(COMMIT_IDENTIFIER, maintainExecutor, true);
    return true;
}

private void checkCommitted() {
checkState(!batchCommitted, "BatchTableCommit only support one-time committing.");
batchCommitted = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;

/**
 * Restore as latest procedure. Usage:
 *
 * <pre><code>
 *  -- restore a snapshot as the latest snapshot
 *  CALL sys.restore_as_latest(`table` => 'tableId', snapshot_id => snapshotId)
 *
 *  -- restore a tag as the latest snapshot
 *  CALL sys.restore_as_latest(`table` => 'tableId', tag => 'tagName')
 * </code></pre>
 *
 * <p>Exactly one of {@code tag} and {@code snapshot_id} must be specified.
 */
public class RestoreAsLatestProcedure extends ProcedureBase {

    public static final String IDENTIFIER = "restore_as_latest";

    /**
     * Restores the snapshot identified by {@code tagName} or {@code snapshotId} as the latest
     * snapshot of the table.
     *
     * @param procedureContext the Flink procedure context (not used by this procedure)
     * @param tableId identifier string of the target table
     * @param tagName name of the tag to restore from; treated as absent when null or blank
     * @param snapshotId id of the snapshot to restore from; treated as absent when null
     * @return a single row holding the latest snapshot id before the call, the id of the
     *     restored snapshot, and the latest snapshot id after the call
     * @throws Catalog.TableNotExistException if the table cannot be found in the catalog
     * @throws IllegalArgumentException if both or neither of {@code tagName} and {@code
     *     snapshotId} are given, or if the requested snapshot does not exist
     * @throws IllegalStateException if the commit reports that the restore did not succeed
     * @throws RuntimeException if the commit itself, or closing it, throws
     */
    @ProcedureHint(
            argument = {
                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
                @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true),
                @ArgumentHint(
                        name = "snapshot_id",
                        type = @DataTypeHint("BIGINT"),
                        isOptional = true)
            })
    public @DataTypeHint(
            "ROW<previous_snapshot_id BIGINT, restored_snapshot_id BIGINT, current_snapshot_id BIGINT>")
    Row[] call(ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId)
            throws Catalog.TableNotExistException {
        Table table = catalog.getTable(Identifier.fromString(tableId));
        FileStoreTable fileStoreTable = (FileStoreTable) table;

        FileStore<?> store = fileStoreTable.store();
        Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
        Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null, can not restore.");

        boolean hasTag = !StringUtils.isNullOrWhitespaceOnly(tagName);
        boolean hasSnapshot = snapshotId != null;
        Preconditions.checkArgument(
                hasTag != hasSnapshot, "Must specify exactly one of tag and snapshot_id.");

        Snapshot targetSnapshot;
        if (hasTag) {
            targetSnapshot = store.newTagManager().getOrThrow(tagName).trimToSnapshot();
        } else {
            targetSnapshot = findSnapshot(store, snapshotId);
        }

        boolean restored;
        try (TableCommitImpl commit =
                fileStoreTable.newCommit("restore-as-latest-" + UUID.randomUUID())) {
            restored = commit.restoreAsLatest(targetSnapshot);
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format("Failed to restore snapshot %s as latest.", targetSnapshot.id()),
                    e);
        }
        // Checked outside the try block so that this failure is not caught by the catch-all
        // above and wrapped a second time with the very same message.
        Preconditions.checkState(
                restored, "Failed to restore snapshot %s as latest.", targetSnapshot.id());

        return new Row[] {
            Row.of(
                    latestSnapshot.id(),
                    targetSnapshot.id(),
                    store.snapshotManager().latestSnapshotId())
        };
    }

    /**
     * Looks up the snapshot with the given id, first among the snapshots known to the snapshot
     * manager and then among the snapshots referenced by tags.
     *
     * @throws IllegalArgumentException if neither lookup finds the snapshot
     */
    private Snapshot findSnapshot(FileStore<?> store, long snapshotId) {
        SnapshotManager snapshotManager = store.snapshotManager();
        if (snapshotManager.snapshotExists(snapshotId)) {
            return snapshotManager.snapshot(snapshotId);
        }

        // Fall back to tagged snapshots. The early break assumes tags() iterates in ascending
        // snapshot id order - TODO confirm against TagManager.
        SortedMap<Snapshot, List<String>> tags = store.newTagManager().tags();
        for (Map.Entry<Snapshot, List<String>> entry : tags.entrySet()) {
            if (entry.getKey().id() == snapshotId) {
                return entry.getKey();
            } else if (entry.getKey().id() > snapshotId) {
                break;
            }
        }

        throw new IllegalArgumentException(
                String.format("Restore snapshot '%s' doesn't exist.", snapshotId));
    }

    @Override
    public String identifier() {
        return IDENTIFIER;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.RestoreAsLatestProcedure
org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
Expand Down Expand Up @@ -106,4 +107,4 @@ org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
org.apache.paimon.flink.procedure.ReassignRowIdProcedure
org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure
org.apache.paimon.flink.procedure.VectorSearchProcedure
org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
Loading
Loading