Skip to content

Commit

Permalink
feat(proxy): optimizing the chances of large write in copy_bidirectional
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Jan 31, 2025
1 parent afbcebe commit 57633ce
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions proxy/src/proxy/copy_bidirectional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,26 @@ impl CopyBuffer {
W: AsyncWrite + ?Sized,
{
loop {
// If our buffer is empty, then we need to read some data to
// continue.
if self.pos == self.cap && !self.read_done {
self.pos = 0;
self.cap = 0;

// If there is some space left in our buffer, then we try to read some
// data to continue, thus maximizing the chances of a large write.
if self.cap < self.buf.len() && !self.read_done {
match self.poll_fill_buf(cx, reader.as_mut()) {
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(err)) => return Poll::Ready(Err(ErrorDirection::Read(err))),
Poll::Pending => {
// Try flushing when the reader has no progress to avoid deadlock
// when the reader depends on buffered writer.
if self.need_flush {
ready!(writer.as_mut().poll_flush(cx))
.map_err(ErrorDirection::Write)?;
self.need_flush = false;
// Ignore pending reads when our buffer is not empty, because
// we can try to write data immediately.
if self.pos == self.cap {
// Try flushing when the reader has no progress to avoid deadlock
// when the reader depends on buffered writer.
if self.need_flush {
ready!(writer.as_mut().poll_flush(cx))
.map_err(ErrorDirection::Write)?;
self.need_flush = false;
}

return Poll::Pending;
}

return Poll::Pending;
}
}
}
Expand All @@ -246,9 +247,13 @@ impl CopyBuffer {
"writer returned length larger than input slice"
);

// All data has been written, the buffer can be considered empty again
self.pos = 0;
self.cap = 0;

// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.pos == self.cap && self.read_done {
if self.read_done {
ready!(writer.as_mut().poll_flush(cx)).map_err(ErrorDirection::Write)?;
return Poll::Ready(Ok(self.amt));
}
Expand Down

0 comments on commit 57633ce

Please sign in to comment.