From 592e989d77a9616bcd064a742bc498c8cd5b5eef Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 31 Mar 2021 14:35:37 +0400 Subject: [PATCH 1/8] p2p: use async bipipe Closes #841 I had to clone the original crate to introduce 4 additional methods: 1. async_pipe 2. async_pipe_buffered 3. async_bipipe 4. async_bipipe_buffered The difference is they all use unbounded `crossbeam` channel rather then `bounded(0)` which makes them async. And we need pipe to be async because of how the handshake is written currently. --- p2p/Cargo.toml | 2 +- p2p/src/secret_connection.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 5ef012d1c..42934afed 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -39,7 +39,7 @@ prost-amino = { version = "0.6", optional = true } prost-amino-derive = { version = "0.6", optional = true } [dev-dependencies] -pipe = { version = "0", features = ["bidirectional"] } +pipe = { version = "0", features = ["bidirectional"], git = "https://github.com/melekes/pipe-rs" } [features] amino = ["prost-amino", "prost-amino-derive"] diff --git a/p2p/src/secret_connection.rs b/p2p/src/secret_connection.rs index 4cdfbee67..869d44b06 100644 --- a/p2p/src/secret_connection.rs +++ b/p2p/src/secret_connection.rs @@ -515,7 +515,7 @@ mod test { #[test] fn test_handshake() { - let (pipe1, pipe2) = pipe::bipipe_buffered(); + let (pipe1, pipe2) = pipe::async_bipipe_buffered(); let peer1 = thread::spawn(|| { let mut csprng = OsRng {}; @@ -537,7 +537,7 @@ mod test { #[test] fn test_read_write_single_message() { - let (pipe1, pipe2) = pipe::bipipe_buffered(); + let (pipe1, pipe2) = pipe::async_bipipe_buffered(); const MESSAGE: &str = "The Queen's Gambit"; From 3f2ca1850ab0bff3da9fb019bb2e19a40cc76507 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 7 Apr 2021 10:36:31 +0400 Subject: [PATCH 2/8] copy pipe-rs from https://github.com/arcnmx/pipe-rs --- p2p/Cargo.toml | 3 +- p2p/src/lib.rs | 2 +- p2p/src/secret_connection.rs | 3 + p2p/src/secret_connection/pipe.rs | 525 ++++++++++++++++++++++++++++++ 4 files changed, 531 insertions(+), 2 deletions(-) create mode 100644 p2p/src/secret_connection/pipe.rs diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 42934afed..4f9a8160a 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -39,7 +39,8 @@ prost-amino = { version = "0.6", optional = true } prost-amino-derive = { version = "0.6", optional = true } [dev-dependencies] -pipe = { version = "0", features = ["bidirectional"], git = "https://github.com/melekes/pipe-rs" } +crossbeam-channel = "^0.5.0" +readwrite = "^0.1.1" [features] amino = ["prost-amino", "prost-amino-derive"] diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 8ad619d42..19f7903cc 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -1,6 +1,6 @@ //! The Tendermint P2P stack. -#![forbid(unsafe_code)] +#![deny(unsafe_code)] #![deny( trivial_casts, trivial_numeric_casts, diff --git a/p2p/src/secret_connection.rs b/p2p/src/secret_connection.rs index 869d44b06..5d9024a01 100644 --- a/p2p/src/secret_connection.rs +++ b/p2p/src/secret_connection.rs @@ -32,6 +32,9 @@ mod nonce; mod protocol; mod public_key; +#[cfg(test)] +mod pipe; + /// Size of the MAC tag pub const TAG_SIZE: usize = 16; diff --git a/p2p/src/secret_connection/pipe.rs b/p2p/src/secret_connection/pipe.rs new file mode 100644 index 000000000..af5413473 --- /dev/null +++ b/p2p/src/secret_connection/pipe.rs @@ -0,0 +1,525 @@ +// Copyright (c) 2015 arcnmx + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +#![allow(unsafe_code)] + +//! Synchronous in-memory pipe +//! +//! ## Example +//! +//! ``` +//! use std::thread::spawn; +//! use std::io::{Read, Write}; +//! +//! let (mut read, mut write) = pipe::pipe(); +//! +//! let message = "Hello, world!"; +//! spawn(move || write.write_all(message.as_bytes()).unwrap()); +//! +//! let mut s = String::new(); +//! read.read_to_string(&mut s).unwrap(); +//! +//! assert_eq!(&s, message); +//! ``` + +use crossbeam_channel; +use readwrite; + +use crossbeam_channel::{Receiver, SendError, Sender, TrySendError}; +use std::cmp::min; +use std::hint::unreachable_unchecked; +use std::io::{self, BufRead, Read, Write}; +use std::mem::replace; + +// value for libstd +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + +/// The `Read` end of a pipe (see `pipe()`) +pub struct PipeReader { + receiver: Receiver>, + buffer: Vec, + position: usize, +} + +/// The `Write` end of a pipe (see `pipe()`) +#[derive(Clone)] +pub struct PipeWriter { + sender: Sender>, +} + +/// The `Write` end of a pipe (see `pipe()`) that will buffer small writes before sending +/// to the reader end. +pub struct PipeBufWriter { + sender: Option>>, + buffer: Vec, + size: usize, +} + +/// Creates a synchronous memory pipe +pub fn pipe() -> (PipeReader, PipeWriter) { + let (sender, receiver) = crossbeam_channel::bounded(0); + + ( + PipeReader { + receiver, + buffer: Vec::new(), + position: 0, + }, + PipeWriter { sender }, + ) +} + +/// Creates a synchronous memory pipe with buffered writer +pub fn pipe_buffered() -> (PipeReader, PipeBufWriter) { + let (tx, rx) = crossbeam_channel::bounded(0); + + ( + PipeReader { + receiver: rx, + buffer: Vec::new(), + position: 0, + }, + PipeBufWriter { + sender: Some(tx), + buffer: Vec::with_capacity(DEFAULT_BUF_SIZE), + size: DEFAULT_BUF_SIZE, + }, + ) +} + +/// Creates an asynchronous memory pipe +pub fn async_pipe() -> (PipeReader, PipeWriter) { + let (sender, receiver) = crossbeam_channel::unbounded(); + + ( + PipeReader { + receiver, + buffer: Vec::new(), + position: 0, + }, + PipeWriter { sender }, + ) +} + +/// Creates an asynchronous memory pipe with buffered writer +pub fn async_pipe_buffered() -> (PipeReader, PipeBufWriter) { + let (tx, rx) = crossbeam_channel::unbounded(); + + ( + PipeReader { + receiver: rx, + buffer: Vec::new(), + position: 0, + }, + PipeBufWriter { + sender: Some(tx), + buffer: Vec::with_capacity(DEFAULT_BUF_SIZE), + size: DEFAULT_BUF_SIZE, + }, + ) +} + +/// Creates a pair of pipes for bidirectional communication, a bit like UNIX's `socketpair(2)`. +pub fn bipipe() -> ( + readwrite::ReadWrite, + readwrite::ReadWrite, +) { + let (r1, w1) = pipe(); + let (r2, w2) = pipe(); + ((r1, w2).into(), (r2, w1).into()) +} + +/// Creates a pair of pipes for bidirectional communication, a bit like UNIX's `socketpair(2)`. +pub fn async_bipipe() -> ( + readwrite::ReadWrite, + readwrite::ReadWrite, +) { + let (r1, w1) = async_pipe(); + let (r2, w2) = async_pipe(); + ((r1, w2).into(), (r2, w1).into()) +} + +/// Creates a pair of pipes for bidirectional communication using buffered writer, a bit like UNIX's `socketpair(2)`. +pub fn bipipe_buffered() -> ( + readwrite::ReadWrite, + readwrite::ReadWrite, +) { + let (r1, w1) = pipe_buffered(); + let (r2, w2) = pipe_buffered(); + ((r1, w2).into(), (r2, w1).into()) +} + +/// Creates a pair of pipes for bidirectional communication using buffered writer, a bit like UNIX's `socketpair(2)`. +pub fn async_bipipe_buffered() -> ( + readwrite::ReadWrite, + readwrite::ReadWrite, +) { + let (r1, w1) = async_pipe_buffered(); + let (r2, w2) = async_pipe_buffered(); + ((r1, w2).into(), (r2, w1).into()) +} + +fn epipe() -> io::Error { + io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped") +} + +impl PipeWriter { + /// Extracts the inner `Sender` from the writer + pub fn into_inner(self) -> Sender> { + self.sender + } + + /// Gets a reference to the underlying `Sender` + pub fn sender(&self) -> &Sender> { + &self.sender + } + + /// Write data to the associated `PipeReader` + pub fn send>>(&self, bytes: B) -> io::Result<()> { + self.sender + .send(bytes.into()) + .map_err(|_| epipe()) + .map(drop) + } +} + +impl PipeBufWriter { + /// Extracts the inner `Sender` from the writer, and any pending buffered data + pub fn into_inner(mut self) -> (Sender>, Vec) { + let sender = match replace(&mut self.sender, None) { + Some(sender) => sender, + None => unsafe { + // SAFETY: this is safe as long as `into_inner()` is the only method + // that clears the sender + unreachable_unchecked() + }, + }; + (sender, replace(&mut self.buffer, Vec::new())) + } + + #[inline] + /// Gets a reference to the underlying `Sender` + pub fn sender(&self) -> &Sender> { + match &self.sender { + Some(sender) => sender, + None => unsafe { + // SAFETY: this is safe as long as `into_inner()` is the only method + // that clears the sender, and this fn is never called afterward + unreachable_unchecked() + }, + } + } + + /// Returns a reference to the internally buffered data. + pub fn buffer(&self) -> &[u8] { + &self.buffer + } + + /// Returns the number of bytes the internal buffer can hold without flushing. + pub fn capacity(&self) -> usize { + self.size + } +} + +/// Creates a new handle to the `PipeBufWriter` with a fresh new buffer. Any pending data is still +/// owned by the existing writer and should be flushed if necessary. +impl Clone for PipeBufWriter { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + buffer: Vec::with_capacity(self.size), + size: self.size, + } + } +} + +impl PipeReader { + /// Extracts the inner `Receiver` from the writer, and any pending buffered data + pub fn into_inner(mut self) -> (Receiver>, Vec) { + self.buffer.drain(..self.position); + (self.receiver, self.buffer) + } + + /// Returns a reference to the internally buffered data. + pub fn buffer(&self) -> &[u8] { + &self.buffer[self.position..] + } +} + +/// Creates a new handle to the `PipeReader` with a fresh new buffer. Any pending data is still +/// owned by the existing reader and will not be accessible from the new handle. +impl Clone for PipeReader { + fn clone(&self) -> Self { + Self { + receiver: self.receiver.clone(), + buffer: Vec::new(), + position: 0, + } + } +} + +impl BufRead for PipeReader { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + while self.position >= self.buffer.len() { + match self.receiver.recv() { + // The only existing error is EOF + Err(_) => break, + Ok(data) => { + self.buffer = data; + self.position = 0; + } + } + } + + Ok(&self.buffer[self.position..]) + } + + fn consume(&mut self, amt: usize) { + debug_assert!(self.buffer.len() - self.position >= amt); + self.position += amt + } +} + +impl Read for PipeReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + let internal = self.fill_buf()?; + + let len = min(buf.len(), internal.len()); + if len > 0 { + buf[..len].copy_from_slice(&internal[..len]); + self.consume(len); + } + Ok(len) + } +} + +impl Write for &'_ PipeWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let data = buf.to_vec(); + + self.send(data).map(|_| buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Write for PipeWriter { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + Write::write(&mut &*self, buf) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + Write::flush(&mut &*self) + } +} + +impl Write for PipeBufWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let buffer_len = self.buffer.len(); + let bytes_written = if buf.len() > self.size { + // bypass buffering for big writes + buf.len() + } else { + // avoid resizing of the buffer + min(buf.len(), self.size - buffer_len) + }; + self.buffer.extend_from_slice(&buf[..bytes_written]); + + if self.buffer.len() >= self.size { + self.flush()?; + } else { + // reserve capacity later to avoid needless allocations + let data = replace(&mut self.buffer, Vec::new()); + + // buffer still has space but try to send it in case the other side already awaits + match self.sender().try_send(data) { + Ok(_) => self.buffer.reserve(self.size), + Err(TrySendError::Full(data)) => self.buffer = data, + Err(TrySendError::Disconnected(data)) => { + self.buffer = data; + self.buffer.truncate(buffer_len); + return Err(epipe()); + } + } + } + + Ok(bytes_written) + } + + fn flush(&mut self) -> io::Result<()> { + if self.buffer.is_empty() { + Ok(()) + } else { + let data = replace(&mut self.buffer, Vec::new()); + match self.sender().send(data) { + Ok(_) => { + self.buffer.reserve(self.size); + Ok(()) + } + Err(SendError(data)) => { + self.buffer = data; + Err(epipe()) + } + } + } + } +} + +/// Flushes the contents of the buffer before the writer is dropped. Errors are ignored, so it is +/// recommended that `flush()` be used explicitly instead of relying on Drop. +/// +/// This final flush can be avoided by using `drop(writer.into_inner())`. +impl Drop for PipeBufWriter { + fn drop(&mut self) { + if !self.buffer.is_empty() { + let data = replace(&mut self.buffer, Vec::new()); + let _ = self.sender().send(data); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::{Read, Write}; + use std::thread::spawn; + + #[test] + fn pipe_reader() { + let i = b"hello there"; + let mut o = Vec::with_capacity(i.len()); + let (mut r, mut w) = pipe(); + let guard = spawn(move || { + w.write_all(&i[..5]).unwrap(); + w.write_all(&i[5..]).unwrap(); + drop(w); + }); + + r.read_to_end(&mut o).unwrap(); + assert_eq!(i, &o[..]); + + guard.join().unwrap(); + } + + #[test] + fn pipe_writer_fail() { + let i = b"hi"; + let (r, mut w) = pipe(); + let guard = spawn(move || { + drop(r); + }); + + assert!(w.write_all(i).is_err()); + + guard.join().unwrap(); + } + + #[test] + fn small_reads() { + let block_cnt = 20; + const BLOCK: usize = 20; + let (mut r, mut w) = pipe(); + let guard = spawn(move || { + for _ in 0..block_cnt { + let data = &[0; BLOCK]; + w.write_all(data).unwrap(); + } + }); + + let mut buff = [0; BLOCK / 2]; + let mut read = 0; + while let Ok(size) = r.read(&mut buff) { + // 0 means EOF + if size == 0 { + break; + } + read += size; + } + assert_eq!(block_cnt * BLOCK, read); + + guard.join().unwrap(); + } + + #[test] + fn pipe_reader_buffered() { + let i = b"hello there"; + let mut o = Vec::with_capacity(i.len()); + let (mut r, mut w) = pipe_buffered(); + let guard = spawn(move || { + w.write_all(&i[..5]).unwrap(); + w.write_all(&i[5..]).unwrap(); + w.flush().unwrap(); + drop(w); + }); + + r.read_to_end(&mut o).unwrap(); + assert_eq!(i, &o[..]); + + guard.join().unwrap(); + } + + #[test] + fn pipe_writer_fail_buffered() { + let i = &[0; DEFAULT_BUF_SIZE * 2]; + let (r, mut w) = pipe_buffered(); + let guard = spawn(move || { + drop(r); + }); + + assert!(w.write_all(i).is_err()); + + guard.join().unwrap(); + } + + #[test] + fn small_reads_buffered() { + let block_cnt = 20; + const BLOCK: usize = 20; + let (mut r, mut w) = pipe_buffered(); + let guard = spawn(move || { + for _ in 0..block_cnt { + let data = &[0; BLOCK]; + w.write_all(data).unwrap(); + } + w.flush().unwrap(); + }); + + let mut buff = [0; BLOCK / 2]; + let mut read = 0; + while let Ok(size) = r.read(&mut buff) { + // 0 means EOF + if size == 0 { + break; + } + read += size; + } + assert_eq!(block_cnt * BLOCK, read); + + guard.join().unwrap(); + } +} From 9b7fac4e4fd720e90b76a2c7a72ebfa9ac629c9b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 7 Apr 2021 10:47:19 +0400 Subject: [PATCH 3/8] cleanup unused functions --- p2p/src/secret_connection/pipe.rs | 80 ------------------------------- 1 file changed, 80 deletions(-) diff --git a/p2p/src/secret_connection/pipe.rs b/p2p/src/secret_connection/pipe.rs index af5413473..f4b737642 100644 --- a/p2p/src/secret_connection/pipe.rs +++ b/p2p/src/secret_connection/pipe.rs @@ -104,20 +104,6 @@ pub fn pipe_buffered() -> (PipeReader, PipeBufWriter) { ) } -/// Creates an asynchronous memory pipe -pub fn async_pipe() -> (PipeReader, PipeWriter) { - let (sender, receiver) = crossbeam_channel::unbounded(); - - ( - PipeReader { - receiver, - buffer: Vec::new(), - position: 0, - }, - PipeWriter { sender }, - ) -} - /// Creates an asynchronous memory pipe with buffered writer pub fn async_pipe_buffered() -> (PipeReader, PipeBufWriter) { let (tx, rx) = crossbeam_channel::unbounded(); @@ -136,36 +122,6 @@ pub fn async_pipe_buffered() -> (PipeReader, PipeBufWriter) { ) } -/// Creates a pair of pipes for bidirectional communication, a bit like UNIX's `socketpair(2)`. -pub fn bipipe() -> ( - readwrite::ReadWrite, - readwrite::ReadWrite, -) { - let (r1, w1) = pipe(); - let (r2, w2) = pipe(); - ((r1, w2).into(), (r2, w1).into()) -} - -/// Creates a pair of pipes for bidirectional communication, a bit like UNIX's `socketpair(2)`. -pub fn async_bipipe() -> ( - readwrite::ReadWrite, - readwrite::ReadWrite, -) { - let (r1, w1) = async_pipe(); - let (r2, w2) = async_pipe(); - ((r1, w2).into(), (r2, w1).into()) -} - -/// Creates a pair of pipes for bidirectional communication using buffered writer, a bit like UNIX's `socketpair(2)`. -pub fn bipipe_buffered() -> ( - readwrite::ReadWrite, - readwrite::ReadWrite, -) { - let (r1, w1) = pipe_buffered(); - let (r2, w2) = pipe_buffered(); - ((r1, w2).into(), (r2, w1).into()) -} - /// Creates a pair of pipes for bidirectional communication using buffered writer, a bit like UNIX's `socketpair(2)`. pub fn async_bipipe_buffered() -> ( readwrite::ReadWrite, @@ -201,19 +157,6 @@ impl PipeWriter { } impl PipeBufWriter { - /// Extracts the inner `Sender` from the writer, and any pending buffered data - pub fn into_inner(mut self) -> (Sender>, Vec) { - let sender = match replace(&mut self.sender, None) { - Some(sender) => sender, - None => unsafe { - // SAFETY: this is safe as long as `into_inner()` is the only method - // that clears the sender - unreachable_unchecked() - }, - }; - (sender, replace(&mut self.buffer, Vec::new())) - } - #[inline] /// Gets a reference to the underlying `Sender` pub fn sender(&self) -> &Sender> { @@ -226,16 +169,6 @@ impl PipeBufWriter { }, } } - - /// Returns a reference to the internally buffered data. - pub fn buffer(&self) -> &[u8] { - &self.buffer - } - - /// Returns the number of bytes the internal buffer can hold without flushing. - pub fn capacity(&self) -> usize { - self.size - } } /// Creates a new handle to the `PipeBufWriter` with a fresh new buffer. Any pending data is still @@ -250,19 +183,6 @@ impl Clone for PipeBufWriter { } } -impl PipeReader { - /// Extracts the inner `Receiver` from the writer, and any pending buffered data - pub fn into_inner(mut self) -> (Receiver>, Vec) { - self.buffer.drain(..self.position); - (self.receiver, self.buffer) - } - - /// Returns a reference to the internally buffered data. - pub fn buffer(&self) -> &[u8] { - &self.buffer[self.position..] - } -} - /// Creates a new handle to the `PipeReader` with a fresh new buffer. Any pending data is still /// owned by the existing reader and will not be accessible from the new handle. impl Clone for PipeReader { From 32bbdf3c417c4f8b95c9b1c1ffcd6dfdf845b52c Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 12 Apr 2021 21:01:05 +0400 Subject: [PATCH 4/8] remove unsafe code and use flume instead of crossbeam_channel --- p2p/Cargo.toml | 2 +- p2p/src/lib.rs | 2 +- p2p/src/secret_connection/pipe.rs | 21 +++++---------------- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 4f9a8160a..348f7db34 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -19,6 +19,7 @@ description = """ chacha20poly1305 = "0.7" ed25519-dalek = "1" eyre = "0.6" +flume = "0.10" hkdf = "0.10.0" merlin = "2" prost = "0.7" @@ -39,7 +40,6 @@ prost-amino = { version = "0.6", optional = true } prost-amino-derive = { version = "0.6", optional = true } [dev-dependencies] -crossbeam-channel = "^0.5.0" readwrite = "^0.1.1" [features] diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 19f7903cc..8ad619d42 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -1,6 +1,6 @@ //! The Tendermint P2P stack. -#![deny(unsafe_code)] +#![forbid(unsafe_code)] #![deny( trivial_casts, trivial_numeric_casts, diff --git a/p2p/src/secret_connection/pipe.rs b/p2p/src/secret_connection/pipe.rs index f4b737642..f54d1e17b 100644 --- a/p2p/src/secret_connection/pipe.rs +++ b/p2p/src/secret_connection/pipe.rs @@ -18,8 +18,6 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -#![allow(unsafe_code)] - //! Synchronous in-memory pipe //! //! ## Example @@ -39,12 +37,10 @@ //! assert_eq!(&s, message); //! ``` -use crossbeam_channel; use readwrite; -use crossbeam_channel::{Receiver, SendError, Sender, TrySendError}; +use flume::{self, Receiver, SendError, Sender, TrySendError}; use std::cmp::min; -use std::hint::unreachable_unchecked; use std::io::{self, BufRead, Read, Write}; use std::mem::replace; @@ -74,7 +70,7 @@ pub struct PipeBufWriter { /// Creates a synchronous memory pipe pub fn pipe() -> (PipeReader, PipeWriter) { - let (sender, receiver) = crossbeam_channel::bounded(0); + let (sender, receiver) = flume::bounded(0); ( PipeReader { @@ -88,7 +84,7 @@ pub fn pipe() -> (PipeReader, PipeWriter) { /// Creates a synchronous memory pipe with buffered writer pub fn pipe_buffered() -> (PipeReader, PipeBufWriter) { - let (tx, rx) = crossbeam_channel::bounded(0); + let (tx, rx) = flume::bounded(0); ( PipeReader { @@ -106,7 +102,7 @@ pub fn pipe_buffered() -> (PipeReader, PipeBufWriter) { /// Creates an asynchronous memory pipe with buffered writer pub fn async_pipe_buffered() -> (PipeReader, PipeBufWriter) { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = flume::unbounded(); ( PipeReader { @@ -160,14 +156,7 @@ impl PipeBufWriter { #[inline] /// Gets a reference to the underlying `Sender` pub fn sender(&self) -> &Sender> { - match &self.sender { - Some(sender) => sender, - None => unsafe { - // SAFETY: this is safe as long as `into_inner()` is the only method - // that clears the sender, and this fn is never called afterward - unreachable_unchecked() - }, - } + self.sender.as_ref().unwrap() } } From ef500a67dcd7e2da00e775f8c5bbd6b9ccf0edf7 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 12 Apr 2021 21:06:24 +0400 Subject: [PATCH 5/8] remove pipe and pipe_buffered methods we only need async_pipe_buffered --- p2p/src/secret_connection/pipe.rs | 172 +----------------------------- 1 file changed, 1 insertion(+), 171 deletions(-) diff --git a/p2p/src/secret_connection/pipe.rs b/p2p/src/secret_connection/pipe.rs index f54d1e17b..6188a34f9 100644 --- a/p2p/src/secret_connection/pipe.rs +++ b/p2p/src/secret_connection/pipe.rs @@ -18,24 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//! Synchronous in-memory pipe -//! -//! ## Example -//! -//! ``` -//! use std::thread::spawn; -//! use std::io::{Read, Write}; -//! -//! let (mut read, mut write) = pipe::pipe(); -//! -//! let message = "Hello, world!"; -//! spawn(move || write.write_all(message.as_bytes()).unwrap()); -//! -//! let mut s = String::new(); -//! read.read_to_string(&mut s).unwrap(); -//! -//! assert_eq!(&s, message); -//! ``` +//! Asynchronous in-memory pipe use readwrite; @@ -68,38 +51,6 @@ pub struct PipeBufWriter { size: usize, } -/// Creates a synchronous memory pipe -pub fn pipe() -> (PipeReader, PipeWriter) { - let (sender, receiver) = flume::bounded(0); - - ( - PipeReader { - receiver, - buffer: Vec::new(), - position: 0, - }, - PipeWriter { sender }, - ) -} - -/// Creates a synchronous memory pipe with buffered writer -pub fn pipe_buffered() -> (PipeReader, PipeBufWriter) { - let (tx, rx) = flume::bounded(0); - - ( - PipeReader { - receiver: rx, - buffer: Vec::new(), - position: 0, - }, - PipeBufWriter { - sender: Some(tx), - buffer: Vec::with_capacity(DEFAULT_BUF_SIZE), - size: DEFAULT_BUF_SIZE, - }, - ) -} - /// Creates an asynchronous memory pipe with buffered writer pub fn async_pipe_buffered() -> (PipeReader, PipeBufWriter) { let (tx, rx) = flume::unbounded(); @@ -311,124 +262,3 @@ impl Drop for PipeBufWriter { } } } - -#[cfg(test)] -mod tests { - use super::*; - use std::io::{Read, Write}; - use std::thread::spawn; - - #[test] - fn pipe_reader() { - let i = b"hello there"; - let mut o = Vec::with_capacity(i.len()); - let (mut r, mut w) = pipe(); - let guard = spawn(move || { - w.write_all(&i[..5]).unwrap(); - w.write_all(&i[5..]).unwrap(); - drop(w); - }); - - r.read_to_end(&mut o).unwrap(); - assert_eq!(i, &o[..]); - - guard.join().unwrap(); - } - - #[test] - fn pipe_writer_fail() { - let i = b"hi"; - let (r, mut w) = pipe(); - let guard = spawn(move || { - drop(r); - }); - - assert!(w.write_all(i).is_err()); - - guard.join().unwrap(); - } - - #[test] - fn small_reads() { - let block_cnt = 20; - const BLOCK: usize = 20; - let (mut r, mut w) = pipe(); - let guard = spawn(move || { - for _ in 0..block_cnt { - let data = &[0; BLOCK]; - w.write_all(data).unwrap(); - } - }); - - let mut buff = [0; BLOCK / 2]; - let mut read = 0; - while let Ok(size) = r.read(&mut buff) { - // 0 means EOF - if size == 0 { - break; - } - read += size; - } - assert_eq!(block_cnt * BLOCK, read); - - guard.join().unwrap(); - } - - #[test] - fn pipe_reader_buffered() { - let i = b"hello there"; - let mut o = Vec::with_capacity(i.len()); - let (mut r, mut w) = pipe_buffered(); - let guard = spawn(move || { - w.write_all(&i[..5]).unwrap(); - w.write_all(&i[5..]).unwrap(); - w.flush().unwrap(); - drop(w); - }); - - r.read_to_end(&mut o).unwrap(); - assert_eq!(i, &o[..]); - - guard.join().unwrap(); - } - - #[test] - fn pipe_writer_fail_buffered() { - let i = &[0; DEFAULT_BUF_SIZE * 2]; - let (r, mut w) = pipe_buffered(); - let guard = spawn(move || { - drop(r); - }); - - assert!(w.write_all(i).is_err()); - - guard.join().unwrap(); - } - - #[test] - fn small_reads_buffered() { - let block_cnt = 20; - const BLOCK: usize = 20; - let (mut r, mut w) = pipe_buffered(); - let guard = spawn(move || { - for _ in 0..block_cnt { - let data = &[0; BLOCK]; - w.write_all(data).unwrap(); - } - w.flush().unwrap(); - }); - - let mut buff = [0; BLOCK / 2]; - let mut read = 0; - while let Ok(size) = r.read(&mut buff) { - // 0 means EOF - if size == 0 { - break; - } - read += size; - } - assert_eq!(block_cnt * BLOCK, read); - - guard.join().unwrap(); - } -} From 736d1726e7469155d4f18615e05ac2982fa28c6d Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 12 Apr 2021 21:13:35 +0400 Subject: [PATCH 6/8] remove PipeWriter --- p2p/src/secret_connection/pipe.rs | 74 ------------------------------- 1 file changed, 74 deletions(-) diff --git a/p2p/src/secret_connection/pipe.rs b/p2p/src/secret_connection/pipe.rs index 6188a34f9..82f6e8664 100644 --- a/p2p/src/secret_connection/pipe.rs +++ b/p2p/src/secret_connection/pipe.rs @@ -37,12 +37,6 @@ pub struct PipeReader { position: usize, } -/// The `Write` end of a pipe (see `pipe()`) -#[derive(Clone)] -pub struct PipeWriter { - sender: Sender>, -} - /// The `Write` end of a pipe (see `pipe()`) that will buffer small writes before sending /// to the reader end. pub struct PipeBufWriter { @@ -83,26 +77,6 @@ fn epipe() -> io::Error { io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped") } -impl PipeWriter { - /// Extracts the inner `Sender` from the writer - pub fn into_inner(self) -> Sender> { - self.sender - } - - /// Gets a reference to the underlying `Sender` - pub fn sender(&self) -> &Sender> { - &self.sender - } - - /// Write data to the associated `PipeReader` - pub fn send>>(&self, bytes: B) -> io::Result<()> { - self.sender - .send(bytes.into()) - .map_err(|_| epipe()) - .map(drop) - } -} - impl PipeBufWriter { #[inline] /// Gets a reference to the underlying `Sender` @@ -111,30 +85,6 @@ impl PipeBufWriter { } } -/// Creates a new handle to the `PipeBufWriter` with a fresh new buffer. Any pending data is still -/// owned by the existing writer and should be flushed if necessary. -impl Clone for PipeBufWriter { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - buffer: Vec::with_capacity(self.size), - size: self.size, - } - } -} - -/// Creates a new handle to the `PipeReader` with a fresh new buffer. Any pending data is still -/// owned by the existing reader and will not be accessible from the new handle. -impl Clone for PipeReader { - fn clone(&self) -> Self { - Self { - receiver: self.receiver.clone(), - buffer: Vec::new(), - position: 0, - } - } -} - impl BufRead for PipeReader { fn fill_buf(&mut self) -> io::Result<&[u8]> { while self.position >= self.buffer.len() { @@ -174,30 +124,6 @@ impl Read for PipeReader { } } -impl Write for &'_ PipeWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let data = buf.to_vec(); - - self.send(data).map(|_| buf.len()) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl Write for PipeWriter { - #[inline] - fn write(&mut self, buf: &[u8]) -> io::Result { - Write::write(&mut &*self, buf) - } - - #[inline] - fn flush(&mut self) -> io::Result<()> { - Write::flush(&mut &*self) - } -} - impl Write for PipeBufWriter { fn write(&mut self, buf: &[u8]) -> io::Result { let buffer_len = self.buffer.len(); From 04a1234c8daf5e98b555285b7c18ea53c66a65db Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 12 Apr 2021 21:14:35 +0400 Subject: [PATCH 7/8] fix clippy warning --- p2p/src/secret_connection/pipe.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/p2p/src/secret_connection/pipe.rs b/p2p/src/secret_connection/pipe.rs index 82f6e8664..8090a58ff 100644 --- a/p2p/src/secret_connection/pipe.rs +++ b/p2p/src/secret_connection/pipe.rs @@ -20,8 +20,6 @@ //! Asynchronous in-memory pipe -use readwrite; - use flume::{self, Receiver, SendError, Sender, TrySendError}; use std::cmp::min; use std::io::{self, BufRead, Read, Write}; From 047e909495dcfdef45ccfd092ad1953291918577 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 13 Apr 2021 11:24:46 +0400 Subject: [PATCH 8/8] bring back safety comment --- p2p/src/secret_connection/pipe.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/p2p/src/secret_connection/pipe.rs b/p2p/src/secret_connection/pipe.rs index 8090a58ff..d40dd9529 100644 --- a/p2p/src/secret_connection/pipe.rs +++ b/p2p/src/secret_connection/pipe.rs @@ -79,7 +79,9 @@ impl PipeBufWriter { #[inline] /// Gets a reference to the underlying `Sender` pub fn sender(&self) -> &Sender> { - self.sender.as_ref().unwrap() + // SAFETY: this is safe as long as `into_inner()` is the only method + // that clears the sender, and this fn is never called afterward + self.sender.as_ref().expect("sender to be present") } }