Skip to content

Commit c04ecc2

Browse files
author
Liebing
committed
wip
1 parent de648d2 commit c04ecc2

39 files changed

Lines changed: 544 additions & 267 deletions

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,10 @@ public static UUID uuidFromRemoteIndexCacheFileName(String fileName) {
405405
// Remote Log Paths
406406
// ----------------------------------------------------------------------------------------
407407

408+
public static FsPath remoteDataDir(Configuration conf) {
409+
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
410+
}
411+
408412
/**
409413
* Returns the remote root directory path for storing log files.
410414
*
@@ -584,6 +588,10 @@ public static FsPath remoteKvDir(Configuration conf) {
584588
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR) + "/" + REMOTE_KV_DIR_NAME);
585589
}
586590

591+
public static FsPath remoteKvDir(FsPath remoteDataDir) {
592+
return new FsPath(remoteDataDir, REMOTE_KV_DIR_NAME);
593+
}
594+
587595
/**
588596
* Returns the remote directory path for storing kv snapshot files for a kv tablet.
589597
*

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
package org.apache.fluss.server.coordinator;
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.fs.FsPath;
22+
import org.apache.fluss.metadata.PhysicalTablePath;
2123
import org.apache.fluss.metadata.TableBucket;
22-
import org.apache.fluss.metadata.TablePath;
2324
import org.apache.fluss.metrics.MetricNames;
2425
import org.apache.fluss.metrics.groups.MetricGroup;
2526
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -30,6 +31,7 @@
3031
import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
3132
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
3233
import org.apache.fluss.server.zk.ZooKeeperClient;
34+
import org.apache.fluss.utils.FlussPaths;
3335
import org.apache.fluss.utils.MapUtils;
3436

3537
import org.slf4j.Logger;
@@ -66,17 +68,21 @@ public class CompletedSnapshotStoreManager {
6668
makeZookeeperCompletedSnapshotHandleStore;
6769
private final CoordinatorMetricGroup coordinatorMetricGroup;
6870

71+
private final FsPath remoteKvDir;
72+
6973
public CompletedSnapshotStoreManager(
7074
int maxNumberOfSnapshotsToRetain,
7175
Executor ioExecutor,
7276
ZooKeeperClient zooKeeperClient,
73-
CoordinatorMetricGroup coordinatorMetricGroup) {
77+
CoordinatorMetricGroup coordinatorMetricGroup,
78+
FsPath remoteKvDir) {
7479
this(
7580
maxNumberOfSnapshotsToRetain,
7681
ioExecutor,
7782
zooKeeperClient,
7883
ZooKeeperCompletedSnapshotHandleStore::new,
79-
coordinatorMetricGroup);
84+
coordinatorMetricGroup,
85+
remoteKvDir);
8086
}
8187

8288
@VisibleForTesting
@@ -86,7 +92,8 @@ public CompletedSnapshotStoreManager(
8692
ZooKeeperClient zooKeeperClient,
8793
Function<ZooKeeperClient, CompletedSnapshotHandleStore>
8894
makeZookeeperCompletedSnapshotHandleStore,
89-
CoordinatorMetricGroup coordinatorMetricGroup) {
95+
CoordinatorMetricGroup coordinatorMetricGroup,
96+
FsPath remoteKvDir) {
9097
checkArgument(
9198
maxNumberOfSnapshotsToRetain > 0, "maxNumberOfSnapshotsToRetain must be positive");
9299
this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
@@ -95,6 +102,7 @@ public CompletedSnapshotStoreManager(
95102
this.ioExecutor = ioExecutor;
96103
this.makeZookeeperCompletedSnapshotHandleStore = makeZookeeperCompletedSnapshotHandleStore;
97104
this.coordinatorMetricGroup = coordinatorMetricGroup;
105+
this.remoteKvDir = remoteKvDir;
98106

99107
registerMetrics();
100108
}
@@ -121,15 +129,15 @@ private long getAllSnapshotSize(TableBucket tableBucket) {
121129
}
122130

123131
public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(
124-
TablePath tablePath, TableBucket tableBucket) {
132+
PhysicalTablePath tablePath, TableBucket tableBucket) {
125133
return bucketCompletedSnapshotStores.computeIfAbsent(
126134
tableBucket,
127135
(bucket) -> {
128136
try {
129137
LOG.info("Creating snapshot store for table bucket {}.", bucket);
130138
long start = System.currentTimeMillis();
131139
CompletedSnapshotStore snapshotStore =
132-
createCompletedSnapshotStore(tableBucket, ioExecutor);
140+
createCompletedSnapshotStore(tablePath, tableBucket, ioExecutor);
133141
long end = System.currentTimeMillis();
134142
LOG.info(
135143
"Created snapshot store for table bucket {} in {} ms.",
@@ -138,7 +146,7 @@ public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(
138146

139147
MetricGroup bucketMetricGroup =
140148
coordinatorMetricGroup.getTableBucketMetricGroup(
141-
tablePath, tableBucket);
149+
tablePath.getTablePath(), tableBucket);
142150
if (bucketMetricGroup != null) {
143151
LOG.info("Add bucketMetricGroup for tableBucket {}.", bucket);
144152
bucketMetricGroup.gauge(
@@ -168,7 +176,8 @@ public void removeCompletedSnapshotStoreByTableBuckets(Set<TableBucket> tableBuc
168176
}
169177

170178
private CompletedSnapshotStore createCompletedSnapshotStore(
171-
TableBucket tableBucket, Executor ioExecutor) throws Exception {
179+
PhysicalTablePath tablePath, TableBucket tableBucket, Executor ioExecutor)
180+
throws Exception {
172181
final CompletedSnapshotHandleStore completedSnapshotHandleStore =
173182
this.makeZookeeperCompletedSnapshotHandleStore.apply(zooKeeperClient);
174183

@@ -179,14 +188,18 @@ private CompletedSnapshotStore createCompletedSnapshotStore(
179188
final int numberOfInitialSnapshots = initialSnapshots.size();
180189

181190
LOG.info(
182-
"Found {} snapshots in {}.",
191+
"Found {} snapshots in {} of table bucket {}.",
183192
numberOfInitialSnapshots,
184-
completedSnapshotHandleStore.getClass().getSimpleName());
193+
completedSnapshotHandleStore.getClass().getSimpleName(),
194+
tableBucket);
185195

186196
final List<CompletedSnapshot> retrievedSnapshots =
187197
new ArrayList<>(numberOfInitialSnapshots);
188198

189-
LOG.info("Trying to fetch {} snapshots from storage.", numberOfInitialSnapshots);
199+
LOG.info(
200+
"Trying to fetch {} snapshots from storage of table bucket {}.",
201+
numberOfInitialSnapshots,
202+
tableBucket);
190203

191204
for (CompletedSnapshotHandle snapshotStateHandle : initialSnapshots) {
192205
try {
@@ -205,28 +218,36 @@ private CompletedSnapshotStore createCompletedSnapshotStore(
205218
tableBucket, snapshotStateHandle.getSnapshotId());
206219
} catch (Exception t) {
207220
LOG.error(
208-
"Failed to remove snapshotStateHandle {}.", snapshotStateHandle, t);
221+
"Failed to remove snapshotStateHandle {} of table bucket {}.",
222+
snapshotStateHandle,
223+
tableBucket,
224+
t);
209225
throw t;
210226
}
211227
} else {
212228
LOG.error(
213-
"Failed to retrieveCompleteSnapshot for snapshotStateHandle {}.",
229+
"Failed to retrieveCompleteSnapshot for snapshotStateHandle {} of table bucket {}.",
214230
snapshotStateHandle,
231+
tableBucket,
215232
e);
216233
throw e;
217234
}
218235
}
219236
}
220237

238+
FsPath remoteKvTabletDir =
239+
FlussPaths.remoteKvTabletDir(remoteKvDir, tablePath, tableBucket);
221240
// register all the files to shared kv file registry
222-
SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(ioExecutor);
241+
SharedKvFileRegistry sharedKvFileRegistry =
242+
new SharedKvFileRegistry(remoteKvTabletDir, ioExecutor);
223243
for (CompletedSnapshot completedSnapshot : retrievedSnapshots) {
224244
try {
225245
sharedKvFileRegistry.registerAllAfterRestored(completedSnapshot);
226246
} catch (Exception e) {
227247
LOG.error(
228-
"Failed to registerAllAfterRestored for completedSnapshot {}.",
248+
"Failed to registerAllAfterRestored for completedSnapshot {} of table bucket {}.",
229249
completedSnapshot,
250+
tableBucket,
230251
e);
231252
throw e;
232253
}
@@ -237,7 +258,8 @@ private CompletedSnapshotStore createCompletedSnapshotStore(
237258
sharedKvFileRegistry,
238259
retrievedSnapshots,
239260
completedSnapshotHandleStore,
240-
ioExecutor);
261+
ioExecutor,
262+
remoteKvTabletDir);
241263
}
242264

243265
@VisibleForTesting

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.exception.IneligibleReplicaException;
3333
import org.apache.fluss.exception.InvalidCoordinatorException;
3434
import org.apache.fluss.exception.InvalidUpdateVersionException;
35+
import org.apache.fluss.exception.PartitionNotExistException;
3536
import org.apache.fluss.exception.RebalanceFailureException;
3637
import org.apache.fluss.exception.ServerNotExistException;
3738
import org.apache.fluss.exception.ServerTagAlreadyExistException;
@@ -118,6 +119,7 @@
118119
import org.apache.fluss.server.zk.data.lake.LakeTable;
119120
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
120121
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
122+
import org.apache.fluss.utils.FlussPaths;
121123
import org.apache.fluss.utils.types.Tuple2;
122124

123125
import org.slf4j.Logger;
@@ -234,7 +236,8 @@ public CoordinatorEventProcessor(
234236
conf.getInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS),
235237
ioExecutor,
236238
zooKeeperClient,
237-
coordinatorMetricGroup);
239+
coordinatorMetricGroup,
240+
FlussPaths.remoteKvDir(conf));
238241
this.autoPartitionManager = autoPartitionManager;
239242
this.lakeTableTieringManager = lakeTableTieringManager;
240243
this.coordinatorMetricGroup = coordinatorMetricGroup;
@@ -1738,9 +1741,29 @@ private void tryProcessCommitKvSnapshot(
17381741
callback.completeExceptionally(e);
17391742
return;
17401743
}
1741-
// commit the kv snapshot asynchronously
1744+
17421745
TableBucket tb = event.getTableBucket();
1743-
TablePath tablePath = coordinatorContext.getTablePathById(tb.getTableId());
1746+
1747+
// get physical table path
1748+
PhysicalTablePath physicalTablePath;
1749+
if (tb.getPartitionId() == null) {
1750+
TablePath tablePath = coordinatorContext.getTablePathById(tb.getTableId());
1751+
physicalTablePath = PhysicalTablePath.of(tablePath);
1752+
} else {
1753+
Optional<PhysicalTablePath> physicalTablePathOp =
1754+
coordinatorContext.getPhysicalTablePath(tb.getPartitionId());
1755+
if (!physicalTablePathOp.isPresent()) {
1756+
callback.completeExceptionally(
1757+
new PartitionNotExistException(
1758+
"PhysicalTablePath not found for partition "
1759+
+ tb.getPartitionId()));
1760+
return;
1761+
} else {
1762+
physicalTablePath = physicalTablePathOp.get();
1763+
}
1764+
}
1765+
1766+
// commit the kv snapshot asynchronously
17441767
ioExecutor.execute(
17451768
() -> {
17461769
try {
@@ -1749,7 +1772,7 @@ private void tryProcessCommitKvSnapshot(
17491772
// add completed snapshot
17501773
CompletedSnapshotStore completedSnapshotStore =
17511774
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
1752-
tablePath, tb);
1775+
physicalTablePath, tb);
17531776
// this involves IO operation (ZK), so we do it in ioExecutor
17541777
completedSnapshotStore.add(completedSnapshot);
17551778
coordinatorEventManager.put(

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.fs.FileSystem;
2222
import org.apache.fluss.fs.FsPath;
2323
import org.apache.fluss.metadata.TableBucket;
24+
import org.apache.fluss.utils.FlussPaths;
2425
import org.apache.fluss.utils.concurrent.FutureUtils;
2526

2627
import javax.annotation.concurrent.NotThreadSafe;
@@ -68,31 +69,22 @@ public class CompletedSnapshot {
6869
/** The next log offset when the snapshot is triggered. */
6970
private final long logOffset;
7071

71-
/** The location where the snapshot is stored. */
72-
private final FsPath snapshotLocation;
73-
7472
public static final String SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE = "No such file or directory";
7573

7674
public CompletedSnapshot(
7775
TableBucket tableBucket,
7876
long snapshotID,
79-
FsPath snapshotLocation,
8077
KvSnapshotHandle kvSnapshotHandle,
8178
long logOffset) {
8279
this.tableBucket = tableBucket;
8380
this.snapshotID = snapshotID;
84-
this.snapshotLocation = snapshotLocation;
8581
this.kvSnapshotHandle = kvSnapshotHandle;
8682
this.logOffset = logOffset;
8783
}
8884

8985
@VisibleForTesting
90-
CompletedSnapshot(
91-
TableBucket tableBucket,
92-
long snapshotID,
93-
FsPath snapshotLocation,
94-
KvSnapshotHandle kvSnapshotHandle) {
95-
this(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle, 0);
86+
CompletedSnapshot(TableBucket tableBucket, long snapshotID, KvSnapshotHandle kvSnapshotHandle) {
87+
this(tableBucket, snapshotID, kvSnapshotHandle, 0);
9688
}
9789

9890
public long getSnapshotID() {
@@ -125,54 +117,43 @@ public void registerSharedKvFilesAfterRestored(SharedKvFileRegistry sharedKvFile
125117
sharedKvFileRegistry.registerAllAfterRestored(this);
126118
}
127119

128-
public CompletableFuture<Void> discardAsync(Executor ioExecutor) {
120+
public CompletableFuture<Void> discardAsync(FsPath remoteKvTabletDir, Executor ioExecutor) {
129121
// it'll discard the snapshot files for kv, it'll always discard
130122
// the private files; for shared files, only if they're not be registered in
131123
// SharedKvRegistry, can the files be deleted.
132124
CompletableFuture<Void> discardKvFuture =
133-
FutureUtils.runAsync(kvSnapshotHandle::discard, ioExecutor);
125+
FutureUtils.runAsync(
126+
() -> kvSnapshotHandle.discard(remoteKvTabletDir, snapshotID), ioExecutor);
134127

128+
FsPath snapshotLocation = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotID);
135129
CompletableFuture<Void> discardMetaFileFuture =
136-
FutureUtils.runAsync(this::disposeMetadata, ioExecutor);
130+
FutureUtils.runAsync(() -> disposeMetadata(snapshotLocation), ioExecutor);
137131

138132
return FutureUtils.runAfterwards(
139133
FutureUtils.completeAll(Arrays.asList(discardKvFuture, discardMetaFileFuture)),
140-
this::disposeSnapshotStorage);
134+
() -> disposeSnapshotStorage(snapshotLocation));
141135
}
142136

143-
private void disposeSnapshotStorage() throws IOException {
137+
private void disposeSnapshotStorage(FsPath snapshotLocation) throws IOException {
144138
if (snapshotLocation != null) {
145139
FileSystem fileSystem = snapshotLocation.getFileSystem();
146140
fileSystem.delete(snapshotLocation, false);
147141
}
148142
}
149143

150-
/**
151-
* Return the metadata file path that stores all the information that describes the snapshot.
152-
*/
153-
public FsPath getMetadataFilePath() {
154-
return new FsPath(snapshotLocation, SNAPSHOT_METADATA_FILE_NAME);
155-
}
156-
157144
public static FsPath getMetadataFilePath(FsPath snapshotLocation) {
158145
return new FsPath(snapshotLocation, SNAPSHOT_METADATA_FILE_NAME);
159146
}
160147

161-
private void disposeMetadata() throws IOException {
162-
FsPath metadataFilePath = getMetadataFilePath();
148+
private void disposeMetadata(FsPath snapshotLocation) throws IOException {
149+
FsPath metadataFilePath = getMetadataFilePath(snapshotLocation);
163150
FileSystem fileSystem = metadataFilePath.getFileSystem();
164151
fileSystem.delete(metadataFilePath, false);
165152
}
166153

167-
public FsPath getSnapshotLocation() {
168-
return snapshotLocation;
169-
}
170-
171154
@Override
172155
public String toString() {
173-
return String.format(
174-
"CompletedSnapshot %d for %s located at %s",
175-
snapshotID, tableBucket, snapshotLocation);
156+
return String.format("CompletedSnapshot %d for %s", snapshotID, tableBucket);
176157
}
177158

178159
@Override
@@ -187,12 +168,11 @@ public boolean equals(Object o) {
187168
return snapshotID == that.snapshotID
188169
&& logOffset == that.logOffset
189170
&& Objects.equals(tableBucket, that.tableBucket)
190-
&& Objects.equals(kvSnapshotHandle, that.kvSnapshotHandle)
191-
&& Objects.equals(snapshotLocation, that.snapshotLocation);
171+
&& Objects.equals(kvSnapshotHandle, that.kvSnapshotHandle);
192172
}
193173

194174
@Override
195175
public int hashCode() {
196-
return Objects.hash(tableBucket, snapshotID, kvSnapshotHandle, logOffset, snapshotLocation);
176+
return Objects.hash(tableBucket, snapshotID, kvSnapshotHandle, logOffset);
197177
}
198178
}

0 commit comments

Comments
 (0)