diff --git a/openraft/src/core/leader_state.rs b/openraft/src/core/leader_state.rs index b62ff0a6f..84fc0a904 100644 --- a/openraft/src/core/leader_state.rs +++ b/openraft/src/core/leader_state.rs @@ -10,7 +10,6 @@ use crate::core::ReplicationState; use crate::core::ServerState; use crate::engine::Command; use crate::entry::EntryRef; -use crate::entry::RaftEntry; use crate::error::ClientWriteError; use crate::error::ExtractFatal; use crate::error::Fatal; @@ -18,6 +17,7 @@ use crate::metrics::ReplicationMetrics; use crate::raft::ClientWriteResponse; use crate::raft::RaftMsg; use crate::raft::RaftRespTx; +use crate::raft_types::RaftLogId; use crate::replication::ReplicaEvent; use crate::runtime::RaftRuntime; use crate::summary::MessageSummary; diff --git a/openraft/src/engine/engine.rs b/openraft/src/engine/engine.rs index ac4c4032a..f21f33dcd 100644 --- a/openraft/src/engine/engine.rs +++ b/openraft/src/engine/engine.rs @@ -134,6 +134,7 @@ impl Engine { self.check_initialize()?; self.assign_log_ids(entries.iter_mut()); + self.state.extend_log_ids_from_same_leader(entries); self.commands.push(Command::AppendInputEntries { range: 0..l }); self.metrics_flags.set_data_changed(); @@ -168,6 +169,7 @@ impl Engine { } self.assign_log_ids(entries.iter_mut()); + self.state.extend_log_ids_from_same_leader(entries); self.commands.push(Command::AppendInputEntries { range: 0..l }); self.metrics_flags.set_data_changed(); diff --git a/openraft/src/engine/initialize_test.rs b/openraft/src/engine/initialize_test.rs index f039c2051..a7906472e 100644 --- a/openraft/src/engine/initialize_test.rs +++ b/openraft/src/engine/initialize_test.rs @@ -45,7 +45,11 @@ fn test_initialize() -> anyhow::Result<()> { eng.id = 1; eng.initialize(&mut entries)?; + + assert_eq!(Some(log_id0), eng.state.get_log_id(0)); + assert_eq!(None, eng.state.get_log_id(1)); assert_eq!(Some(log_id0), eng.state.last_log_id); + assert_eq!(ServerState::Candidate, eng.state.server_state); assert_eq!( MetricsChangeFlags { @@ -55,6 +59,7 @@ fn test_initialize() -> anyhow::Result<()> { eng.metrics_flags ); assert_eq!(m12(), eng.state.effective_membership.membership); + assert_eq!( vec![ Command::AppendInputEntries { range: 0..1 }, diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs new file mode 100644 index 000000000..c4961edc0 --- /dev/null +++ b/openraft/src/engine/log_id_list.rs @@ -0,0 +1,105 @@ +use crate::raft_types::RaftLogId; +use crate::LogId; +use crate::NodeId; + +/// Efficient storage for log ids. +/// +/// It stores only the ids of log that have a new leader_id. And the `last_log_id` at the end. +/// I.e., the oldest log id belonging to every leader. +/// +/// If it is not empty, the first one is `last_purged_log_id` and the last one is `last_log_id`. +/// The last one may have the same leader id as the second last one. +#[derive(Default, Debug, Clone)] +#[derive(PartialEq, Eq)] +pub struct LogIdList { + key_log_ids: Vec>, +} + +impl LogIdList { + pub fn new(key_log_ids: impl IntoIterator>) -> Self { + Self { + key_log_ids: key_log_ids.into_iter().collect(), + } + } + + /// Extends a list of `log_id` that are proposed by a same leader. + /// + /// The log ids in the input has to be continuous. + pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { + if let Some(first) = new_ids.first() { + let first_id = first.get_log_id(); + self.append(*first_id); + + if let Some(last) = new_ids.last() { + let last_id = last.get_log_id(); + assert_eq!(last_id.leader_id, first_id.leader_id); + + if last_id != first_id { + self.append(*last_id); + } + } + } + } + + /// Append a new `log_id`. + /// + /// The log id to append does not have to be the next to the last one in `key_log_ids`. + /// In such case, it is meant to append a list of log ids. + /// + /// NOTE: The last two in `key_log_ids` may be with the same `leader_id`, because `last_log_id` always present in + /// `log_ids`. + pub(crate) fn append(&mut self, new_log_id: LogId) { + let l = self.key_log_ids.len(); + if l == 0 { + self.key_log_ids.push(new_log_id); + return; + } + + // l >= 1 + + assert!(new_log_id > self.key_log_ids[l - 1]); + + if l == 1 { + self.key_log_ids.push(new_log_id); + return; + } + + // l >= 2 + + let last = self.key_log_ids[l - 1]; + + if self.key_log_ids.get(l - 2).map(|x| x.leader_id) == Some(last.leader_id) { + // Replace the **last log id**. + self.key_log_ids[l - 1] = new_log_id; + return; + } + + // The last one is an initial log entry of a leader. + // Add a **last log id** with the same leader id. + + self.key_log_ids.push(new_log_id); + } + + /// Get the log id at the specified index. + /// + /// It will return `last_purged_log_id` if index is at the last purged index. + #[allow(dead_code)] + pub(crate) fn get(&self, index: u64) -> Option> { + let res = self.key_log_ids.binary_search_by(|log_id| log_id.index.cmp(&index)); + + match res { + Ok(i) => Some(LogId::new(self.key_log_ids[i].leader_id, index)), + Err(i) => { + if i == 0 || i == self.key_log_ids.len() { + None + } else { + Some(LogId::new(self.key_log_ids[i - 1].leader_id, index)) + } + } + } + } + + pub(crate) fn key_log_ids(&self) -> &[LogId] { + &self.key_log_ids + } +} diff --git a/openraft/src/engine/log_id_list_test.rs b/openraft/src/engine/log_id_list_test.rs new file mode 100644 index 000000000..a4b707df6 --- /dev/null +++ b/openraft/src/engine/log_id_list_test.rs @@ -0,0 +1,110 @@ +use crate::engine::LogIdList; +use crate::LeaderId; +use crate::LogId; + +#[test] +fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> { + let log_id = |t, i| LogId:: { + leader_id: LeaderId { term: t, node_id: 1 }, + index: i, + }; + + let mut ids = LogIdList::::default(); + + ids.extend_from_same_leader(&[log_id(1, 2)]); + assert_eq!(vec![log_id(1, 2)], ids.key_log_ids()); + + ids.extend_from_same_leader(&[ + log_id(1, 3), // + log_id(1, 4), + ]); + assert_eq!( + vec![ + log_id(1, 2), // + log_id(1, 4) + ], + ids.key_log_ids(), + "same leader as the last" + ); + + ids.extend_from_same_leader(&[ + log_id(2, 5), // + log_id(2, 6), + log_id(2, 7), + ]); + assert_eq!( + vec![ + log_id(1, 2), // + log_id(2, 5), + log_id(2, 7) + ], + ids.key_log_ids(), + "different leader as the last" + ); + + Ok(()) +} + +#[test] +fn test_log_id_list_append() -> anyhow::Result<()> { + let log_id = |t, i| LogId:: { + leader_id: LeaderId { term: t, node_id: 1 }, + index: i, + }; + + let mut ids = LogIdList::::default(); + + let cases = vec![ + (log_id(1, 2), vec![log_id(1, 2)]), // + (log_id(1, 3), vec![log_id(1, 2), log_id(1, 3)]), + (log_id(1, 4), vec![log_id(1, 2), log_id(1, 4)]), + (log_id(2, 5), vec![log_id(1, 2), log_id(2, 5)]), + (log_id(2, 7), vec![log_id(1, 2), log_id(2, 5), log_id(2, 7)]), + (log_id(2, 9), vec![log_id(1, 2), log_id(2, 5), log_id(2, 9)]), + ]; + + for (new_log_id, want) in cases { + ids.append(new_log_id); + assert_eq!(want, ids.key_log_ids()); + } + + Ok(()) +} + +#[test] +fn test_log_id_list_get_log_id() -> anyhow::Result<()> { + let log_id = |t, i| LogId:: { + leader_id: LeaderId { term: t, node_id: 1 }, + index: i, + }; + + let ids = LogIdList::::default(); + + assert!(ids.get(0).is_none()); + assert!(ids.get(1).is_none()); + assert!(ids.get(2).is_none()); + + let ids = LogIdList::::new(vec![ + log_id(1, 1), + log_id(1, 2), + log_id(3, 3), + log_id(5, 6), + log_id(7, 8), + log_id(7, 10), + ]); + + assert_eq!(None, ids.get(0)); + assert_eq!(Some(log_id(1, 1)), ids.get(1)); + assert_eq!(Some(log_id(1, 2)), ids.get(2)); + assert_eq!(Some(log_id(3, 3)), ids.get(3)); + assert_eq!(Some(log_id(3, 4)), ids.get(4)); + assert_eq!(Some(log_id(3, 5)), ids.get(5)); + assert_eq!(Some(log_id(5, 6)), ids.get(6)); + assert_eq!(Some(log_id(5, 7)), ids.get(7)); + assert_eq!(Some(log_id(7, 8)), ids.get(8)); + assert_eq!(Some(log_id(7, 9)), ids.get(9)); + assert_eq!(Some(log_id(7, 10)), ids.get(10)); + assert_eq!(None, ids.get(11)); + + Ok(()) +} diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index c390ed46a..4cd4f7d91 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -3,6 +3,10 @@ mod engine; #[cfg(test)] mod initialize_test; +mod log_id_list; +#[cfg(test)] +mod log_id_list_test; pub(crate) use engine::Command; pub(crate) use engine::Engine; +pub use log_id_list::LogIdList; diff --git a/openraft/src/entry.rs b/openraft/src/entry.rs index 2c7adb618..0feb609fd 100644 --- a/openraft/src/entry.rs +++ b/openraft/src/entry.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use crate::raft_types::RaftLogId; use crate::LogId; use crate::Membership; use crate::MessageSummary; @@ -16,11 +17,7 @@ pub trait RaftPayload { } /// Defines operations on an entry. -pub trait RaftEntry: RaftPayload { - fn get_log_id(&self) -> &LogId; - - fn set_log_id(&mut self, log_id: &LogId); -} +pub trait RaftEntry: RaftPayload + RaftLogId {} /// Log entry payload variants. #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] @@ -185,7 +182,7 @@ impl RaftPayload for Entry { } } -impl RaftEntry for Entry { +impl RaftLogId for Entry { fn get_log_id(&self) -> &LogId { &self.log_id } @@ -195,6 +192,8 @@ impl RaftEntry for Entry { } } +impl RaftEntry for Entry {} + // impl traits for RefEntry impl<'p, C: RaftTypeConfig> RaftPayload for EntryRef<'p, C> { @@ -207,7 +206,7 @@ impl<'p, C: RaftTypeConfig> RaftPayload for EntryRef<'p, C> { } } -impl<'p, C: RaftTypeConfig> RaftEntry for EntryRef<'p, C> { +impl<'p, C: RaftTypeConfig> RaftLogId for EntryRef<'p, C> { fn get_log_id(&self) -> &LogId { &self.log_id } @@ -216,3 +215,5 @@ impl<'p, C: RaftTypeConfig> RaftEntry for EntryRef<'p, C> { self.log_id = *log_id; } } + +impl<'p, C: RaftTypeConfig> RaftEntry for EntryRef<'p, C> {} diff --git a/openraft/src/raft_state.rs b/openraft/src/raft_state.rs index d7dfd39b3..31976076d 100644 --- a/openraft/src/raft_state.rs +++ b/openraft/src/raft_state.rs @@ -1,9 +1,14 @@ use std::sync::Arc; -use crate::core::ServerState; +use crate::engine::LogIdList; use crate::membership::EffectiveMembership; +use crate::raft_types::RaftLogId; use crate::LogId; use crate::NodeId; +use crate::RaftStorage; +use crate::RaftTypeConfig; +use crate::ServerState; +use crate::StorageError; use crate::Vote; /// A struct used to represent the raft state which a Raft node needs. @@ -25,10 +30,15 @@ pub struct RaftState { /// The LogId of the last log applied to the state machine. pub last_applied: Option>, + /// All log ids this node has. + pub log_ids: LogIdList, + /// The latest cluster membership configuration found, in log or in state machine. pub effective_membership: Arc>, + // -- // -- volatile fields: they are not persisted. + // -- /// The log id of the last known committed entry. /// /// - Committed means: a log that is replicated to a quorum of the cluster and it is of the term of the leader. @@ -40,3 +50,115 @@ pub struct RaftState { pub server_state: ServerState, } + +impl RaftState { + /// Load all log ids that are the first one proposed by a leader. + /// + /// E.g., log ids with the same leader id will be got rid of, except the smallest. + /// The `last_log_id` will always present at the end, to simplify searching. + /// + /// Given an example with the logs `[(2,2),(2,3),(5,4),(5,5)]`, and the `last_purged_log_id` is (1,1). + /// This function returns `[(1,1),(2,2),(5,4),(5,5)]`. + /// + /// It adopts a modified binary-search algo. + /// ```text + /// input: + /// A---------------C + /// + /// load the mid log-id, then compare the first, the middle, and the last: + /// + /// A---------------A : push_res(A); + /// A-------A-------C : push_res(A); find(A,C) // both find `A`, need to de-dup + /// A-------B-------C : find(A,B); find(B,C) // both find `B`, need to de-dup + /// A-------C-------C : find(A,C) + /// ``` + pub async fn load_log_ids( + last_purged_log_id: Option>, + last_log_id: Option>, + sto: &mut Sto, + ) -> Result, StorageError> + where + C: RaftTypeConfig, + Sto: RaftStorage + ?Sized, + { + let mut res = vec![]; + + let last = match last_log_id { + None => return Ok(LogIdList::new(res)), + Some(x) => x, + }; + let first = match last_purged_log_id { + None => sto.get_log_id(0).await?, + Some(x) => x, + }; + + // Recursion stack + let mut stack = vec![(first, last)]; + + loop { + let (first, last) = match stack.pop() { + None => { + break; + } + Some(x) => x, + }; + + // Case AA + if first.leader_id == last.leader_id { + if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + res.push(first); + } + continue; + } + + // Two adjacent logs with different leader_id, no need to binary search + if first.index + 1 == last.index { + if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + res.push(first); + } + res.push(last); + continue; + } + + let mid = sto.get_log_id((first.index + last.index) / 2).await?; + + if first.leader_id == mid.leader_id { + // Case AAC + if res.last().map(|x| x.leader_id) < Some(first.leader_id) { + res.push(first); + } + stack.push((mid, last)); + } else if mid.leader_id == last.leader_id { + // Case ACC + stack.push((first, mid)); + } else { + // Case ABC + // first.leader_id < mid_log_id.leader_id < last.leader_id + // Deal with (first, mid) then (mid, last) + stack.push((mid, last)); + stack.push((first, mid)); + } + } + + if res.last() != Some(&last) { + res.push(last); + } + + Ok(LogIdList::new(res)) + } + + /// Append a list of `log_id`. + /// + /// The log ids in the input has to be continuous. + pub(crate) fn extend_log_ids_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_log_id: &[LID]) { + self.log_ids.extend_from_same_leader(new_log_id) + } + + /// Get the log id at the specified index. + /// + /// It will return `last_purged_log_id` if index is at the last purged index. + #[allow(dead_code)] + pub(crate) fn get_log_id(&self, index: u64) -> Option> { + self.log_ids.get(index) + } +} diff --git a/openraft/src/raft_types.rs b/openraft/src/raft_types.rs index d6fa5cc08..894fb38ff 100644 --- a/openraft/src/raft_types.rs +++ b/openraft/src/raft_types.rs @@ -18,6 +18,22 @@ pub struct LogId { pub index: u64, } +pub trait RaftLogId { + fn get_log_id(&self) -> &LogId; + + fn set_log_id(&mut self, log_id: &LogId); +} + +impl RaftLogId for LogId { + fn get_log_id(&self) -> &LogId { + self + } + + fn set_log_id(&mut self, log_id: &LogId) { + *self = *log_id + } +} + impl Display for LogId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}-{}", self.leader_id, self.index) diff --git a/openraft/src/storage.rs b/openraft/src/storage.rs index a80c936ae..07ba1442b 100644 --- a/openraft/src/storage.rs +++ b/openraft/src/storage.rs @@ -242,6 +242,8 @@ where C: RaftTypeConfig last_purged_log_id = last_applied; } + let log_ids = RaftState::load_log_ids(last_purged_log_id, last_log_id, self).await?; + Ok(RaftState { last_log_id, last_purged_log_id, @@ -249,6 +251,7 @@ where C: RaftTypeConfig // The initial value for `vote` is the minimal possible value. // See: [Conditions for initialization](https://datafuselabs.github.io/openraft/cluster-formation.html#conditions-for-initialization) vote: vote.unwrap_or_default(), + log_ids, effective_membership: Arc::new(membership), // -- volatile fields: they are not persisted. diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index ac5d97f49..3d0df7af1 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -87,6 +87,7 @@ where run_fut(Suite::get_initial_state_with_state(builder))?; run_fut(Suite::get_initial_state_last_log_gt_sm(builder))?; run_fut(Suite::get_initial_state_last_log_lt_sm(builder))?; + run_fut(Suite::get_initial_state_log_ids(builder))?; run_fut(Suite::save_vote(builder))?; run_fut(Suite::get_log_entries(builder))?; run_fut(Suite::try_get_log_entry(builder))?; @@ -276,7 +277,7 @@ where Self::default_vote(&mut store).await?; store - .append_to_log(&[&Entry { + .append_to_log(&[&blank(0, 0), &blank(1, 1), &Entry { log_id: LogId::new(LeaderId::new(3, NODE_ID.into()), 2), payload: EntryPayload::Blank, }]) @@ -386,7 +387,7 @@ where Self::default_vote(&mut store).await?; store - .append_to_log(&[&Entry { + .append_to_log(&[&blank(0, 0), &Entry { log_id: LogId::new(LeaderId::new(2, NODE_ID.into()), 1), payload: EntryPayload::Blank, }]) @@ -438,6 +439,124 @@ where Ok(()) } + pub async fn get_initial_state_log_ids(builder: &B) -> Result<(), StorageError> { + let mut store = builder.build().await; + + let log_id = |t, n: u64, i| LogId:: { + leader_id: LeaderId { + term: t, + node_id: n.into(), + }, + index: i, + }; + + tracing::info!("--- empty store, expect []"); + { + let initial = store.get_initial_state().await?; + assert_eq!(Vec::>::new(), initial.log_ids.key_log_ids()); + } + + tracing::info!("--- log terms: [0], last_purged_log_id is None, expect [(0,0)]"); + { + store.append_to_log(&[&blank(0, 0)]).await?; + + let initial = store.get_initial_state().await?; + assert_eq!(vec![log_id(0, 0, 0)], initial.log_ids.key_log_ids()); + } + + tracing::info!("--- log terms: [0,1,1,2], last_purged_log_id is None, expect [(0,0),(1,1),(2,3)]"); + { + store.append_to_log(&[&blank(1, 1), &blank(1, 2), &blank(2, 3)]).await?; + + let initial = store.get_initial_state().await?; + assert_eq!( + vec![log_id(0, 0, 0), log_id(1, 0, 1), log_id(2, 0, 3)], + initial.log_ids.key_log_ids() + ); + } + + tracing::info!( + "--- log terms: [0,1,1,2,2,3,3], last_purged_log_id is None, expect [(0,0),(1,1),(2,3),(3,5),(3,6)]" + ); + { + store.append_to_log(&[&blank(2, 4), &blank(3, 5), &blank(3, 6)]).await?; + + let initial = store.get_initial_state().await?; + assert_eq!( + vec![ + log_id(0, 0, 0), + log_id(1, 0, 1), + log_id(2, 0, 3), + log_id(3, 0, 5), + log_id(3, 0, 6) + ], + initial.log_ids.key_log_ids() + ); + } + + tracing::info!( + "--- log terms: [x,1,1,2,2,3,3], last_purged_log_id: (0,0), expect [(0,0),(1,1),(2,3),(3,5),(3,6)]" + ); + { + store.purge_logs_upto(log_id(0, 0, 0)).await?; + + let initial = store.get_initial_state().await?; + assert_eq!( + vec![ + log_id(0, 0, 0), + log_id(1, 0, 1), + log_id(2, 0, 3), + log_id(3, 0, 5), + log_id(3, 0, 6) + ], + initial.log_ids.key_log_ids() + ); + } + + tracing::info!("--- log terms: [x,x,1,2,2,3,3], last_purged_log_id: (1,1), expect [(1,1),(2,3),(3,5),(3,6)]"); + { + store.purge_logs_upto(log_id(1, 0, 1)).await?; + + let initial = store.get_initial_state().await?; + assert_eq!( + vec![log_id(1, 0, 1), log_id(2, 0, 3), log_id(3, 0, 5), log_id(3, 0, 6)], + initial.log_ids.key_log_ids() + ); + } + + tracing::info!("--- log terms: [x,x,x,2,2,3,3], last_purged_log_id: (1,2), expect [(1,2),(2,3),(3,5),(3,6)]"); + { + store.purge_logs_upto(log_id(1, 0, 2)).await?; + + let initial = store.get_initial_state().await?; + assert_eq!( + vec![log_id(1, 0, 2), log_id(2, 0, 3), log_id(3, 0, 5), log_id(3, 0, 6)], + initial.log_ids.key_log_ids() + ); + } + + tracing::info!("--- log terms: [x,x,x,x,2,3,3], last_purged_log_id: (2,3), expect [(2,3),(3,5),(3,6)]"); + { + store.purge_logs_upto(log_id(2, 0, 3)).await?; + + let initial = store.get_initial_state().await?; + assert_eq!( + vec![log_id(2, 0, 3), log_id(3, 0, 5), log_id(3, 0, 6)], + initial.log_ids.key_log_ids() + ); + } + + tracing::info!("--- log terms: [x,x,x,x,x,x,x], last_purged_log_id: (3,6), e.g., all purged expect [(3,6)]"); + { + store.purge_logs_upto(log_id(3, 0, 6)).await?; + + let initial = store.get_initial_state().await?; + assert_eq!(vec![log_id(3, 0, 6)], initial.log_ids.key_log_ids()); + } + + Ok(()) + } + pub async fn save_vote(builder: &B) -> Result<(), StorageError> { let mut store = builder.build().await; diff --git a/rustfmt.toml b/rustfmt.toml index 7a13826dd..9ad73caae 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -35,6 +35,8 @@ wrap_comments = true comment_width = 120 max_width = 120 +merge_derives = false + # pre-unstable chain_width = 100