Skip to content

Commit

Permalink
Revise PBFT
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Jun 26, 2024
1 parent 767b52b commit 761fa27
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 60 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ jobs:
build_and_test:
name: Build and test
runs-on: ubuntu-latest
continue-on-error: ${{ matrix.toolchain != 'stable' }}
strategy:
matrix:
toolchain: [stable, beta]
Expand Down
87 changes: 27 additions & 60 deletions src/pbft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,7 @@ impl<M: ReplicaCommon> OnEvent<ProgressPrepared> for Replica<M::PN, M::DN, M::CW
&mut self,
ProgressPrepared(op_num): ProgressPrepared,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
warn!("progress prepared {op_num}");
let entry = &self.log[op_num as usize];
let pre_prepare = entry
Expand Down Expand Up @@ -1032,10 +1029,8 @@ impl<M: ReplicaCommon> OnEvent<StateTransfer> for Replica<M::PN, M::DN, M::CW, M
&mut self,
StateTransfer(op_num): StateTransfer,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
// TODO
anyhow::bail!("{:?} timeout", StateTransfer(op_num))
}
}
Expand All @@ -1045,10 +1040,7 @@ impl<M: ReplicaCommon> OnEvent<Recv<QueryNewView>> for Replica<M::PN, M::DN, M::
&mut self,
Recv(query_new_view): Recv<QueryNewView>,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
if let Some(new_view) = self.new_views.get(&query_new_view.view_num) {
self.net.send(query_new_view.replica_id, new_view.clone())?
}
Expand All @@ -1064,10 +1056,7 @@ impl<M: ReplicaCommon> OnEvent<DoViewChange> for Replica<M::PN, M::DN, M::CW, M:
&mut self,
DoViewChange(view_num): DoViewChange,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
warn!("[{}] do view change for view {view_num}", self.id);
assert!(view_num >= self.view_num);
self.view_num = view_num;
Expand All @@ -1085,10 +1074,7 @@ impl<M: ReplicaCommon> OnEvent<ProgressViewChange> for Replica<M::PN, M::DN, M::
&mut self,
ProgressViewChange: ProgressViewChange,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
self.do_view_change()
}
}
Expand Down Expand Up @@ -1122,10 +1108,7 @@ impl<M: ReplicaCommon> OnEvent<Signed<ViewChange>> for Replica<M::PN, M::DN, M::
&mut self,
Signed(view_change): Signed<ViewChange>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
if view_change.view_num == self.view_num {
self.net.send(All, view_change.clone())?;
self.insert_view_change(view_change, timer)?
Expand Down Expand Up @@ -1159,10 +1142,7 @@ impl<M: ReplicaCommon> OnEvent<Recv<Verifiable<ViewChange>>>
&mut self,
Recv(view_change): Recv<Verifiable<ViewChange>>,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
if view_change.view_num < self.view_num {
return Ok(());
}
Expand All @@ -1185,10 +1165,7 @@ impl<M: ReplicaCommon> OnEvent<Verified<ViewChange>>
&mut self,
Verified(view_change): Verified<ViewChange>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
self.insert_view_change(view_change, timer)
}
}
Expand Down Expand Up @@ -1236,15 +1213,19 @@ fn pre_prepares_for_view_changes(
Ok(pre_prepares)
}

impl<M: ReplicaCommon> Replica<M::PN, M::DN, M::CW, M::S, M::A, M> {
fn have_entered(&self, view_num: u32) -> bool {
self.view_num > view_num || self.view_num == view_num && !self.view_change()
}
}

impl<M: ReplicaCommon> Replica<M::PN, M::DN, M::CW, M::S, M::A, M> {
fn insert_view_change(
&mut self,
view_change: Verifiable<ViewChange>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
if view_change.view_num < self.view_num
|| view_change.view_num == self.view_num && !self.view_change()
{
if self.have_entered(view_change.view_num) {
return Ok(());
}
let view_change_quorum = self.view_changes.entry(view_change.view_num).or_default();
Expand Down Expand Up @@ -1293,10 +1274,7 @@ impl<M: ReplicaCommon> OnEvent<Signed<NewView>> for Replica<M::PN, M::DN, M::CW,
&mut self,
Signed(new_view): Signed<NewView>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
) -> anyhow::Result<()> {
if self.view_num == new_view.view_num {
self.net.send(All, new_view.clone())?;
self.enter_view(new_view, timer)?
Expand All @@ -1311,7 +1289,7 @@ impl<M: ReplicaCommon> Replica<M::PN, M::DN, M::CW, M::S, M::A, M> {
new_view: Verifiable<NewView>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()> {
assert!(new_view.view_num >= self.view_num);
assert!(!self.have_entered(new_view.view_num));
self.view_num = new_view.view_num;
assert!(self.view_change());
for pre_prepare in &new_view.pre_prepares {
Expand Down Expand Up @@ -1379,13 +1357,8 @@ impl<M: ReplicaCommon> OnEvent<Recv<Verifiable<NewView>>>
&mut self,
Recv(new_view): Recv<Verifiable<NewView>>,
_: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
if self.view_num > new_view.view_num
|| self.view_num == new_view.view_num && !self.view_change()
{
) -> anyhow::Result<()> {
if self.have_entered(new_view.view_num) {
return Ok(());
}
let num_replica = self.num_replica;
Expand Down Expand Up @@ -1413,10 +1386,9 @@ impl<M: ReplicaCommon> OnEvent<Recv<Verifiable<NewView>>>
anyhow::Ok(())
};
if do_verify().is_ok() {
sender.send(Verified(new_view))
} else {
Ok(())
sender.send(Verified(new_view))?
}
Ok(())
}))
}
}
Expand All @@ -1426,16 +1398,11 @@ impl<M: ReplicaCommon> OnEvent<Verified<NewView>> for Replica<M::PN, M::DN, M::C
&mut self,
Verified(new_view): Verified<NewView>,
timer: &mut impl Timer<Self>,
) -> anyhow::Result<()>
where
Self: Sized,
{
if self.view_num > new_view.view_num
|| self.view_num == new_view.view_num && !self.view_change()
{
return Ok(());
) -> anyhow::Result<()> {
if !self.have_entered(new_view.view_num) {
self.enter_view(new_view, timer)?
}
self.enter_view(new_view, timer)
Ok(())
}
}

Expand Down Expand Up @@ -1504,7 +1471,7 @@ pub fn to_replica_on_buf<A: Addr>(
buf: &[u8],
sender: &mut impl SendReplicaRecvEvent<A>,
) -> anyhow::Result<()> {
deserialize::<ToReplica<A>>(buf)?.send(sender)
deserialize::<ToReplica<_>>(buf)?.send(sender)
}

#[cfg(test)]
Expand Down

0 comments on commit 761fa27

Please sign in to comment.