diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index b7c619683c..3e753c6f6a 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -175,6 +175,11 @@ where } } + #[cfg(feature = "server")] + pub(crate) fn has_initial_read_write_state(&self) -> bool { + matches!(self.state.reading, Reading::Init) && matches!(self.state.writing, Writing::Init) + } + fn should_error_on_eof(&self) -> bool { // If we're idle, it's probably just the connection closing gracefully. T::should_error_on_parse_eof() && !self.state.is_idle() diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index cd494581b9..beb8d0a914 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -28,7 +28,8 @@ pub(crate) trait Dispatch { self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>>; - fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()>; + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) + -> crate::Result<()>; fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; fn should_poll(&self) -> bool; } @@ -81,7 +82,11 @@ where #[cfg(feature = "server")] pub(crate) fn disable_keep_alive(&mut self) { self.conn.disable_keep_alive(); - if self.conn.is_write_closed() { + + // If keep alive has been disabled and no read or write has been seen on + // the connection yet, we must be in a state where the server is being asked to + // shut down before any data has been seen on the connection + if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() { self.close(); } } @@ -249,7 +254,8 @@ where let body = match body_len { DecodedLength::ZERO => IncomingBody::empty(), other => { - let (tx, rx) = IncomingBody::new_channel(other, wants.contains(Wants::EXPECT)); + let (tx, rx) = + IncomingBody::new_channel(other, wants.contains(Wants::EXPECT)); self.body_tx = Some(tx); rx } diff --git a/tests/server.rs b/tests/server.rs index 632ce4839a..759843e291 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -31,6 +31,7 @@ use hyper::body::{Body, Incoming as IncomingBody}; use hyper::server::conn::{http1, http2}; use hyper::service::{service_fn, Service}; use hyper::{Method, Request, Response, StatusCode, Uri, Version}; +use tokio::pin; mod support; @@ -2136,6 +2137,31 @@ async fn max_buf_size() { .expect_err("should TooLarge error"); } +#[cfg(feature = "http1")] +#[tokio::test] +async fn graceful_shutdown_before_first_request_no_block() { + let (listener, addr) = setup_tcp_listener(); + + tokio::spawn(async move { + let socket = listener.accept().await.unwrap().0; + + let future = http1::Builder::new().serve_connection(socket, HelloWorld); + pin!(future); + future.as_mut().graceful_shutdown(); + + future.await.unwrap(); + }); + + let mut stream = TkTcpStream::connect(addr).await.unwrap(); + + let mut buf = vec![]; + + tokio::time::timeout(Duration::from_secs(5), stream.read_to_end(&mut buf)) + .await + .expect("timed out waiting for graceful shutdown") + .expect("error receiving response"); +} + #[test] fn streaming_body() { use futures_util::StreamExt;