diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs
index c84689e0b1..887dee48e5 100644
--- a/src/proto/h1/conn.rs
+++ b/src/proto/h1/conn.rs
@@ -71,6 +71,11 @@ where
         self.io.set_flush_pipeline(enabled);
     }
 
+    #[cfg(test)]
+    pub(crate) fn set_write_strategy_queue(&mut self) {
+        self.io.set_write_strategy_queue();
+    }
+
     pub(crate) fn set_max_buf_size(&mut self, max: usize) {
         self.io.set_max_buf_size(max);
     }
@@ -461,7 +466,7 @@ where
             }
         }
         match self.state.writing {
-            Writing::Init => true,
+            Writing::Init => self.io.can_headers_buf(),
             _ => false,
         }
     }
diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs
index d2c1428a75..677131bfdd 100644
--- a/src/proto/h1/dispatch.rs
+++ b/src/proto/h1/dispatch.rs
@@ -665,7 +665,6 @@ mod tests {
 
         // Block at 0 for now, but we will release this response before
         // the request is ready to write later...
-        //let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0);
         let (mut tx, rx) = crate::client::dispatch::channel();
         let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
         let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
@@ -692,6 +691,34 @@ mod tests {
         });
     }
 
+    #[tokio::test]
+    async fn client_flushing_is_not_ready_for_next_request() {
+        let _ = pretty_env_logger::try_init();
+
+        let (io, _handle) = tokio_test::io::Builder::new()
+            .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
+            .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
+            .wait(std::time::Duration::from_secs(2))
+            .build_with_handle();
+
+        let (mut tx, rx) = crate::client::dispatch::channel();
+        let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
+        conn.set_write_strategy_queue();
+
+        let dispatcher = Dispatcher::new(Client::new(rx), conn);
+        let _dispatcher = tokio::spawn(async move { dispatcher.await });
+
+        let req = crate::Request::builder()
+            .method("POST")
+            .body(crate::Body::from("reee"))
+            .unwrap();
+
+        let res = tx.try_send(req).unwrap().await.expect("response");
+        drop(res);
+
+        assert!(!tx.is_ready());
+    }
+
     #[tokio::test]
     async fn body_empty_chunks_ignored() {
         let _ = pretty_env_logger::try_init();
diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs
index db4eece695..a7523001bc 100644
--- a/src/proto/h1/io.rs
+++ b/src/proto/h1/io.rs
@@ -98,13 +98,18 @@ where
     }
 
     #[cfg(feature = "server")]
-    pub(crate) fn set_write_strategy_flatten(&mut self) {
+    fn set_write_strategy_flatten(&mut self) {
         // this should always be called only at construction time,
         // so this assert is here to catch myself
         debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
         self.write_buf.set_strategy(WriteStrategy::Flatten);
     }
 
+    #[cfg(test)]
+    pub(crate) fn set_write_strategy_queue(&mut self) {
+        self.write_buf.set_strategy(WriteStrategy::Queue);
+    }
+
     pub(crate) fn read_buf(&self) -> &[u8] {
         self.read_buf.as_ref()
     }
@@ -121,6 +126,15 @@ where
         self.read_buf.capacity() - self.read_buf.len()
     }
 
+    /// Return whether we can append to the headers buffer.
+    ///
+    /// Reasons we can't:
+    /// - The write buf is in queue mode, and some of the past body is still
+    ///   needing to be flushed.
+    pub(crate) fn can_headers_buf(&self) -> bool {
+        !self.write_buf.queue.has_remaining()
+    }
+
     pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
         let buf = self.write_buf.headers_mut();
         &mut buf.bytes
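
Note on the behavior change, as an illustration only: with the queue write strategy, the connection should not report readiness for a new request head while body bytes from a previous request are still queued and unflushed. The sketch below models that gating check with simplified stand-in types (`Conn`, `WriteBuf`, and `Writing` here are toy definitions introduced for this example, not hyper's actual types); it shows the shape of the new `Writing::Init` + `can_headers_buf()` condition, not the library's implementation.

```rust
// Toy model (assumption: simplified stand-ins, not hyper's internals):
// a connection may only buffer new request headers when it is in the
// Init write state AND no previously queued body bytes remain unflushed.

#[derive(PartialEq)]
enum Writing {
    Init,
    Body,
}

struct WriteBuf {
    // Bytes of an earlier body queued but not yet written to the socket.
    queued_body_bytes: usize,
}

impl WriteBuf {
    fn has_remaining(&self) -> bool {
        self.queued_body_bytes > 0
    }
}

struct Conn {
    writing: Writing,
    write_buf: WriteBuf,
}

impl Conn {
    // Mirrors the shape of the patched check: `Writing::Init` alone is no
    // longer enough; the queued write buffer must also be fully flushed.
    fn can_write_head(&self) -> bool {
        self.writing == Writing::Init && !self.write_buf.has_remaining()
    }
}

fn main() {
    // Still flushing a previous body: not ready for the next request.
    let flushing = Conn {
        writing: Writing::Init,
        write_buf: WriteBuf { queued_body_bytes: 4 },
    };
    assert!(!flushing.can_write_head());

    // Mid-body writes are never ready for a new head.
    let mid_body = Conn {
        writing: Writing::Body,
        write_buf: WriteBuf { queued_body_bytes: 0 },
    };
    assert!(!mid_body.can_write_head());

    // Fully flushed and idle: ready.
    let idle = Conn {
        writing: Writing::Init,
        write_buf: WriteBuf { queued_body_bytes: 0 },
    };
    assert!(idle.can_write_head());

    println!("all checks passed");
}
```

The new test exercises the same idea end to end: the mock transport delays the flush, so after the response arrives the sender must still report `!tx.is_ready()` instead of accepting the next request early.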