Skip to content

Commit 87996be

Browse files
author
Liebing
committed
wip
1 parent 8eaad3b commit 87996be

16 files changed

Lines changed: 775 additions & 82 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@
2626
import org.apache.fluss.security.acl.FlussPrincipal;
2727

2828
import java.util.List;
29+
import java.util.Optional;
2930

3031
/**
31-
* A catalog interface to modify metadata in external datalake.
32+
* A catalog interface to modify or get metadata in external datalake.
3233
*
3334
* @since 0.7
3435
*/
@@ -57,6 +58,17 @@ void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context c
5758
void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
5859
throws TableNotExistException;
5960

61+
/**
62+
* Get the latest snapshot info for the given table.
63+
*
64+
* @param tablePath path of the table to get the latest snapshot info
65+
* @param context contextual information needed for get latest snapshot info
66+
* @return the latest snapshot information for the given table, or empty if the table does not
67+
* exist or has no snapshots.
68+
* @since 0.9
69+
*/
70+
Optional<LakeSnapshotInfo> getLatestSnapshotInfo(TablePath tablePath, Context context);
71+
6072
@Override
6173
default void close() throws Exception {
6274
// default do nothing
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.lakestorage;
19+
20+
import javax.annotation.Nullable;
21+
22+
/**
23+
* Represents the metadata information of a snapshot in a data lake table.
24+
*
25+
* @see LakeCatalog#getLatestSnapshotInfo
26+
* @since 0.9
27+
*/
28+
public class LakeSnapshotInfo {
29+
30+
private final long snapshotId;
31+
32+
private final long commitTimestampMillis;
33+
34+
/**
35+
* The {@code fluss-offsets} property recorded in the snapshot summary.
36+
*
37+
* <p>This property has two different formats depending on the Fluss version that produced the
38+
* snapshot:
39+
*
40+
* <ul>
41+
* <li><b>v1 (JSON format, produced by Fluss 0.8):</b> A JSON string starting with <code>
42+
* '&#123;'</code> that contains the serialized {@code TableBucketOffsets} data directly.
43+
* <li><b>v2 (Path format, produced by Fluss 0.9+):</b> A file path pointing to the offsets
44+
* file, following the pattern: {@code
45+
* {remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/metadata/{UUID}.offsets}
46+
* </ul>
47+
*/
48+
@Nullable private final String flussOffsetsProperty;
49+
50+
public LakeSnapshotInfo(
51+
long snapshotId, long commitTimestampMillis, @Nullable String flussOffsetsProperty) {
52+
this.snapshotId = snapshotId;
53+
this.commitTimestampMillis = commitTimestampMillis;
54+
this.flussOffsetsProperty = flussOffsetsProperty;
55+
}
56+
57+
public long getSnapshotId() {
58+
return snapshotId;
59+
}
60+
61+
public long getCommitTimestampMillis() {
62+
return commitTimestampMillis;
63+
}
64+
65+
@Nullable
66+
public String getFlussOffsetsProperty() {
67+
return flussOffsetsProperty;
68+
}
69+
}

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.fluss.utils.WrappingProxy;
3030

3131
import java.util.List;
32+
import java.util.Optional;
3233

3334
/**
3435
* A wrapper around {@link LakeStoragePlugin} that ensures the plugin classloader is used for all
@@ -91,6 +92,14 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
9192
}
9293
}
9394

95+
@Override
96+
public Optional<LakeSnapshotInfo> getLatestSnapshotInfo(
97+
TablePath tablePath, Context context) {
98+
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
99+
return inner.getLatestSnapshotInfo(tablePath, context);
100+
}
101+
}
102+
94103
@Override
95104
public void close() throws Exception {
96105
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Iterator;
3535
import java.util.List;
3636
import java.util.Map;
37+
import java.util.Optional;
3738

3839
import static org.assertj.core.api.Assertions.assertThat;
3940
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -153,5 +154,11 @@ public void createTable(
153154
@Override
154155
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
155156
throws TableNotExistException {}
157+
158+
@Override
159+
public Optional<LakeSnapshotInfo> getLatestSnapshotInfo(
160+
TablePath tablePath, Context context) {
161+
return Optional.empty();
162+
}
156163
}
157164
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import org.apache.fluss.exception.TableAlreadyExistException;
2222
import org.apache.fluss.exception.TableNotExistException;
2323
import org.apache.fluss.lake.lakestorage.LakeCatalog;
24+
import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo;
2425
import org.apache.fluss.metadata.TableChange;
2526
import org.apache.fluss.metadata.TableDescriptor;
2627
import org.apache.fluss.metadata.TablePath;
2728

2829
import java.util.List;
30+
import java.util.Optional;
2931

3032
/** Implementation of {@link LakeCatalog} for values lake. */
3133
public class TestingValuesLakeCatalog implements LakeCatalog {
@@ -40,4 +42,9 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
4042
throws TableNotExistException {
4143
throw new RuntimeException("Not impl.");
4244
}
45+
46+
@Override
47+
public Optional<LakeSnapshotInfo> getLatestSnapshotInfo(TablePath tablePath, Context context) {
48+
return Optional.empty();
49+
}
4350
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.exception.TableNotExistException;
2525
import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
2626
import org.apache.fluss.lake.lakestorage.LakeCatalog;
27+
import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo;
2728
import org.apache.fluss.metadata.TableChange;
2829
import org.apache.fluss.metadata.TableDescriptor;
2930
import org.apache.fluss.metadata.TablePath;
@@ -52,6 +53,7 @@
5253
import java.util.LinkedHashMap;
5354
import java.util.List;
5455
import java.util.Map;
56+
import java.util.Optional;
5557
import java.util.Set;
5658

5759
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
@@ -162,6 +164,11 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
162164
}
163165
}
164166

