refactor: simplify replication remove after membership change
drmingdrmer committed Dec 25, 2021
1 parent f413fd2 commit 2320b8a
Showing 4 changed files with 51 additions and 81 deletions.
78 changes: 38 additions & 40 deletions async-raft/src/core/admin.rs
@@ -16,11 +16,9 @@ use crate::raft::ClientWriteRequest;
 use crate::raft::ClientWriteResponse;
 use crate::raft::MembershipConfig;
 use crate::raft::RaftRespTx;
-use crate::replication::RaftEvent;
 use crate::AppData;
 use crate::AppDataResponse;
 use crate::LogId;
-use crate::MessageSummary;
 use crate::NodeId;
 use crate::RaftError;
 use crate::RaftNetwork;
@@ -260,8 +258,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     #[tracing::instrument(level = "debug", skip(self))]
     pub(super) fn handle_uniform_consensus_committed(&mut self, log_id: &LogId) {
         let index = log_id.index;
-        // TODO(xp): 111 when membership config log is committed, there is nothing has to do.
-        // TODO(xp): removed follower should be able to receive the message that commits a joint log.
 
         // Step down if needed.
         if !self.core.membership.membership.contains(&self.core.id) {
@@ -273,55 +269,57 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
             return;
         }
 
-        // Remove any replication streams which have replicated this config & which are no longer
-        // cluster members. All other replication streams which are no longer cluster members, but
-        // which have not yet replicated this config will be marked for removal.
         let membership = &self.core.membership.membership;
 
         let all = membership.all_nodes();
         for (id, state) in self.nodes.iter_mut() {
             if all.contains(id) {
                 continue;
             }
-            state.remove_after_commit = Some(index)
-        }
-
-        let nodes_to_remove: Vec<_> = self
-            .nodes
-            .iter_mut()
-            .filter(|(id, _)| !membership.contains(id))
-            .filter_map(|(idx, repl_state)| {
-                if repl_state.matched.index >= index {
-                    Some(*idx)
-                } else {
-                    repl_state.remove_after_commit = Some(index);
-                    None
-                }
-            })
-            .collect();
+
+            tracing::info!(
+                "set remove_after_commit for {} = {}, membership: {:?}",
+                id,
+                index,
+                self.core.membership
+            );
 
-        let follower_ids: Vec<u64> = self.nodes.keys().cloned().collect();
-        tracing::debug!("nodes: {:?}", follower_ids);
-        tracing::debug!("membership: {:?}", self.core.membership);
-        tracing::debug!("nodes_to_remove: {:?}", nodes_to_remove);
+            state.remove_since = Some(index)
+        }
 
-        for target in nodes_to_remove {
-            tracing::debug!(target, "removing target node from replication pool");
-            // TODO(xp): just drop the replication then the task will be terminated.
-            let removed = self.nodes.remove(&target);
-            assert!(removed.is_some());
+        let targets = self.nodes.keys().cloned().collect::<Vec<_>>();
+        for target in targets {
+            self.try_remove_replication(target);
+        }
 
-            tracing::info!(
-                "handle_uniform_consensus_committed: removed replication node: {} {:?}",
-                target,
-                removed.as_ref().map(|x| (*x).summary())
-            );
+        self.leader_report_metrics();
+    }
 
-            if let Some(node) = removed {
-                let _ = node.repl_stream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH")));
-                self.leader_metrics.replication.remove(&target);
-            }
-        }
-        self.leader_report_metrics();
+    /// Remove a replication if the membership that does not include it has committed.
+    ///
+    /// Return true if removed.
+    #[tracing::instrument(level = "debug", skip(self))]
+    pub fn try_remove_replication(&mut self, target: u64) -> bool {
+        {
+            let n = self.nodes.get(&target);
+
+            if let Some(n) = n {
+                if let Some(since) = n.remove_since {
+                    if n.matched.index < since {
+                        return false;
+                    }
+                } else {
+                    return false;
+                }
+            } else {
+                tracing::warn!("trying to remove absent replication to {}", target);
+                return false;
+            }
+        }
+
+        tracing::info!("removed replication to: {}", target);
+        self.nodes.remove(&target);
+        self.leader_metrics.replication.remove(&target);
+        true
     }
 }
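
Taken together, the admin.rs changes replace eager stream removal with a two-phase scheme: committing a membership config that excludes a node only records the commit index in remove_since, and the replication stream is dropped later, once that follower's matched index reaches the recorded point. Below is a minimal standalone sketch of the scheme, using simplified stand-ins (Leader, ReplState, plain u64 log indexes) rather than the crate's actual types:

    use std::collections::BTreeMap;

    /// Simplified stand-in for the per-follower replication state.
    struct ReplState {
        /// Highest log index known to be replicated to this follower.
        matched: u64,
        /// Set when a membership config excluding this node commits.
        remove_since: Option<u64>,
    }

    /// Simplified stand-in for the leader's replication bookkeeping.
    struct Leader {
        nodes: BTreeMap<u64, ReplState>,
    }

    impl Leader {
        /// Phase 1: mark every node outside the committed membership.
        fn mark_removed(&mut self, members: &[u64], committed: u64) {
            for (id, state) in self.nodes.iter_mut() {
                if !members.contains(id) {
                    state.remove_since = Some(committed);
                }
            }
        }

        /// Phase 2: drop the stream only after the follower has replicated
        /// up to (at least) the membership log entry that removed it.
        fn try_remove(&mut self, target: u64) -> bool {
            match self.nodes.get(&target) {
                Some(s) if s.remove_since.map_or(false, |since| s.matched >= since) => {
                    self.nodes.remove(&target);
                    true
                }
                _ => false,
            }
        }
    }

    fn main() {
        let mut leader = Leader {
            nodes: BTreeMap::from([
                (1, ReplState { matched: 9, remove_since: None }),
                (2, ReplState { matched: 5, remove_since: None }),
            ]),
        };
        // Membership {1} commits at index 10: node 2 is marked, not yet dropped.
        leader.mark_removed(&[1], 10);
        assert!(!leader.try_remove(2)); // matched = 5 < since = 10: keep replicating
        leader.nodes.get_mut(&2).unwrap().matched = 10;
        assert!(leader.try_remove(2)); // caught up: safe to drop the stream
    }
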
4 changes: 2 additions & 2 deletions async-raft/src/core/mod.rs
@@ -791,7 +791,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
 /// A struct tracking the state of a replication stream from the perspective of the Raft actor.
 struct ReplicationState<D: AppData> {
     pub matched: LogId,
-    pub remove_after_commit: Option<u64>,
+    pub remove_since: Option<u64>,
     pub repl_stream: ReplicationStream<D>,
 
     /// The response channel to use for when this node has successfully synced with the cluster.
@@ -802,7 +802,7 @@ impl<D: AppData> MessageSummary for ReplicationState<D> {
     fn summary(&self) -> String {
         format!(
             "matched: {}, remove_after_commit: {:?}",
-            self.matched, self.remove_after_commit
+            self.matched, self.remove_since
         )
     }
 }
33 changes: 4 additions & 29 deletions async-raft/src/core/replication.rs
@@ -49,7 +49,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
             ReplicationState {
                 matched: LogId { term: 0, index: 0 },
                 repl_stream: replstream,
-                remove_after_commit: None,
+                remove_since: None,
                 tx: caller_tx,
             }
         }
@@ -87,7 +87,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     #[tracing::instrument(level = "debug", skip(self))]
     async fn handle_update_matched(&mut self, target: NodeId, matched: LogId) -> RaftResult<()> {
         // Update target's match index & check if it is awaiting removal.
-        let mut needs_removal = false;
 
         if let Some(state) = self.nodes.get_mut(&target) {
             tracing::debug!("state.matched: {}, update to matched: {}", state.matched, matched);
@@ -107,39 +106,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
                     let _ = tx.send(Ok(x));
                 }
             }
-
-            // TODO(xp): stop replication only when commit index is replicated?
-            if let Some(threshold) = &state.remove_after_commit {
-                if &matched.index >= threshold {
-                    needs_removal = true;
-                }
-            }
         } else {
             // TODO(xp): this should be get rid of.
             // handle_update_mactched_and_rate() and send_append_entries() runs async-ly.
             // There is chance another update-matched event is sent just before the replication node is
             // removed.
             // It is not a bug.
             // panic!("replication is removed: {}", target);
 
             return Ok(());
         }
 
-        // TODO(xp): use Vec<_> to replace the two membership configs.
-        // Drop replication stream if needed.
-        if needs_removal {
-            let removed = self.nodes.remove(&target);
-            tracing::info!(
-                "handle_update_matched_and_rate: removed replication node: {} {:?}",
-                target,
-                removed.as_ref().map(|x| x.summary())
-            );
-
-            if let Some(node) = removed {
-                // TODO(xp): do not need to send, just close.
-                let _ = node.repl_stream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH")));
-                self.leader_metrics.replication.remove(&target);
-            }
+        // Drop replication stream if needed.
+        if self.try_remove_replication(target) {
+            // nothing to do
         } else {
             self.update_leader_metrics(target, matched);
         }
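
With the deferred-removal check centralized in try_remove_replication, handle_update_matched no longer threads a needs_removal flag through the method: each acknowledgment updates matched and then asks the helper whether the stream is now removable. Extending the Leader sketch above (update_metrics is a hypothetical stand-in for update_leader_metrics, not the crate's API):

    impl Leader {
        /// Called when a follower acknowledges replication up to `matched`.
        fn handle_update_matched(&mut self, target: u64, matched: u64) {
            match self.nodes.get_mut(&target) {
                Some(state) => state.matched = matched,
                // A late acknowledgment may arrive after the stream was already
                // removed; it is harmless and simply ignored.
                None => return,
            }

            // Either the node was pending removal and has now caught up (so it
            // is dropped here), or it is still tracked and its progress counts.
            if !self.try_remove(target) {
                self.update_metrics(target, matched);
            }
        }

        /// Hypothetical stand-in for leader-metrics reporting.
        fn update_metrics(&mut self, _target: u64, _matched: u64) {}
    }
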
17 changes: 7 additions & 10 deletions async-raft/tests/members_stop_repl_to_removed.rs
@@ -3,12 +3,8 @@ use std::time::Duration;
 
 use anyhow::Result;
 use async_raft::Config;
-use async_raft::State;
 use fixtures::RaftRouter;
-use futures::stream::StreamExt;
 use maplit::btreeset;
-use tracing_futures::Instrument;
 
 #[macro_use]
 mod fixtures;
@@ -30,8 +26,8 @@ async fn members_stop_repl_to_removed() -> Result<()> {
     router.new_raft_node(3).await;
     router.new_raft_node(4).await;
 
-    router.add_non_voter(0, 3).await;
-    router.add_non_voter(0, 4).await;
+    router.add_non_voter(0, 3).await?;
+    router.add_non_voter(0, 4).await?;
 
     tracing::info!("--- changing config to 2,3,4");
     {
@@ -42,15 +38,16 @@
             router
                 .wait(&i, timeout())
                 .await?
-                .metrics(|x| x.last_applied >= n_logs, "all nodes recv change-membership logs")
+                .metrics(|x| x.last_log_index >= n_logs, "all nodes recv change-membership logs")
                 .await?;
         }
     }
 
-    tracing::info!("--- write to new cluster");
+    tracing::info!("--- write to new cluster, current log={}", n_logs);
     {
-        router.client_request_many(0, "after_change", 5).await;
-        n_logs += 5;
+        let n = 10;
+        router.client_request_many(0, "after_change", n).await;
+        n_logs += n as u64;
 
         for i in &[0, 3, 4] {
            router
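
A note on the changed wait condition: after the membership change, the test now waits on last_log_index rather than last_applied. Presumably this is because the loop also waits on nodes that the new config removes; a removed node is still expected to receive the entry that commits the change, but it may never learn the commit index for it, so its last_applied need not advance.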
