Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: when responding ForwardToLeader, make leader_id a None if the leader is no longer in the cluster #594

Merged
merged 1 commit into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,11 +863,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
#[tracing::instrument(level = "trace", skip(self, tx))]
pub(crate) fn reject_with_forward_to_leader<T, E>(&self, tx: RaftRespTx<T, E>)
where E: From<ForwardToLeader<C::NodeId, C::Node>> {
let l = self.current_leader();
let err = ForwardToLeader {
leader_id: l,
leader_node: self.get_leader_node(l),
};
let mut leader_id = self.current_leader();
let leader_node = self.get_leader_node(leader_id);

// Leader is no longer a node in the membership config.
if leader_node.is_none() {
leader_id = None;
}

let err = ForwardToLeader { leader_id, leader_node };

let _ = tx.send(Err(err.into()));
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ where
Ok(count as u64)
}

async fn send_client_request(
pub async fn send_client_request(
&self,
target: C::NodeId,
req: C::D,
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/membership/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod t16_change_membership_cases;
mod t20_change_membership;
mod t25_elect_with_new_config;
mod t30_commit_joint_config;
mod t30_step_down;
mod t30_remove_leader;
mod t40_removed_follower;
mod t45_remove_unreachable_follower;
mod t99_issue_471_adding_learner_uses_uninit_leader_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use memstore::ClientRequest;
use memstore::IntoMemClientRequest;
use openraft::error::ClientWriteError;
use openraft::Config;
use openraft::LeaderId;
use openraft::LogId;
Expand All @@ -16,7 +19,7 @@ use crate::fixtures::RaftRouter;
/// - Then the leader should step down after joint log is committed.
/// - Check logs on other node.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn step_down() -> Result<()> {
async fn remove_leader() -> Result<()> {
// Setup test dependencies.
let config = Arc::new(
Config {
Expand Down Expand Up @@ -105,6 +108,79 @@ async fn step_down() -> Result<()> {
Ok(())
}

/// Change membership from {0,1,2} to {2}. Access {2} at once.
///
/// It should not respond a ForwardToLeader error that pointing to the removed leader.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn remove_leader_access_new_cluster() -> Result<()> {
// Setup test dependencies.
let config = Arc::new(
Config {
enable_heartbeat: false,
enable_elect: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {}).await?;

let orig_leader = 0;

tracing::info!("--- change membership 012 to 2");
{
let node = router.get_raft_handle(&orig_leader)?;
node.change_membership(btreeset![2], true, false).await?;
// 2 change_membership logs
log_index += 2;

router
.wait(&2, timeout())
.log(Some(log_index), "new leader node-2 commits 2 membership log")
.await?;
}

tracing::info!("--- old leader commits 2 membership log");
{
router
.wait(&orig_leader, timeout())
.log(Some(log_index), "old leader commits 2 membership log")
.await?;
}

let res = router.send_client_request(2, ClientRequest::make_request("foo", 1)).await;
match res {
Ok(_) => {
unreachable!("expect error");
}
Err(cli_err) => match cli_err {
ClientWriteError::ForwardToLeader(fwd) => {
assert!(fwd.leader_id.is_none());
assert!(fwd.leader_node.is_none());
}
_ => {
unreachable!("expect ForwardToLeader");
}
},
}

tracing::info!("--- elect node-2, handle write");
{
let n2 = router.get_raft_handle(&2)?;
n2.enable_elect(true);
n2.wait(timeout()).state(ServerState::Leader, "node-2 elect itself").await?;
log_index += 1;

router.send_client_request(2, ClientRequest::make_request("foo", 1)).await?;
log_index += 1;

n2.wait(timeout()).log(Some(log_index), "node-2 become leader and handle write request").await?;
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(3_000))
}