Skip to content

Commit

Permalink
p2p: use async bipipe (#853)
Browse files Browse the repository at this point in the history
Closes #841

A copy of the `pipe` crate (https://github.com/arcnmx/pipe-rs) is added to the `tendermint-p2p` crate. I had to add a new method - `async_bipipe_buffered` to make the handshake tests always pass. `async_bipipe_buffered` method uses the `unbounded` channel (rather than the blocking one) for communication - hence the prefix. Note I also removed the methods and structures from `pipe` clone that we're not using at the moment.
  • Loading branch information
melekes authored Apr 14, 2021
1 parent 2609b60 commit 4fa065b
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 3 deletions.
3 changes: 2 additions & 1 deletion p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ description = """
chacha20poly1305 = "0.7"
ed25519-dalek = "1"
eyre = "0.6"
flume = "0.10"
hkdf = "0.10.0"
merlin = "2"
prost = "0.7"
Expand All @@ -39,7 +40,7 @@ prost-amino = { version = "0.6", optional = true }
prost-amino-derive = { version = "0.6", optional = true }

[dev-dependencies]
pipe = { version = "0", features = ["bidirectional"] }
readwrite = "^0.1.1"

[features]
amino = ["prost-amino", "prost-amino-derive"]
7 changes: 5 additions & 2 deletions p2p/src/secret_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ mod nonce;
mod protocol;
mod public_key;

#[cfg(test)]
mod pipe;

/// Size of the MAC tag
pub const TAG_SIZE: usize = 16;

Expand Down Expand Up @@ -515,7 +518,7 @@ mod test {

#[test]
fn test_handshake() {
let (pipe1, pipe2) = pipe::bipipe_buffered();
let (pipe1, pipe2) = pipe::async_bipipe_buffered();

let peer1 = thread::spawn(|| {
let mut csprng = OsRng {};
Expand All @@ -537,7 +540,7 @@ mod test {

#[test]
fn test_read_write_single_message() {
let (pipe1, pipe2) = pipe::bipipe_buffered();
let (pipe1, pipe2) = pipe::async_bipipe_buffered();

const MESSAGE: &str = "The Queen's Gambit";

Expand Down
190 changes: 190 additions & 0 deletions p2p/src/secret_connection/pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright (c) 2015 arcnmx

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//! Asynchronous in-memory pipe
use flume::{self, Receiver, SendError, Sender, TrySendError};
use std::cmp::min;
use std::io::{self, BufRead, Read, Write};
use std::mem::replace;

// value for libstd
const DEFAULT_BUF_SIZE: usize = 8 * 1024;

/// The `Read` end of a pipe (see `pipe()`)
pub struct PipeReader {
receiver: Receiver<Vec<u8>>,
buffer: Vec<u8>,
position: usize,
}

/// The `Write` end of a pipe (see `pipe()`) that will buffer small writes before sending
/// to the reader end.
pub struct PipeBufWriter {
sender: Option<Sender<Vec<u8>>>,
buffer: Vec<u8>,
size: usize,
}

/// Creates an asynchronous memory pipe with buffered writer
pub fn async_pipe_buffered() -> (PipeReader, PipeBufWriter) {
let (tx, rx) = flume::unbounded();

(
PipeReader {
receiver: rx,
buffer: Vec::new(),
position: 0,
},
PipeBufWriter {
sender: Some(tx),
buffer: Vec::with_capacity(DEFAULT_BUF_SIZE),
size: DEFAULT_BUF_SIZE,
},
)
}

/// Creates a pair of pipes for bidirectional communication using buffered writer, a bit like UNIX's `socketpair(2)`.
pub fn async_bipipe_buffered() -> (
readwrite::ReadWrite<PipeReader, PipeBufWriter>,
readwrite::ReadWrite<PipeReader, PipeBufWriter>,
) {
let (r1, w1) = async_pipe_buffered();
let (r2, w2) = async_pipe_buffered();
((r1, w2).into(), (r2, w1).into())
}

fn epipe() -> io::Error {
io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped")
}

impl PipeBufWriter {
#[inline]
/// Gets a reference to the underlying `Sender`
pub fn sender(&self) -> &Sender<Vec<u8>> {
// SAFETY: this is safe as long as `into_inner()` is the only method
// that clears the sender, and this fn is never called afterward
self.sender.as_ref().expect("sender to be present")
}
}

impl BufRead for PipeReader {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
while self.position >= self.buffer.len() {
match self.receiver.recv() {
// The only existing error is EOF
Err(_) => break,
Ok(data) => {
self.buffer = data;
self.position = 0;
}
}
}

Ok(&self.buffer[self.position..])
}

fn consume(&mut self, amt: usize) {
debug_assert!(self.buffer.len() - self.position >= amt);
self.position += amt
}
}

impl Read for PipeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}

let internal = self.fill_buf()?;

let len = min(buf.len(), internal.len());
if len > 0 {
buf[..len].copy_from_slice(&internal[..len]);
self.consume(len);
}
Ok(len)
}
}

impl Write for PipeBufWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let buffer_len = self.buffer.len();
let bytes_written = if buf.len() > self.size {
// bypass buffering for big writes
buf.len()
} else {
// avoid resizing of the buffer
min(buf.len(), self.size - buffer_len)
};
self.buffer.extend_from_slice(&buf[..bytes_written]);

if self.buffer.len() >= self.size {
self.flush()?;
} else {
// reserve capacity later to avoid needless allocations
let data = replace(&mut self.buffer, Vec::new());

// buffer still has space but try to send it in case the other side already awaits
match self.sender().try_send(data) {
Ok(_) => self.buffer.reserve(self.size),
Err(TrySendError::Full(data)) => self.buffer = data,
Err(TrySendError::Disconnected(data)) => {
self.buffer = data;
self.buffer.truncate(buffer_len);
return Err(epipe());
}
}
}

Ok(bytes_written)
}

fn flush(&mut self) -> io::Result<()> {
if self.buffer.is_empty() {
Ok(())
} else {
let data = replace(&mut self.buffer, Vec::new());
match self.sender().send(data) {
Ok(_) => {
self.buffer.reserve(self.size);
Ok(())
}
Err(SendError(data)) => {
self.buffer = data;
Err(epipe())
}
}
}
}
}

/// Flushes the contents of the buffer before the writer is dropped. Errors are ignored, so it is
/// recommended that `flush()` be used explicitly instead of relying on Drop.
///
/// This final flush can be avoided by using `drop(writer.into_inner())`.
impl Drop for PipeBufWriter {
fn drop(&mut self) {
if !self.buffer.is_empty() {
let data = replace(&mut self.buffer, Vec::new());
let _ = self.sender().send(data);
}
}
}

0 comments on commit 4fa065b

Please sign in to comment.