167+
@Override
168+
public Optional<LakeSnapshotInfo> getLatestSnapshotInfo(TablePath tablePath, Context context) {
169+
return Optional.empty();
170+
}
171+
165172
private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
166173
return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
167174
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import org.apache.fluss.exception.InvalidAlterTableException;
2323
import org.apache.fluss.exception.TableAlreadyExistException;
2424
import org.apache.fluss.exception.TableNotExistException;
25+
import org.apache.fluss.lake.committer.LakeCommitter;
2526
import org.apache.fluss.lake.lakestorage.LakeCatalog;
27+
import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo;
2628
import org.apache.fluss.metadata.TableChange;
2729
import org.apache.fluss.metadata.TableDescriptor;
2830
import org.apache.fluss.metadata.TablePath;
2931
import org.apache.fluss.utils.IOUtils;
3032

33+
import org.apache.paimon.Snapshot;
3134
import org.apache.paimon.catalog.Catalog;
3235
import org.apache.paimon.catalog.CatalogContext;
3336
import org.apache.paimon.catalog.CatalogFactory;
@@ -44,6 +47,7 @@
4447

4548
import java.util.LinkedHashMap;
4649
import java.util.List;
50+
import java.util.Optional;
4751

4852
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
4953
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
@@ -133,6 +137,36 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
133137
}
134138
}
135139

140+
@Override
141+
public Optional<LakeSnapshotInfo> getLatestSnapshotInfo(TablePath tablePath, Context context) {
142+
Identifier identifier =
143+
Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
144+
Table paimonTable;
145+
try {
146+
paimonTable = paimonCatalog.getTable(identifier);
147+
} catch (Catalog.TableNotExistException e) {
148+
return Optional.empty();
149+
}
150+
151+
FileStoreTable fileStoreTable = (FileStoreTable) paimonTable;
152+
Snapshot snapshot = fileStoreTable.snapshotManager().latestSnapshot();
153+
if (snapshot == null) {
154+
return Optional.empty();
155+
}
156+
157+
String flussOffsets =
158+
Optional.ofNullable(snapshot.properties())
159+
.map(
160+
props ->
161+
props.get(
162+
LakeCommitter
163+
.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY))
164+
.orElse(null);
165+
166+
return Optional.of(
167+
new LakeSnapshotInfo(snapshot.id(), snapshot.timeMillis(), flussOffsets));
168+
}
169+
136170
private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
137171
throws TableNotExistException {
138172
try {

0 commit comments

Comments
 (0)