Skip to content

Commit

Permalink
handle mpsc channel disconnect from peer_write thread (#3241)
Browse files Browse the repository at this point in the history
* handle mpsc channel disconnect from peer_write thread
also actually shutdown the writer when we say we are going to

* fix - we need to break here
  • Loading branch information
antiochp authored Feb 25, 2020
1 parent 6bdeefd commit 6855241
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::util::{RateCounter, RwLock};
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{mpsc, Arc};
use std::time::Duration;
use std::{
Expand Down Expand Up @@ -362,13 +363,21 @@ where
loop {
let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(CHANNEL_TIMEOUT));
retry_send = Err(());
if let Ok(data) = maybe_data {
let written =
try_break!(write_message(&mut writer, &data, writer_tracker.clone()));
if written.is_none() {
retry_send = Ok(data);
match maybe_data {
Ok(data) => {
let written =
try_break!(write_message(&mut writer, &data, writer_tracker.clone()));
if written.is_none() {
retry_send = Ok(data);
}
}
Err(RecvTimeoutError::Disconnected) => {
debug!("peer_write: mpsc channel disconnected during recv_timeout");
break;
}
Err(RecvTimeoutError::Timeout) => {}
}

// check the close channel
if stopped.load(Ordering::Relaxed) {
break;
Expand All @@ -382,6 +391,7 @@ where
.map(|a| a.to_string())
.unwrap_or_else(|_| "?".to_owned())
);
let _ = writer.shutdown(Shutdown::Both);
})?;
Ok((reader_thread, writer_thread))
}

0 comments on commit 6855241

Please sign in to comment.