-
Notifications
You must be signed in to change notification settings - Fork 426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RATIS-2245. Ratis should wait for all apply transaction futures before taking snapshot and group remove #1218
base: master
Are you sure you want to change the base?
Changes from 4 commits
858aa1c
ef35f59
7626c79
e25ba08
e76749e
b635b13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,8 +37,6 @@ | |
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
|
@@ -182,19 +180,20 @@ public String toString() { | |
|
||
@Override | ||
public void run() { | ||
CompletableFuture<Void> applyLogFutures = CompletableFuture.completedFuture(null); | ||
for(; state != State.STOP; ) { | ||
try { | ||
waitForCommit(); | ||
waitForCommit(applyLogFutures); | ||
|
||
if (state == State.RELOAD) { | ||
reload(); | ||
} | ||
|
||
final MemoizedSupplier<List<CompletableFuture<Message>>> futures = applyLog(); | ||
checkAndTakeSnapshot(futures); | ||
applyLogFutures = applyLog(applyLogFutures); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's check if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But in this case we have already submitted next set of tasks right? What would be the point of checking the exception? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The point is to fail fast. Otherwise, it will keep applying log even if the state machine has failed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The statemachine can take a call right if it has failed. From what I understand statemachine should receive all the transactions, and it is on the statemachine to have the guardrail if it should apply a transaction or not. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to fail fast we should fail the group remove as well. |
||
checkAndTakeSnapshot(applyLogFutures); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In one of case where no snpashot to be taken and no stop(), future.get() will not be called. Do this will fix the issue as expected for applyTransaction to finish ? I think no, it should wait for all case. And related to performance impact if always need wait, do this call is always in stop() flow or all flow ? this part is not clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as discussed, this is not a problem. |
||
|
||
if (shouldStop()) { | ||
checkAndTakeSnapshot(futures); | ||
swamirishi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
applyLogFutures.get(); | ||
stop(); | ||
} | ||
} catch (Throwable t) { | ||
|
@@ -210,14 +209,14 @@ public void run() { | |
} | ||
} | ||
|
||
private void waitForCommit() throws InterruptedException { | ||
private void waitForCommit(CompletableFuture<Void> applyLogFutures) throws InterruptedException, ExecutionException { | ||
// When a peer starts, the committed is initialized to 0. | ||
// It will be updated only after the leader contacts other peers. | ||
// Thus it is possible to have applied > committed initially. | ||
final long applied = getLastAppliedIndex(); | ||
for(; applied >= raftLog.getLastCommittedIndex() && state == State.RUNNING && !shouldStop(); ) { | ||
if (server.getSnapshotRequestHandler().shouldTriggerTakingSnapshot()) { | ||
takeSnapshot(); | ||
takeSnapshot(applyLogFutures); | ||
} | ||
if (awaitForSignal.await(100, TimeUnit.MILLISECONDS)) { | ||
return; | ||
|
@@ -239,8 +238,7 @@ private void reload() throws IOException { | |
state = State.RUNNING; | ||
} | ||
|
||
private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws RaftLogIOException { | ||
final MemoizedSupplier<List<CompletableFuture<Message>>> futures = MemoizedSupplier.valueOf(ArrayList::new); | ||
private CompletableFuture<Void> applyLog(CompletableFuture<Void> applyLogFutures) throws RaftLogIOException { | ||
final long committed = raftLog.getLastCommittedIndex(); | ||
for(long applied; (applied = getLastAppliedIndex()) < committed && state == State.RUNNING && !shouldStop(); ) { | ||
final long nextIndex = applied + 1; | ||
|
@@ -263,7 +261,7 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf | |
final long incremented = appliedIndex.incrementAndGet(debugIndexChange); | ||
Preconditions.assertTrue(incremented == nextIndex); | ||
if (f != null) { | ||
futures.get().add(f); | ||
applyLogFutures = applyLogFutures.thenCombine(f, (v, message) -> null); | ||
f.thenAccept(m -> notifyAppliedIndex(incremented)); | ||
} else { | ||
notifyAppliedIndex(incremented); | ||
|
@@ -272,23 +270,20 @@ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws Raf | |
next.release(); | ||
} | ||
} | ||
return futures; | ||
return applyLogFutures; | ||
} | ||
|
||
private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Message>>> futures) | ||
private void checkAndTakeSnapshot(CompletableFuture<?> futures) | ||
throws ExecutionException, InterruptedException { | ||
// check if need to trigger a snapshot | ||
if (shouldTakeSnapshot()) { | ||
if (futures.isInitialized()) { | ||
JavaUtils.allOf(futures.get()).get(); | ||
} | ||
|
||
takeSnapshot(); | ||
takeSnapshot(futures); | ||
} | ||
} | ||
|
||
private void takeSnapshot() { | ||
private void takeSnapshot(CompletableFuture<?> applyLogFutures) throws ExecutionException, InterruptedException { | ||
final long i; | ||
applyLogFutures.get(); | ||
try { | ||
try(UncheckedAutoCloseable ignored = Timekeeper.start(stateMachineMetrics.get().getTakeSnapshotTimer())) { | ||
i = stateMachine.takeSnapshot(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to understand changes and impact,
Q1: Step 3 changes checkAndTakeSnapshot() moving out of stop() and force checking to take snapshot do have performance impact?
Q2: Let No snapshot to be taken, in this case, future.get() is never called, do this is intended ?
Q3: We can refactor code as above behavior, that,
This have similar impact instead of passing future deep inside takeSnapshot().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ratis/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
Line 196 in e25ba08
Future.get() will be called inconsequential of whether the snapshot is taken or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to do a future.get() just before taking a snapshot. We cannot be having transactions still being applied in the background which could cause some inconsistency. It makes more sense to wait just before taking a snapshot that is why I moved the future.get() inside takeSnapshot method
ratis/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
Line 286 in e25ba08