-
Notifications
You must be signed in to change notification settings - Fork 426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RATIS-2245. Ratis should wait for all apply transaction futures before taking snapshot and group remove #1218
base: master
Are you sure you want to change the base?
Changes from 2 commits
858aa1c
ef35f59
7626c79
e25ba08
e76749e
b635b13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,8 +37,6 @@ | |
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
|
@@ -182,6 +180,7 @@ public String toString() { | |
|
||
@Override | ||
public void run() { | ||
CompletableFuture<Void> applyLogFutures = CompletableFuture.completedFuture(null); | ||
for(; state != State.STOP; ) { | ||
try { | ||
waitForCommit(); | ||
|
@@ -190,11 +189,10 @@ public void run() { | |
reload(); | ||
} | ||
|
||
final MemoizedSupplier<List<CompletableFuture<Message>>> futures = applyLog(); | ||
checkAndTakeSnapshot(futures); | ||
applyLogFutures = applyLog(applyLogFutures); | ||
checkAndTakeSnapshot(applyLogFutures); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In one of case where no snpashot to be taken and no stop(), future.get() will not be called. Do this will fix the issue as expected for applyTransaction to finish ? I think no, it should wait for all case. And related to performance impact if always need wait, do this call is always in stop() flow or all flow ? this part is not clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as discussed, this is not a problem. |
||
|
||
if (shouldStop()) { | ||
checkAndTakeSnapshot(futures); | ||
swamirishi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
stop(); | ||
} | ||
} catch (Throwable t) { | ||
|
@@ -239,8 +237,7 @@ private void reload() throws IOException { | |
state = State.RUNNING; | ||
} | ||
|
||
private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws RaftLogIOException { | ||
final MemoizedSupplier<List<CompletableFuture<Message>>> futures = MemoizedSupplier.valueOf(ArrayList::new); | ||
private CompletableFuture<Void> applyLog(CompletableFuture<Void> applyLogFutures) throws RaftLogIOException { | ||
final long committed = raftLog.getLastCommittedIndex(); | ||
for(long applied; (applied = getLastAppliedIndex()) < committed && state == State.RUNNING && !shouldStop(); ) { | ||
final long nextIndex = applied + 1; | ||
|
@@ -263,7 +260,7 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf | |
final long incremented = appliedIndex.incrementAndGet(debugIndexChange); | ||
Preconditions.assertTrue(incremented == nextIndex); | ||
if (f != null) { | ||
futures.get().add(f); | ||
applyLogFutures = applyLogFutures.thenCombine(f, (previous, current) -> previous); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. current is a Message class returned and previous is null value Void There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need the Message returned after apply transactionLog There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right. Then, let's return null, i.e. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @swamirishi , any update for this? |
||
f.thenAccept(m -> notifyAppliedIndex(incremented)); | ||
} else { | ||
notifyAppliedIndex(incremented); | ||
|
@@ -272,17 +269,14 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf | |
next.release(); | ||
} | ||
} | ||
return futures; | ||
return applyLogFutures; | ||
} | ||
|
||
private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Message>>> futures) | ||
private void checkAndTakeSnapshot(CompletableFuture<?> futures) | ||
throws ExecutionException, InterruptedException { | ||
// check if need to trigger a snapshot | ||
if (shouldTakeSnapshot()) { | ||
if (futures.isInitialized()) { | ||
JavaUtils.allOf(futures.get()).get(); | ||
} | ||
|
||
futures.get(); | ||
takeSnapshot(); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's check if
applyLogFutures.isCompletedExceptionally()
before applying logs.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in this case we have already submitted next set of tasks right? What would be the point of checking the exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The point is to fail fast. Otherwise, it will keep applying log even if the state machine has failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The statemachine can take a call right if it has failed. From what I understand statemachine should receive all the transactions, and it is on the statemachine to have the guardrail if it should apply a transaction or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to fail fast we should fail the group remove as well.