Skip to content

Commit

Permalink
pass stopped into consume so we can halt txhashset mid download (#3157)
Browse files Browse the repository at this point in the history
  • Loading branch information
antiochp authored Dec 5, 2019
1 parent 0b21ee6 commit 8b8f0a0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
13 changes: 11 additions & 2 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000);
/// A trait to be implemented in order to receive messages from the
/// connection. Allows providing an optional response.
pub trait MessageHandler: Send + 'static {
fn consume<'a>(&self, msg: Message<'a>, tracker: Arc<Tracker>) -> Result<Option<Msg>, Error>;
fn consume<'a>(
&self,
msg: Message<'a>,
stopped: Arc<AtomicBool>,
tracker: Arc<Tracker>,
) -> Result<Option<Msg>, Error>;
}

// Macro to simplify the boilerplate around I/O and Grin error handling
Expand Down Expand Up @@ -294,7 +299,11 @@ where
// Increase received bytes counter
reader_tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len);

let resp_msg = try_break!(handler.consume(msg, reader_tracker.clone()));
let resp_msg = try_break!(handler.consume(
msg,
reader_stopped.clone(),
reader_tracker.clone()
));
if let Some(Some(resp_msg)) = resp_msg {
try_break!(conn_handle.send(resp_msg));
}
Expand Down
15 changes: 13 additions & 2 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ impl Protocol {
}

impl MessageHandler for Protocol {
fn consume(&self, mut msg: Message, tracker: Arc<Tracker>) -> Result<Option<Msg>, Error> {
fn consume(
&self,
mut msg: Message,
stopped: Arc<AtomicBool>,
tracker: Arc<Tracker>,
) -> Result<Option<Msg>, Error> {
let adapter = &self.adapter;

// If we received a msg from a banned peer then log and drop it.
Expand Down Expand Up @@ -395,7 +400,13 @@ impl MessageHandler for Protocol {
}
// Increase received bytes quietly (without affecting the counters).
// Otherwise we risk banning a peer as "abusive".
tracker.inc_quiet_received(size as u64)
tracker.inc_quiet_received(size as u64);

// check the close channel
if stopped.load(Ordering::Relaxed) {
debug!("stopping txhashset download early");
return Err(Error::ConnectionClose);
}
}
debug!(
"handle_payload: txhashset archive: {}/{} ... DONE",
Expand Down

0 comments on commit 8b8f0a0

Please sign in to comment.