Skip to content

Commit

Permalink
Fix PBFT for liveness
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Apr 9, 2024
1 parent 0305057 commit ac0c100
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 128 deletions.
112 changes: 56 additions & 56 deletions src/kademlia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,77 +779,77 @@ impl<
}

#[cfg(test)]
use proptest::prelude::*;
mod tests {
use proptest::prelude::*;
use rand::thread_rng;

/// Shared body of the `ordered_closest_*` property tests.
///
/// Builds a routing table rooted at `id`, inserts every peer in `insert_ids` (ids equal to `id`
/// are skipped), then checks that each lookup in `targets` yields `min(20, N + 1)` records,
/// where the extra one accounts for the origin record itself.
#[cfg(test)]
fn ordered_closest<const N: usize>(id: PeerId, insert_ids: [PeerId; N], targets: [Target; 100]) {
    let mut table = Buckets::<_, _>::new(PeerRecord {
        id,
        key: (),
        addr: (),
    });
    // the origin is already part of the table, so its own id must not be inserted again
    for peer_id in insert_ids.into_iter().filter(|peer_id| *peer_id != id) {
        table
            .insert(PeerRecord {
                id: peer_id,
                key: (),
                addr: (),
            })
            .unwrap();
    }
    for target in targets {
        let closest = table.find_closest(&target, 20.try_into().unwrap());
        assert_eq!(closest.len(), 20.min(N + 1)) // plus `origin`
    }
}
use crate::{event::Void, net::IterAddr, worker::Worker};

#[cfg(test)]
proptest! {
#[test]
fn distance_inversion(id: PeerId, d: [u8; 32]) {
let d = U256::from_little_endian(&d);
assert_eq!(distance(&id, &distance_from(&id, d)), d)
}
use super::*;

#[test]
fn bucket_index(id: PeerId, target: Target) {
fn ordered_closest<const N: usize>(
id: PeerId,
insert_ids: [PeerId; N],
targets: [Target; 100],
) {
let origin = PeerRecord {
id,
key: (),
addr: (),
};
let buckets = Buckets::<_, _>::new(origin);
if target != id {
let d = distance(&id, &target);
let index = buckets.index(&target);
assert!(d.bit(index));
for i in index + 1..U256_BITS {
assert!(!d.bit(i))
let mut buckets = Buckets::<_, _>::new(origin);
for insert_id in insert_ids {
if insert_id == id {
continue;
}
let record = PeerRecord {
id: insert_id,
key: (),
addr: (),
};
buckets.insert(record).unwrap()
}
for target in targets {
let records = buckets.find_closest(&target, 20.try_into().unwrap());
assert_eq!(records.len(), 20.min(N + 1)) // plus `origin`
}
}

#[test]
fn ordered_closest_sufficient(id: PeerId, insert_ids: [PeerId; 1000], targets: [Target; 100]) {
ordered_closest(id, insert_ids, targets)
}

#[test]
fn ordered_closest_insufficient(id: PeerId, insert_ids: [PeerId; 10], targets: [Target; 100]) {
ordered_closest(id, insert_ids, targets)
}
}
proptest! {
#[test]
fn distance_inversion(id: PeerId, d: [u8; 32]) {
let d = U256::from_little_endian(&d);
assert_eq!(distance(&id, &distance_from(&id, d)), d)
}

#[cfg(test)]
mod tests {
use rand::thread_rng;
#[test]
fn bucket_index(id: PeerId, target: Target) {
let origin = PeerRecord {
id,
key: (),
addr: (),
};
let buckets = Buckets::<_, _>::new(origin);
if target != id {
let d = distance(&id, &target);
let index = buckets.index(&target);
assert!(d.bit(index));
for i in index + 1..U256_BITS {
assert!(!d.bit(i))
}
}
}

use crate::{event::Void, net::IterAddr, worker::Worker};
#[test]
fn ordered_closest_sufficient(id: PeerId, insert_ids: [PeerId; 1000], targets: [Target; 100]) {
ordered_closest(id, insert_ids, targets)
}

use super::*;
#[test]
fn ordered_closest_insufficient(id: PeerId, insert_ids: [PeerId; 10], targets: [Target; 100]) {
ordered_closest(id, insert_ids, targets)
}
}

struct NullNet;
impl SendMessage<IterAddr<'_, ()>, Verifiable<FindPeer<()>>> for NullNet {
Expand Down
115 changes: 84 additions & 31 deletions src/pbft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,12 @@ pub struct Replica<N, CN, CW, S, A, M = (N, CN, CW, S, A)> {
/// State kept for one slot of the replica log, indexed by op number (`self.log[op_num]`).
///
/// Slots are created with `Default::default()` when the log is resized, so every field starts
/// out empty/`None` until the corresponding protocol message is handled.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive_where(Default)]
struct LogEntry<A> {
// view number of the replica at the time the `PrePrepare` was stored into this slot
view_num: u32,
// the `PrePrepare` stored for this slot; `None` until one has been stored
pre_prepare: Option<Verifiable<PrePrepare>>,
// the request batch that arrived together with the `PrePrepare`
requests: Vec<Request<A>>,
// collected `Prepare` messages; the `u8` is presumably the sending replica's id (TODO confirm)
prepares: Vec<(u8, Verifiable<Prepare>)>,
// collected `Commit` messages, same layout as `prepares`; `insert_commit` stops advancing
// `commit_num` at the first slot where this is still empty
commits: Vec<(u8, Verifiable<Commit>)>,

// handle of the `Progress` timer set when a signed `PrePrepare` is stored for this slot; it is
// taken and unset by `insert_commit` once the slot commits on the primary
timer_id: Option<TimerId>,
}

impl<N, CN, CW, S, A> Replica<N, CN, CW, S, A> {
Expand Down Expand Up @@ -386,33 +387,59 @@ impl<M: ReplicaCommon> Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
}
}

/// Timer event tied to a single log slot.
///
/// It is armed (with `CLIENT_RESEND_INTERVAL / 5`) when a signed `PrePrepare` is stored for the
/// slot, and handling it re-sends that slot's `PrePrepare` and requests to all replicas. The
/// timer is unset once the slot commits on the primary.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Progress(u32); // op number

impl<M: ReplicaCommon> OnEvent<(Signed<PrePrepare>, Vec<Request<M::A>>)>
for Replica<M::N, M::CN, M::CW, M::S, M::A, M>
{
fn on_event(
&mut self,
(Signed(pre_prepare), requests): (Signed<PrePrepare>, Vec<Request<M::A>>),
_: &mut impl Timer<Self>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
if pre_prepare.view_num != self.view_num {
return Ok(());
}
if self.log.get(pre_prepare.op_num as usize).is_none() {
self.log
.resize_with(pre_prepare.op_num as usize + 1, Default::default);

let op_num = pre_prepare.op_num as usize;
if self.log.get(op_num).is_none() {
self.log.resize_with(op_num + 1, Default::default)
}
let replaced = self.log[pre_prepare.op_num as usize]
.pre_prepare
.replace(pre_prepare.clone());
let replaced = self.log[op_num].pre_prepare.replace(pre_prepare.clone());
assert!(replaced.is_none());
self.log[pre_prepare.op_num as usize].view_num = self.view_num;
self.log[pre_prepare.op_num as usize]
.requests
.clone_from(&requests);

self.log[op_num].requests.clone_from(&requests);
let timer_id = timer.set(CLIENT_RESEND_INTERVAL / 5, Progress(op_num as _))?;
self.log[op_num].timer_id = Some(timer_id);

self.net.send(All, (pre_prepare, requests))
}
}

/// Progress timer handler: re-sends the `PrePrepare` (and its request batch) stored for
/// `op_num` to all replicas.
///
/// # Errors
///
/// Returns an error if the log has no slot for `op_num`, if the slot holds no `PrePrepare`,
/// or if sending fails.
impl<M: ReplicaCommon> OnEvent<Progress> for Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
    fn on_event(
        &mut self,
        Progress(op_num): Progress,
        _: &mut impl Timer<Self>,
    ) -> anyhow::Result<()>
    where
        Self: Sized,
    {
        // checked lookup: a stale or bogus op number becomes an error the caller can handle
        // rather than an out-of-bounds panic
        let entry = self
            .log
            .get(op_num as usize)
            .ok_or_else(|| anyhow::anyhow!("missing log entry for op number {}", op_num))?;
        // `ok_or_else` keeps the error construction off the success path; this handler runs on
        // every timer firing, so the eager `ok_or(anyhow!(..))` form paid for it each time
        let pre_prepare = entry
            .pre_prepare
            .clone()
            .ok_or_else(|| anyhow::anyhow!("missing PrePrepare"))?;
        let requests = entry.requests.clone();
        self.net.send(All, (pre_prepare, requests))
    }
}

impl<M: ReplicaCommon> OnEvent<Recv<(Verifiable<PrePrepare>, Vec<Request<M::A>>)>>
for Replica<M::N, M::CN, M::CW, M::S, M::A, M>
{
Expand All @@ -427,14 +454,18 @@ impl<M: ReplicaCommon> OnEvent<Recv<(Verifiable<PrePrepare>, Vec<Request<M::A>>)
}
return Ok(());
}
if let Some(entry) = self.log.get(pre_prepare.op_num as usize) {
if entry.pre_prepare.is_some() {
return Ok(());
}
}
// this was for eliminating duplicated verification on prepared slots; however, it breaks
// liveness when the primary resends PrePrepare
// the duplicated verification should only happen on the slow path, which is acceptable
// if let Some(entry) = self.log.get(pre_prepare.op_num as usize) {
// if entry.pre_prepare.is_some() {
// return Ok(());
// }
// }

// a decent implementation probably should throttle here (as well as for prepares and
// commits) in order to mitigate faulty proposals
// omitted since it makes no difference in normal path
// omitted since (again) that only matters on the slow path
let replica_id = pre_prepare.view_num as usize % self.num_replica;
self.crypto_worker.submit(Box::new(move |crypto, sender| {
if requests.sha256() == pre_prepare.digest
Expand Down Expand Up @@ -463,11 +494,13 @@ impl<M: ReplicaCommon> OnEvent<(Verified<PrePrepare>, Vec<Request<M::A>>)>
self.log
.resize_with(pre_prepare.op_num as usize + 1, Default::default);
}
if self.log[pre_prepare.op_num as usize].pre_prepare.is_some() {
return Ok(());
if let Some(prepared) = &self.log[pre_prepare.op_num as usize].pre_prepare {
if **prepared != *pre_prepare {
println!("! PrePrepare not identical to the prepared one");
return Ok(());
}
}
self.log[pre_prepare.op_num as usize].pre_prepare = Some(pre_prepare.clone());
self.log[pre_prepare.op_num as usize].view_num = self.view_num;
self.log[pre_prepare.op_num as usize].requests = requests;

let prepare = Prepare {
Expand All @@ -481,10 +514,14 @@ impl<M: ReplicaCommon> OnEvent<(Verified<PrePrepare>, Vec<Request<M::A>>)>
}))?;

if let Some(prepare_quorum) = self.prepare_quorums.get_mut(&pre_prepare.op_num) {
prepare_quorum.retain(|_, prepare| prepare.digest == pre_prepare.digest);
prepare_quorum.retain(|_, prepare| {
prepare.view_num == pre_prepare.view_num && prepare.digest == pre_prepare.digest
});
}
if let Some(commit_quorum) = self.commit_quorums.get_mut(&pre_prepare.op_num) {
commit_quorum.retain(|_, commit| commit.digest == pre_prepare.digest)
commit_quorum.retain(|_, commit| {
commit.view_num == pre_prepare.view_num && commit.digest == pre_prepare.digest
})
}
Ok(())
}
Expand Down Expand Up @@ -627,14 +664,14 @@ impl<M: ReplicaCommon> OnEvent<Signed<Commit>> for Replica<M::N, M::CN, M::CW, M
fn on_event(
&mut self,
Signed(commit): Signed<Commit>,
_: &mut impl Timer<Self>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
if commit.view_num != self.view_num {
return Ok(());
}
self.net.send(All, commit.clone())?;
if self.log[commit.op_num as usize].commits.is_empty() {
self.insert_commit(commit)?
self.insert_commit(commit, timer)?
}
Ok(())
}
Expand Down Expand Up @@ -694,13 +731,13 @@ impl<M: ReplicaCommon> OnEvent<Verified<Commit>> for Replica<M::N, M::CN, M::CW,
fn on_event(
&mut self,
Verified(commit): Verified<Commit>,
_: &mut impl Timer<Self>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
if commit.view_num != self.view_num {
return Ok(());
}
let op_num = commit.op_num;
self.insert_commit(commit)?;
self.insert_commit(commit, timer)?;
loop {
let Some(pending_commits) = self.pending_commits.get_mut(&op_num) else {
break;
Expand All @@ -719,11 +756,16 @@ impl<M: ReplicaCommon> OnEvent<Verified<Commit>> for Replica<M::N, M::CN, M::CW,
}

impl<M: ReplicaCommon> Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
fn insert_commit(&mut self, commit: Verifiable<Commit>) -> anyhow::Result<()> {
fn insert_commit(
&mut self,
commit: Verifiable<Commit>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
let commit_quorum = self.commit_quorums.entry(commit.op_num).or_default();
commit_quorum.insert(commit.replica_id, commit.clone());
// println!(
// "{} PrePrepare {} Commit {}",
// "[{}] {} PrePrepare {} Commit {}",
// self.id,
// commit.op_num,
// self.log.get(commit.op_num as usize).is_some(),
// commit_quorum.len()
Expand All @@ -745,12 +787,23 @@ impl<M: ReplicaCommon> Replica<M::N, M::CN, M::CW, M::S, M::A, M> {
.into_iter()
.collect();

while let Some(entry) = self.log.get(self.commit_num as usize + 1) {
let is_primary = self.is_primary();
while let Some(entry) = self.log.get_mut(self.commit_num as usize + 1) {
if entry.commits.is_empty() {
break;
}
self.commit_num += 1;
// println!("Commit {}", self.commit_num);
// println!("[{}] Commit {}", self.id, self.commit_num);

if is_primary {
timer.unset(
entry
.timer_id
.take()
.ok_or(anyhow::anyhow!("missing progress timer"))?,
)?
}

for request in &entry.requests {
let result = Payload(self.app.execute(&request.op)?);
let seq = request.seq;
Expand Down
Loading

0 comments on commit ac0c100

Please sign in to comment.