Skip to content

Commit 858aa1c

Browse files
committed
RATIS-2245. Ratis should wait for all apply transaction futures before taking snapshot and group remove
1 parent 2664ac8 commit 858aa1c

1 file changed

Lines changed: 9 additions & 14 deletions

File tree

ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import org.slf4j.LoggerFactory;
3838

3939
import java.io.IOException;
40-
import java.util.ArrayList;
41-
import java.util.List;
4240
import java.util.Objects;
4341
import java.util.Optional;
4442
import java.util.concurrent.CompletableFuture;
@@ -182,6 +180,7 @@ public String toString() {
182180

183181
@Override
184182
public void run() {
183+
CompletableFuture<Void> applyLogFutures = CompletableFuture.completedFuture(null);
185184
for(; state != State.STOP; ) {
186185
try {
187186
waitForCommit();
@@ -190,11 +189,11 @@ public void run() {
190189
reload();
191190
}
192191

193-
final MemoizedSupplier<List<CompletableFuture<Message>>> futures = applyLog();
194-
checkAndTakeSnapshot(futures);
192+
applyLogFutures = applyLog(applyLogFutures);
193+
checkAndTakeSnapshot(applyLogFutures);
195194

196195
if (shouldStop()) {
197-
checkAndTakeSnapshot(futures);
196+
checkAndTakeSnapshot(applyLogFutures);
198197
stop();
199198
}
200199
} catch (Throwable t) {
@@ -239,8 +238,7 @@ private void reload() throws IOException {
239238
state = State.RUNNING;
240239
}
241240

242-
private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws RaftLogIOException {
243-
final MemoizedSupplier<List<CompletableFuture<Message>>> futures = MemoizedSupplier.valueOf(ArrayList::new);
241+
private CompletableFuture<Void> applyLog(CompletableFuture<Void> applyLogFutures) throws RaftLogIOException {
244242
final long committed = raftLog.getLastCommittedIndex();
245243
for(long applied; (applied = getLastAppliedIndex()) < committed && state == State.RUNNING && !shouldStop(); ) {
246244
final long nextIndex = applied + 1;
@@ -263,7 +261,7 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf
263261
final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
264262
Preconditions.assertTrue(incremented == nextIndex);
265263
if (f != null) {
266-
futures.get().add(f);
264+
applyLogFutures = applyLogFutures.thenCombine(f, (previous, current) -> previous);
267265
f.thenAccept(m -> notifyAppliedIndex(incremented));
268266
} else {
269267
notifyAppliedIndex(incremented);
@@ -272,17 +270,14 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf
272270
next.release();
273271
}
274272
}
275-
return futures;
273+
return applyLogFutures;
276274
}
277275

278-
private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Message>>> futures)
276+
private void checkAndTakeSnapshot(CompletableFuture<?> futures)
279277
throws ExecutionException, InterruptedException {
280278
// check if need to trigger a snapshot
281279
if (shouldTakeSnapshot()) {
282-
if (futures.isInitialized()) {
283-
JavaUtils.allOf(futures.get()).get();
284-
}
285-
280+
futures.get();
286281
takeSnapshot();
287282
}
288283
}

0 commit comments

Comments
 (0)