Skip to content

Commit

Permalink
fix(http1): http1 server graceful shutdown fix
Browse files Browse the repository at this point in the history
fix issue in the graceful shutdown logic
which causes the connection future to hang
when graceful shutdown is called prior to any
requests being  made. This fix checks to see
if the connection is still in its initial state
when disable_keep_alive is called, and starts the
shutdown process if it is.

This addresses issue hyperium#2730
  • Loading branch information
Robin Seitz authored and rob2244 committed May 23, 2023
1 parent 16a921f commit 8978e10
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 3 deletions.
5 changes: 5 additions & 0 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ where
}
}

#[cfg(feature = "server")]
pub(crate) fn has_initial_read_write_state(&self) -> bool {
matches!(self.state.reading, Reading::Init) && matches!(self.state.writing, Writing::Init)
}

fn should_error_on_eof(&self) -> bool {
// If we're idle, it's probably just the connection closing gracefully.
T::should_error_on_parse_eof() && !self.state.is_idle()
Expand Down
12 changes: 9 additions & 3 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ pub(crate) trait Dispatch {
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()>;
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>)
-> crate::Result<()>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
fn should_poll(&self) -> bool;
}
Expand Down Expand Up @@ -81,7 +82,11 @@ where
#[cfg(feature = "server")]
pub(crate) fn disable_keep_alive(&mut self) {
self.conn.disable_keep_alive();
if self.conn.is_write_closed() {

// If keep alive has been disabled and no read or write has been seen on
// the connection yet, we must be in a state where the server is being asked to
// shut down before any data has been seen on the connection
if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() {
self.close();
}
}
Expand Down Expand Up @@ -249,7 +254,8 @@ where
let body = match body_len {
DecodedLength::ZERO => IncomingBody::empty(),
other => {
let (tx, rx) = IncomingBody::new_channel(other, wants.contains(Wants::EXPECT));
let (tx, rx) =
IncomingBody::new_channel(other, wants.contains(Wants::EXPECT));
self.body_tx = Some(tx);
rx
}
Expand Down
26 changes: 26 additions & 0 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use hyper::body::{Body, Incoming as IncomingBody};
use hyper::server::conn::{http1, http2};
use hyper::service::{service_fn, Service};
use hyper::{Method, Request, Response, StatusCode, Uri, Version};
use tokio::pin;

mod support;

Expand Down Expand Up @@ -2136,6 +2137,31 @@ async fn max_buf_size() {
.expect_err("should TooLarge error");
}

#[cfg(feature = "http1")]
#[tokio::test]
async fn graceful_shutdown_before_first_request_no_block() {
let (listener, addr) = setup_tcp_listener();

tokio::spawn(async move {
let socket = listener.accept().await.unwrap().0;

let future = http1::Builder::new().serve_connection(socket, HelloWorld);
pin!(future);
future.as_mut().graceful_shutdown();

future.await.unwrap();
});

let mut stream = TkTcpStream::connect(addr).await.unwrap();

let mut buf = vec![];

tokio::time::timeout(Duration::from_secs(5), stream.read_to_end(&mut buf))
.await
.expect("timed out waiting for graceful shutdown")
.expect("error receiving response");
}

#[test]
fn streaming_body() {
use futures_util::StreamExt;
Expand Down

0 comments on commit 8978e10

Please sign in to comment.