Skip to content

Commit

Permalink
Fix view change during election procedure
Browse files Browse the repository at this point in the history
* Added the Checkpoint utility to help during the tests.
* Created the test reproducing the issue where the leader leaves before
  the coordinator applies the election locally. A liveness issue.
* Draft a solution by synchronizing and checking whether the elected leader
  is still in the view before stopping the voting thread.

Election mechanism and tests

* Handle view change after collecting votes;
* Ensure the voting thread keeps running until the leader is elected or
  the thread is interrupted;
* Handle view change before delivering the LeaderElected message.
  • Loading branch information
jabolina committed Aug 11, 2024
1 parent 2af0166 commit 0acfef5
Show file tree
Hide file tree
Showing 16 changed files with 1,037 additions and 45 deletions.
12 changes: 6 additions & 6 deletions src/org/jgroups/protocols/raft/ELECTION.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ public class ELECTION extends BaseElection {

@Override
protected void handleView(View v) {
View view = this.view; this.view = v; // assign view before check on the election result
Majority result=Utils.computeMajority(view, v, raft().majority(), raft.leader());
log.debug("%s: existing view: %s, new view: %s, result: %s", local_addr, view, v, result);
List<Address> joiners=View.newMembers(view, v);
View previousView = this.view;
this.view = v;
Majority result=Utils.computeMajority(previousView, v, raft().majority(), raft.leader());
log.debug("%s: existing view: %s, new view: %s, result: %s", local_addr, previousView, v, result);
List<Address> joiners=View.newMembers(previousView, v);
boolean has_new_members=joiners != null && !joiners.isEmpty();
boolean coordinatorChanged = Utils.viewCoordinatorChanged(view, v);
boolean coordinatorChanged = Utils.viewCoordinatorChanged(previousView, v);
switch(result) {
case no_change:
// the leader resends its term/address for new members to set the term/leader.
Expand All @@ -72,7 +73,6 @@ else if (coordinatorChanged && isViewCoordinator() && isMajorityAvailable() && r
// See: https://github.com/jgroups-extras/jgroups-raft/issues/259
if(isViewCoordinator()) {
log.trace("%s: starting voting process (reason: %s, view: %s)", local_addr, result, view);
stopVotingThread();
startVotingThread();
}
break;
Expand Down
11 changes: 5 additions & 6 deletions src/org/jgroups/protocols/raft/ELECTION2.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,12 @@ public boolean isPreVoteThreadRunning() {

@Override
protected void handleView(View v) {
Majority result = Utils.computeMajority(view, v, raft().majority(), raft.leader());
log.debug("%s: existing view: %s, new view: %s, result: %s", local_addr, this.view, v, result);

View old_view = this.view;
View previousView = this.view;
this.view = v;
Majority result = Utils.computeMajority(previousView, v, raft().majority(), raft.leader());
log.debug("%s: existing view: %s, new view: %s, result: %s", local_addr, previousView, v, result);

List<Address> joiners = View.newMembers(old_view, v);
List<Address> joiners = View.newMembers(previousView, v);
boolean has_new_members = joiners != null && !joiners.isEmpty();

switch (result) {
Expand All @@ -86,7 +85,7 @@ protected void handleView(View v) {
}
// If we have no change in terms of majority threshold. If the view coordinator changed, we need to
// verify if an election is necessary.
if (Utils.viewCoordinatorChanged(old_view, v) && isViewCoordinator() && view.size() >= raft.majority()) {
if (Utils.viewCoordinatorChanged(previousView, v) && isViewCoordinator() && view.size() >= raft.majority()) {
preVotingMechanism.start();
}
break;
Expand Down
68 changes: 52 additions & 16 deletions src/org/jgroups/protocols/raft/election/BaseElection.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,20 @@ public void resetStats() {
num_voting_rounds=0;
}

/**
 * Initializes the parent protocol and then looks up the {@link RAFT} protocol,
 * storing the result in {@code raft}.
 *
 * @throws Exception propagated from {@code super.init()} or the protocol lookup
 */
@Override
public void init() throws Exception {
    super.init();
    this.raft = RAFT.findProtocol(RAFT.class, this, false);
}

/**
 * Stops the voting thread and, when a RAFT instance has been resolved,
 * clears its leader via {@code setLeaderAndTerm(null)}.
 */
@Override
public void stop() {
    stopVotingThread();
    if (raft == null) {
        // Nothing to reset: the RAFT protocol was never resolved.
        return;
    }
    raft.setLeaderAndTerm(null);
}

@Override
public Object down(Event evt) {
switch(evt.getType()) {
case Event.DISCONNECT:
Expand Down Expand Up @@ -179,9 +182,17 @@ protected boolean isViewCoordinator() {
/**
 * Applies a {@link LeaderElected} message locally, but only while the announced leader is
 * still a member of the current view and the view still holds a majority.
 * <p>
 * When the check fails the message is only logged: the voting thread is left untouched and
 * the leader/term in RAFT is not changed.
 *
 * @param msg the received message; its source is used for logging only
 * @param hdr the header carrying the elected leader and its term
 */
private void handleLeaderElected(Message msg, LeaderElected hdr) {
    long term=hdr.currTerm();
    Address leader=hdr.leader();
    // Read the view once so both checks below observe the same snapshot.
    View v = this.view;
    // A null view is only expected when running tests with the mock cluster.
    // Otherwise, make sure the leader is in the current view and there is still a majority:
    // the view could have changed between the leader being decided and this message arriving.
    if (v == null || (isLeaderInView(leader, v) && isMajorityAvailable(v, raft))) {
        log.trace("%s <- %s: %s", local_addr, msg.src(), hdr);
        stopVotingThread(); // only on the coord
        raft.setLeaderAndTerm(leader, term); // possibly changes the role
    } else {
        log.trace("%s <- %s: %s after leader left (%s)", local_addr, msg.src(), hdr, v);
    }
}

/**
Expand Down Expand Up @@ -288,18 +299,13 @@ private boolean isHigher(VoteResponse one, VoteResponse other) {
* The process keeps running until a leader is elected or the majority is lost.
*/
protected void runVotingProcess() {
// Before each run, verifies if the majority still in place.
// The view handling method also ensures to stop the voting thread when the majority is lost, this is an additional safeguard.
if (!isMajorityAvailable()) {
if (log.isDebugEnabled())
log.debug("%s: majority (%d) not available anymore (%s), stopping thread", local_addr, raft.majority(), view);

stopVotingThread();
return;
}
// If the thread is interrupted, the voting thread was already stopped.
// This check is only a shortcut to avoid increasing the term in RAFT.
if (Thread.interrupted()) return;

View electionView = this.view;
long new_term=raft.createNewTerm();
votes.reset(view.getMembersRaw());
votes.reset(electionView.getMembersRaw());
num_voting_rounds++;
long start=System.currentTimeMillis();
sendVoteRequest(new_term);
Expand All @@ -318,14 +324,44 @@ protected void runVotingProcess() {
// This should avoid any concurrent joiners. See: https://github.com/jgroups-extras/jgroups-raft/issues/253
raft.setLeaderAndTerm(leader, new_term);
sendLeaderElectedMessage(leader, new_term); // send to all - self
stopVotingThread();
}
else

// Hold intrinsic lock while verifying.
// If the view is updated while this verification happens, it could lead to a liveness issue where the
// voting thread does not continue to run, keeping a leader that left the cluster.
synchronized (this) {
// Check whether the majority is still in place between collecting all votes and determining the leader.
// If it is not, we must stop the voting thread and set the leader to null.
if (!isMajorityAvailable()) {
log.trace("%s: majority lost (%s) before elected (%s)", local_addr, view, leader);
stopVotingThread();
raft.setLeaderAndTerm(null);
return;
}

// At this point, the majority is still in place, so we confirm the elected leader is still present in the view.
// If the leader is not in the view anymore, we keep the voting thread running.
if (isLeaderInView(leader, view)) {
stopVotingThread();
return;
}

if (log.isTraceEnabled())
log.trace("%s: leader (%s) not in view anymore, retrying", local_addr, leader);
}
} else if (log.isTraceEnabled())
log.trace("%s: collected votes from %s in %d ms (majority=%d); starting another voting round",
local_addr, votes.getValidResults(), time, majority);
}

/**
 * Tells whether the given leader address is a member of the given view.
 *
 * @param leader the address to look up
 * @param view   the view to search; must not be null
 * @return the result of {@code view.containsMember(leader)}
 */
private static boolean isLeaderInView(Address leader, View view) {
    final boolean present = view.containsMember(leader);
    return present;
}

/**
 * Checks the majority against this protocol's current {@code view} and {@code raft} instance.
 *
 * @return true when the current view is non-null and has at least {@code raft.majority()} members
 */
protected final boolean isMajorityAvailable() {
    final boolean available = isMajorityAvailable(this.view, this.raft);
    return available;
}

/**
 * Checks whether the given view is large enough to form a majority.
 *
 * @param view the view to inspect; a null view never has a majority
 * @param raft supplies the majority threshold
 * @return true when {@code view} is non-null and its size is at least {@code raft.majority()}
 */
private static boolean isMajorityAvailable(View view, RAFT raft) {
    if (view == null) {
        return false;
    }
    return view.size() >= raft.majority();
}

Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/raft/testfwk/MockRaftCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ public <T extends MockRaftCluster> T async(boolean b) {
* @return The {@link Executor} instance to utilize in the cluster abstraction.
*/
protected Executor createThreadPool(long max_idle_ms) {
int max_cores = Math.max(Runtime.getRuntime().availableProcessors(), 4);
return new ThreadPoolExecutor(0, max_cores, max_idle_ms, TimeUnit.MILLISECONDS,
// Same as Executors.newCachedThreadPool();
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, max_idle_ms, TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
}

Expand Down
45 changes: 33 additions & 12 deletions src/org/jgroups/raft/testfwk/RaftCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import org.jgroups.Message;
import org.jgroups.View;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;

Expand All @@ -23,6 +25,9 @@ public class RaftCluster extends MockRaftCluster {
protected final Map<Address,RaftNode> nodes=new ConcurrentHashMap<>();
protected final Map<Address,RaftNode> dropped_members=new ConcurrentHashMap<>();

private final AtomicBoolean viewChanging = new AtomicBoolean(false);
private final BlockingQueue<Message> pending = new ArrayBlockingQueue<>(16);

@Override
public <T extends MockRaftCluster> T add(Address addr, RaftNode node) {
nodes.put(addr, node);
Expand All @@ -48,14 +53,20 @@ public <T extends MockRaftCluster> T clear() {

/**
 * Installs the given view on the cluster: nodes that are not members of the view are removed
 * from {@code nodes}, and every remaining node is handed the view exactly once.
 * <p>
 * The {@code viewChanging} flag is raised for the duration of the installation and lowered
 * again in the {@code finally} block, after which the queued messages are flushed.
 *
 * @param view the view to install; its member list must not be null
 */
@Override
public void handleView(View view) {
    // Raise the flag first so that messages sent while the view is being installed are queued.
    viewChanging.set(true);
    try {
        List<Address> members=Objects.requireNonNull(view.getMembers());
        nodes.keySet().retainAll(members);
        nodes.values().forEach(n -> n.handleView(view));
    } finally {
        // Always lower the flag and flush, even when a node fails to handle the view.
        viewChanging.set(false);
        sendPending();
    }
}

/**
 * Sends the message once, using the cluster's configured {@code async} mode.
 *
 * @param msg the message to deliver
 */
@Override
public void send(Message msg) {
    send(msg, async);
}

@Override
Expand All @@ -64,25 +75,28 @@ public int size() {
}

public void send(Message msg, boolean async) {
// Only emit messages after the new view is installed on all nodes.
if (viewChanging.get()) {
pending.add(msg);
return;
}

Address dest=msg.dest();
boolean block = interceptor != null && interceptor.shouldBlock(msg);

if(dest != null) {
// Retrieve the target before possibly blocking.
RaftNode node=nodes.get(dest);

// Blocks the invoking thread if cluster is synchronous.
if (block) {
interceptor.blockMessage(msg, async, () -> sendSingle(node, msg, async));
interceptor.blockMessage(msg, async, () -> sendSingle(nodes.get(dest), msg, async));
} else {
sendSingle(node, msg, async);
sendSingle(nodes.get(dest), msg, async);
}
} else {
// Blocks the invoking thread if cluster is synchronous.
if (block) {
// Copy the targets before possibly blocking the caller.
Set<Address> targets = new HashSet<>(nodes.keySet());
interceptor.blockMessage(msg, async, () -> sendMany(targets, msg, async));
// Copy the targets before possibly blocking the caller.;
interceptor.blockMessage(msg, async, () -> sendMany(nodes.keySet(), msg, async));
} else {
sendMany(nodes.keySet(), msg, async);
}
Expand Down Expand Up @@ -126,4 +140,11 @@ protected static void moveAll(Map<Address,RaftNode> from, Map<Address,RaftNode>
to.putIfAbsent(e.getKey(), e.getValue());
from.clear();
}

/**
 * Drains the {@code pending} queue, passing each queued message to {@link #send(Message)}
 * in the order returned by {@code poll()}, until the queue is empty.
 */
private void sendPending() {
    for (Message next = pending.poll(); next != null; next = pending.poll()) {
        send(next);
    }
}
}
2 changes: 1 addition & 1 deletion src/org/jgroups/raft/testfwk/RaftNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public String toString() {
@SuppressWarnings("unchecked")
protected <T extends Protocol> T find(Class<T> cl) {
for(Protocol p: prots) {
if(p.getClass().isAssignableFrom(cl))
if(cl.isAssignableFrom(p.getClass()))
return (T)p;
}
return null;
Expand Down
50 changes: 50 additions & 0 deletions tests/junit-functional/org/jgroups/tests/ElectionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,51 @@ public void testElectionWithLongLogLast(Class<?> ignore) {
testLongestLog(Map.of(b, termsB, c, termsC), c.getAddress());
}

/** Runs the higher-term scenario with channels {@code b} and {@code c} forced to term 5. */
public void testElectionFollowersHigherTerm(Class<?> ignore) {
    Map<JChannel, Integer> forcedTerms = Map.of(b, 5, c, 5);
    testHigherTerm(forcedTerms);
}

/** Runs the higher-term scenario with channel {@code a} forced to term 5. */
public void testElectionCoordinatorHigherTerm(Class<?> ignore) {
    Map<JChannel, Integer> forcedTerms = Map.of(a, 5);
    testHigherTerm(forcedTerms);
}

/**
 * Forces the given channels to the given terms, starts a voting round on the view coordinator
 * and asserts that the coordinator ends up elected as leader.
 *
 * @param terms the term to force on each channel; every forced term is asserted to be higher
 *              than the term read from {@code raft(0)} before forcing
 */
private void testHigherTerm(Map<JChannel, Integer> terms) {
    // Start from a stable cluster with A as leader and remember the term before forcing.
    assertLeader(10_000, a.getAddress(), a, b, c);
    long baselineTerm = raft(0).currentTerm();

    terms.forEach((channel, forcedTerm) -> setTerm(channel, forcedTerm));

    // Every channel that was touched must now be ahead of the baseline term.
    for (JChannel channel : terms.keySet()) {
        long forcedTerm = raft(channel).currentTerm();
        assertThat(baselineTerm)
                .withFailMessage(this::dumpLeaderAndTerms)
                .isLessThan(forcedTerm);
    }

    JChannel coordinator = findCoord(a, b, c);
    assertThat(coordinator).isNotNull();

    System.out.printf("\n\n-- starting the voting process on %s:\n", coordinator.getAddress());
    BaseElection election = election(coordinator);

    System.out.printf("-- current status: %n%s%n", dumpLeaderAndTerms());

    // Wait for any previous voting thread to stop before starting a new round.
    // Stopping the voting thread is the last step of an election, so nodes might already have
    // received the leader message while the thread has not stopped yet.
    // A shorter timeout is enough here.
    waitUntilVotingThreadStops(1_500, 0, 1, 2);

    // Start the election process. The thread eventually stops after collecting the necessary votes.
    // Since higher terms are involved, the coordinator might take longer to catch up.
    election.startVotingThread();
    waitUntilVotingThreadStops(5_000, 0, 1, 2);

    Address elected = assertLeader(10_000, null, a, b, c);
    assertThat(elected).isEqualTo(coordinator.getAddress());
}

private void testLongestLog(Map<JChannel, int[]> logs, Address ... possibleElected) {
// Let node A be elected the first leader.
assertLeader(10_000, a.getAddress(), a, b, c);
Expand Down Expand Up @@ -137,6 +182,11 @@ protected void setLog(JChannel ch, int... terms) {
log.append(index + 1, le);
}

/**
 * Sets the given term on the channel's RAFT instance, passing a null leader.
 *
 * @param ch   the channel whose RAFT instance is updated
 * @param term the term to set
 */
private void setTerm(JChannel ch, int term) {
    raft(ch).setLeaderAndTerm(null, term);
}

private long logLastAppended(JChannel ch) {
RAFT raft = raft(ch);
Log log = raft.log();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public void testLeaderLeaving(Class<?> clazz) throws Exception {
// All nodes but the previous leader.
int[] indexes = IntStream.range(0, clusterSize).filter(i -> i != leader).toArray();
waitUntilLeaderElected(5_000, indexes);
waitUntilVotingThreadStops(5_000, indexes);

System.out.printf("%s\n", print());
assertOneLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void testsLeaderCrash(Class<?> ignore) throws Exception {
assertIndices(7, 4);

RAFT leader=Stream.of(rafts()).filter(r -> r != null && r.isLeader()).findFirst().orElse(null);
System.out.printf("-- new leader: %s%n", leader);
assert leader != null;
System.out.printf("-- Leader: %s, commit-table:\n%s\n", leader.getAddress(), leader.commitTable());

Expand Down
Loading

0 comments on commit 0acfef5

Please sign in to comment.