Skip to content

Commit

Permalink
RATIS-2245. Ratis should wait for all apply transaction futures befor…
Browse files Browse the repository at this point in the history
…e group remove
  • Loading branch information
swamirishi committed Feb 7, 2025
1 parent 2664ac8 commit ed7c066
Showing 1 changed file with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public String toString() {

@Override
public void run() {
CompletableFuture<Void> applyLogFutures = CompletableFuture.completedFuture(null);
for(; state != State.STOP; ) {
try {
waitForCommit();
Expand All @@ -190,11 +191,11 @@ public void run() {
reload();
}

final MemoizedSupplier<List<CompletableFuture<Message>>> futures = applyLog();
checkAndTakeSnapshot(futures);
applyLogFutures = applyLog(applyLogFutures);
checkAndTakeSnapshot(applyLogFutures);

if (shouldStop()) {
checkAndTakeSnapshot(futures);
checkAndTakeSnapshot(applyLogFutures);
stop();
}
} catch (Throwable t) {
Expand Down Expand Up @@ -239,8 +240,7 @@ private void reload() throws IOException {
state = State.RUNNING;
}

private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws RaftLogIOException {
final MemoizedSupplier<List<CompletableFuture<Message>>> futures = MemoizedSupplier.valueOf(ArrayList::new);
private CompletableFuture<Void> applyLog(CompletableFuture<Void> applyLogFutures) throws RaftLogIOException {
final long committed = raftLog.getLastCommittedIndex();
for(long applied; (applied = getLastAppliedIndex()) < committed && state == State.RUNNING && !shouldStop(); ) {
final long nextIndex = applied + 1;
Expand All @@ -263,7 +263,7 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf
final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
Preconditions.assertTrue(incremented == nextIndex);
if (f != null) {
futures.get().add(f);
applyLogFutures = applyLogFutures.thenCombine(f, (previous, current) -> previous);
f.thenAccept(m -> notifyAppliedIndex(incremented));
} else {
notifyAppliedIndex(incremented);
Expand All @@ -272,17 +272,14 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf
next.release();
}
}
return futures;
return applyLogFutures;
}

private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Message>>> futures)
private void checkAndTakeSnapshot(CompletableFuture<?> futures)
throws ExecutionException, InterruptedException {
// check if need to trigger a snapshot
if (shouldTakeSnapshot()) {
if (futures.isInitialized()) {
JavaUtils.allOf(futures.get()).get();
}

futures.get();
takeSnapshot();
}
}
Expand Down

0 comments on commit ed7c066

Please sign in to comment.