From 154986d003735de09f43115f59cc62383a6219b2 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 Jan 2025 00:00:00 +0000 Subject: [PATCH 01/10] refactor(http/retry): outline `ReplayBody` unit tests there are more than 500 lines of unit tests. let's move them into a submodule, for convenience. Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay.rs | 576 +------------------------ linkerd/http/retry/src/replay/tests.rs | 568 ++++++++++++++++++++++++ 2 files changed, 572 insertions(+), 572 deletions(-) create mode 100644 linkerd/http/retry/src/replay/tests.rs diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 8f397ca7fa..31f6273298 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -7,6 +7,10 @@ use parking_lot::Mutex; use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; use thiserror::Error; +/// Unit tests for [`ReplayBody`]. +#[cfg(test)] +mod tests; + /// Wraps an HTTP body type and lazily buffers data as it is read from the inner /// body. 
/// @@ -496,575 +500,3 @@ impl BodyState { self.max_bytes == 0 } } - -#[cfg(test)] -mod tests { - use super::*; - use http::HeaderValue; - - #[tokio::test] - async fn replays_one_chunk() { - let Test { - mut tx, - initial, - replay, - _trace, - } = Test::new(); - tx.send_data("hello world").await; - drop(tx); - - { - let (data, trailers) = body_to_string(initial).await; - assert_eq!(data, "hello world"); - assert_eq!(trailers, None); - } - { - let (data, trailers) = body_to_string(replay).await; - assert_eq!(data, "hello world"); - assert_eq!(trailers, None); - } - } - - #[tokio::test] - async fn replays_several_chunks() { - let Test { - mut tx, - initial, - replay, - _trace, - } = Test::new(); - - tokio::spawn(async move { - tx.send_data("hello").await; - tx.send_data(" world").await; - tx.send_data(", have lots").await; - tx.send_data(" of fun!").await; - }); - - let (initial, trailers) = body_to_string(initial).await; - assert_eq!(initial, "hello world, have lots of fun!"); - assert!(trailers.is_none()); - - let (replay, trailers) = body_to_string(replay).await; - assert_eq!(replay, "hello world, have lots of fun!"); - assert!(trailers.is_none()); - } - - #[tokio::test] - async fn replays_trailers() { - let Test { - mut tx, - initial, - replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - tx.send_data("hello world").await; - tx.send_trailers(tlrs.clone()).await; - drop(tx); - - let read_trailers = |body: ReplayBody<_>| async move { - let mut body = crate::compat::ForwardCompatibleBody::new(body); - let _ = body - .frame() - .await - .expect("should yield a result") - .expect("should yield a frame") - .into_data() - .expect("should yield data"); - let trls = body - .frame() - .await - .expect("should yield a result") - .expect("should yield a frame") - .into_trailers() - .expect("should yield trailers"); - 
assert!(body.frame().await.is_none()); - trls - }; - - let initial_tlrs = read_trailers(initial).await; - assert_eq!(&initial_tlrs, &tlrs); - - let replay_tlrs = read_trailers(replay).await; - assert_eq!(&replay_tlrs, &tlrs); - } - - #[tokio::test] - async fn trailers_only() { - let Test { - mut tx, - initial, - replay, - _trace, - } = Test::new(); - let mut initial = crate::compat::ForwardCompatibleBody::new(initial); - let mut replay = crate::compat::ForwardCompatibleBody::new(replay); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - tx.send_trailers(tlrs.clone()).await; - - drop(tx); - - let initial_tlrs = initial - .frame() - .await - .expect("should yield a result") - .expect("should yield a frame") - .into_trailers() - .expect("should yield trailers"); - assert_eq!(&initial_tlrs, &tlrs); - - // drop the initial body to send the data to the replay - drop(initial); - - let replay_tlrs = replay - .frame() - .await - .expect("should yield a result") - .expect("should yield a frame") - .into_trailers() - .expect("should yield trailers"); - assert_eq!(&replay_tlrs, &tlrs); - } - - #[tokio::test(flavor = "current_thread")] - async fn switches_with_body_remaining() { - // This simulates a case where the server returns an error _before_ the - // entire body has been read. 
- let Test { - mut tx, - mut initial, - mut replay, - _trace, - } = Test::new(); - - tx.send_data("hello").await; - assert_eq!(chunk(&mut initial).await.unwrap(), "hello"); - - tx.send_data(" world").await; - assert_eq!(chunk(&mut initial).await.unwrap(), " world"); - - // drop the initial body to send the data to the replay - drop(initial); - tracing::info!("dropped initial body"); - - tokio::spawn(async move { - tx.send_data(", have lots of fun").await; - tx.send_trailers(HeaderMap::new()).await; - }); - - let (data, trailers) = body_to_string(&mut replay).await; - assert_eq!(data, "hello world, have lots of fun"); - assert!(trailers.is_some()); - } - - #[tokio::test(flavor = "current_thread")] - async fn multiple_replays() { - let Test { - mut tx, - initial, - replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - let tlrs2 = tlrs.clone(); - tokio::spawn(async move { - tx.send_data("hello").await; - tx.send_data(" world").await; - tx.send_trailers(tlrs2).await; - }); - - let read = |body| async { - let (data, trailers) = body_to_string(body).await; - assert_eq!(data, "hello world"); - assert_eq!(trailers.as_ref(), Some(&tlrs)); - }; - - read(initial).await; - - // Replay the body twice. 
- let replay2 = replay.clone(); - read(replay).await; - read(replay2).await; - } - - #[tokio::test(flavor = "current_thread")] - async fn multiple_incomplete_replays() { - let Test { - mut tx, - mut initial, - mut replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - tx.send_data("hello").await; - assert_eq!(chunk(&mut initial).await.unwrap(), "hello"); - - // drop the initial body to send the data to the replay - drop(initial); - tracing::info!("dropped initial body"); - - let replay2 = replay.clone(); - - tx.send_data(" world").await; - assert_eq!(chunk(&mut replay).await.unwrap(), "hello"); - assert_eq!(chunk(&mut replay).await.unwrap(), " world"); - - // drop the replay body to send the data to the second replay - drop(replay); - tracing::info!("dropped first replay body"); - - let tlrs2 = tlrs.clone(); - tokio::spawn(async move { - tx.send_data(", have lots").await; - tx.send_data(" of fun!").await; - tx.send_trailers(tlrs2).await; - }); - - let (data, replay2_trailers) = body_to_string(replay2).await; - assert_eq!(data, "hello world, have lots of fun!"); - assert_eq!(replay2_trailers.as_ref(), Some(&tlrs)); - } - - #[tokio::test(flavor = "current_thread")] - async fn drop_clone_early() { - let Test { - mut tx, - initial, - replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - let tlrs2 = tlrs.clone(); - tokio::spawn(async move { - tx.send_data("hello").await; - tx.send_data(" world").await; - tx.send_trailers(tlrs2).await; - }); - - { - let body = initial; - let (data, trailers) = body_to_string(body).await; - assert_eq!(data, "hello world"); - assert_eq!(trailers.as_ref(), Some(&tlrs)); - } - - // Clone the body, and then drop it before polling it. 
- let replay2 = replay.clone(); - drop(replay2); - - { - let body = replay; - let (data, trailers) = body_to_string(body).await; - assert_eq!(data, "hello world"); - assert_eq!(trailers.as_ref(), Some(&tlrs)); - } - } - - // This test is specifically for behavior across clones, so the clippy lint - // is wrong here. - #[allow(clippy::redundant_clone)] - #[test] - fn empty_body_is_always_eos() { - // If the initial body was empty, every clone should always return - // `true` from `is_end_stream`. - let initial = ReplayBody::try_new(BoxBody::empty(), 64 * 1024) - .expect("empty body can't be too large"); - assert!(initial.is_end_stream()); - - let replay = initial.clone(); - assert!(replay.is_end_stream()); - - let replay2 = replay.clone(); - assert!(replay2.is_end_stream()); - } - - #[tokio::test(flavor = "current_thread")] - async fn eos_only_when_fully_replayed() { - // Test that each clone of a body is not EOS until the data has been - // fully replayed. - let initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) - .expect("body must not be too large"); - let replay = initial.clone(); - - let mut initial = crate::compat::ForwardCompatibleBody::new(initial); - let mut replay = crate::compat::ForwardCompatibleBody::new(replay); - - // Read the initial body, show that the replay does not consider itself to have reached the - // end-of-stream. Then drop the initial body, show that the replay is still not done. - assert!(!initial.is_end_stream()); - initial - .frame() - .await - .expect("yields a result") - .expect("yields a frame") - .into_data() - .expect("yields a data frame"); - assert!(initial.is_end_stream()); - assert!(!replay.is_end_stream()); - drop(initial); - assert!(!replay.is_end_stream()); - - // Read the replay body. 
- assert!(!replay.is_end_stream()); - replay - .frame() - .await - .expect("yields a result") - .expect("yields a frame") - .into_data() - .expect("yields a data frame"); - assert!(replay.frame().await.is_none()); - assert!(replay.is_end_stream()); - - // Even if we clone a body _after_ it has been driven to EOS, the clone must not be EOS. - let replay = replay.into_inner(); - let replay2 = replay.clone(); - assert!(!replay2.is_end_stream()); - - // Drop the first replay body to send the data to the second replay. - drop(replay); - - // Read the second replay body. - let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); - replay2 - .frame() - .await - .expect("yields a result") - .expect("yields a frame") - .into_data() - .expect("yields a data frame"); - assert!(replay2.frame().await.is_none()); - assert!(replay2.is_end_stream()); - } - - #[tokio::test(flavor = "current_thread")] - async fn caps_buffer() { - // Test that, when the initial body is longer than the preconfigured - // cap, we allow the request to continue, but stop buffering. The - // initial body will complete, but the replay will immediately fail. - let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); - - // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via - // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. - // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. 
- let (mut tx, body) = hyper::Body::channel(); - let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); - let replay = initial.clone(); - - // Send enough data to reach the cap - tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); - - // Further chunks are still forwarded on the initial body - tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("bbbbbbbb".to_string())); - - drop(initial); - - // The request's replay should error, since we discarded the buffer when - // we hit the cap. - let mut replay = crate::compat::ForwardCompatibleBody::new(replay); - let err = replay - .frame() - .await - .expect("yields a result") - .expect_err("yields an error when capped"); - assert!(err.is::()) - } - - #[tokio::test(flavor = "current_thread")] - async fn caps_across_replays() { - // Test that, when the initial body is longer than the preconfigured - // cap, we allow the request to continue, but stop buffering. - let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"); - - // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via - // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. - // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. - let (mut tx, body) = hyper::Body::channel(); - let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); - let mut replay = initial.clone(); - - // Send enough data to reach the cap - tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); - drop(initial); - - let replay2 = replay.clone(); - - // The replay will reach the cap, but it should still return data from - // the original body. 
- tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); - assert_eq!(chunk(&mut replay).await, Some("aaaaaaaa".to_string())); - assert_eq!(chunk(&mut replay).await, Some("bbbbbbbb".to_string())); - drop(replay); - - // The second replay will fail, though, because the buffer was discarded. - let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); - let err = replay2 - .frame() - .await - .expect("yields a result") - .expect_err("yields an error when capped"); - assert!(err.is::()) - } - - #[test] - fn body_too_big() { - let max_size = 8; - let mk_body = |sz: usize| -> BoxBody { - let s = (0..sz).map(|_| "x").collect::(); - BoxBody::new(s) - }; - - assert!( - ReplayBody::try_new(BoxBody::empty(), max_size).is_ok(), - "empty body is not too big" - ); - - assert!( - ReplayBody::try_new(mk_body(max_size), max_size).is_ok(), - "body at maximum capacity is not too big" - ); - - assert!( - ReplayBody::try_new(mk_body(max_size + 1), max_size).is_err(), - "over-sized body is too big" - ); - - // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via - // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. - // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. - let (_sender, body) = hyper::Body::channel(); - assert!( - ReplayBody::try_new(body, max_size).is_ok(), - "body without size hint is not too big" - ); - } - - struct Test { - // Sends body data. - tx: Tx, - /// The "initial" body. - initial: ReplayBody, - /// Replays the initial body. - replay: ReplayBody, - /// An RAII guard for the tracing subscriber. - _trace: tracing::subscriber::DefaultGuard, - } - - struct Tx(hyper::body::Sender); - - impl Test { - fn new() -> Self { - // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via - // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. - // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. 
- let (tx, rx) = hyper::Body::channel(); - let initial = ReplayBody::try_new(BoxBody::new(rx), 64 * 1024).expect("body too large"); - let replay = initial.clone(); - Self { - tx: Tx(tx), - initial, - replay, - _trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug").0, - } - } - } - - impl Tx { - #[tracing::instrument(skip(self))] - async fn send_data(&mut self, data: impl Into + std::fmt::Debug) { - let data = data.into(); - tracing::trace!("sending data..."); - self.0.send_data(data).await.expect("rx is not dropped"); - tracing::info!("sent data"); - } - - #[tracing::instrument(skip(self))] - async fn send_trailers(&mut self, trailers: HeaderMap) { - tracing::trace!("sending trailers..."); - self.0 - .send_trailers(trailers) - .await - .expect("rx is not dropped"); - tracing::info!("sent trailers"); - } - } - - async fn chunk(body: &mut T) -> Option - where - T: http_body::Body + Unpin, - { - tracing::trace!("waiting for a body chunk..."); - let chunk = crate::compat::ForwardCompatibleBody::new(body) - .frame() - .await - .expect("yields a result") - .ok() - .expect("yields a frame") - .into_data() - .ok() - .map(string); - tracing::info!(?chunk); - chunk - } - - async fn body_to_string(body: B) -> (String, Option) - where - B: http_body::Body + Unpin, - B::Error: std::fmt::Debug, - { - let mut body = crate::compat::ForwardCompatibleBody::new(body); - let mut data = String::new(); - let mut trailers = None; - - // Continue reading frames from the body until it is finished. 
- while let Some(frame) = body - .frame() - .await - .transpose() - .expect("reading a frame succeeds") - { - match frame.into_data().map(string) { - Ok(ref s) => data.push_str(s), - Err(frame) => { - let trls = frame - .into_trailers() - .map_err(drop) - .expect("test frame is either data or trailers"); - trailers = Some(trls); - } - } - } - - tracing::info!(?data, ?trailers, "finished reading body"); - (data, trailers) - } - - fn string(mut data: impl Buf) -> String { - let bytes = data.copy_to_bytes(data.remaining()); - String::from_utf8(bytes.to_vec()).unwrap() - } -} diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs new file mode 100644 index 0000000000..a2abfbdf44 --- /dev/null +++ b/linkerd/http/retry/src/replay/tests.rs @@ -0,0 +1,568 @@ +use super::*; +use http::HeaderValue; + +#[tokio::test] +async fn replays_one_chunk() { + let Test { + mut tx, + initial, + replay, + _trace, + } = Test::new(); + tx.send_data("hello world").await; + drop(tx); + + { + let (data, trailers) = body_to_string(initial).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers, None); + } + { + let (data, trailers) = body_to_string(replay).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers, None); + } +} + +#[tokio::test] +async fn replays_several_chunks() { + let Test { + mut tx, + initial, + replay, + _trace, + } = Test::new(); + + tokio::spawn(async move { + tx.send_data("hello").await; + tx.send_data(" world").await; + tx.send_data(", have lots").await; + tx.send_data(" of fun!").await; + }); + + let (initial, trailers) = body_to_string(initial).await; + assert_eq!(initial, "hello world, have lots of fun!"); + assert!(trailers.is_none()); + + let (replay, trailers) = body_to_string(replay).await; + assert_eq!(replay, "hello world, have lots of fun!"); + assert!(trailers.is_none()); +} + +#[tokio::test] +async fn replays_trailers() { + let Test { + mut tx, + initial, + replay, + _trace, + } = Test::new(); + + let 
mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + tx.send_data("hello world").await; + tx.send_trailers(tlrs.clone()).await; + drop(tx); + + let read_trailers = |body: ReplayBody<_>| async move { + let mut body = crate::compat::ForwardCompatibleBody::new(body); + let _ = body + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_data() + .expect("should yield data"); + let trls = body + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert!(body.frame().await.is_none()); + trls + }; + + let initial_tlrs = read_trailers(initial).await; + assert_eq!(&initial_tlrs, &tlrs); + + let replay_tlrs = read_trailers(replay).await; + assert_eq!(&replay_tlrs, &tlrs); +} + +#[tokio::test] +async fn trailers_only() { + let Test { + mut tx, + initial, + replay, + _trace, + } = Test::new(); + let mut initial = crate::compat::ForwardCompatibleBody::new(initial); + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + tx.send_trailers(tlrs.clone()).await; + + drop(tx); + + let initial_tlrs = initial + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert_eq!(&initial_tlrs, &tlrs); + + // drop the initial body to send the data to the replay + drop(initial); + + let replay_tlrs = replay + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert_eq!(&replay_tlrs, &tlrs); +} + +#[tokio::test(flavor = "current_thread")] +async fn switches_with_body_remaining() { + // This 
simulates a case where the server returns an error _before_ the + // entire body has been read. + let Test { + mut tx, + mut initial, + mut replay, + _trace, + } = Test::new(); + + tx.send_data("hello").await; + assert_eq!(chunk(&mut initial).await.unwrap(), "hello"); + + tx.send_data(" world").await; + assert_eq!(chunk(&mut initial).await.unwrap(), " world"); + + // drop the initial body to send the data to the replay + drop(initial); + tracing::info!("dropped initial body"); + + tokio::spawn(async move { + tx.send_data(", have lots of fun").await; + tx.send_trailers(HeaderMap::new()).await; + }); + + let (data, trailers) = body_to_string(&mut replay).await; + assert_eq!(data, "hello world, have lots of fun"); + assert!(trailers.is_some()); +} + +#[tokio::test(flavor = "current_thread")] +async fn multiple_replays() { + let Test { + mut tx, + initial, + replay, + _trace, + } = Test::new(); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + let tlrs2 = tlrs.clone(); + tokio::spawn(async move { + tx.send_data("hello").await; + tx.send_data(" world").await; + tx.send_trailers(tlrs2).await; + }); + + let read = |body| async { + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + }; + + read(initial).await; + + // Replay the body twice. 
+ let replay2 = replay.clone(); + read(replay).await; + read(replay2).await; +} + +#[tokio::test(flavor = "current_thread")] +async fn multiple_incomplete_replays() { + let Test { + mut tx, + mut initial, + mut replay, + _trace, + } = Test::new(); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + tx.send_data("hello").await; + assert_eq!(chunk(&mut initial).await.unwrap(), "hello"); + + // drop the initial body to send the data to the replay + drop(initial); + tracing::info!("dropped initial body"); + + let replay2 = replay.clone(); + + tx.send_data(" world").await; + assert_eq!(chunk(&mut replay).await.unwrap(), "hello"); + assert_eq!(chunk(&mut replay).await.unwrap(), " world"); + + // drop the replay body to send the data to the second replay + drop(replay); + tracing::info!("dropped first replay body"); + + let tlrs2 = tlrs.clone(); + tokio::spawn(async move { + tx.send_data(", have lots").await; + tx.send_data(" of fun!").await; + tx.send_trailers(tlrs2).await; + }); + + let (data, replay2_trailers) = body_to_string(replay2).await; + assert_eq!(data, "hello world, have lots of fun!"); + assert_eq!(replay2_trailers.as_ref(), Some(&tlrs)); +} + +#[tokio::test(flavor = "current_thread")] +async fn drop_clone_early() { + let Test { + mut tx, + initial, + replay, + _trace, + } = Test::new(); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + let tlrs2 = tlrs.clone(); + tokio::spawn(async move { + tx.send_data("hello").await; + tx.send_data(" world").await; + tx.send_trailers(tlrs2).await; + }); + + { + let body = initial; + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + } + + // Clone the body, and then drop it before polling it. 
+ let replay2 = replay.clone(); + drop(replay2); + + { + let body = replay; + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + } +} + +// This test is specifically for behavior across clones, so the clippy lint +// is wrong here. +#[allow(clippy::redundant_clone)] +#[test] +fn empty_body_is_always_eos() { + // If the initial body was empty, every clone should always return + // `true` from `is_end_stream`. + let initial = + ReplayBody::try_new(BoxBody::empty(), 64 * 1024).expect("empty body can't be too large"); + assert!(initial.is_end_stream()); + + let replay = initial.clone(); + assert!(replay.is_end_stream()); + + let replay2 = replay.clone(); + assert!(replay2.is_end_stream()); +} + +#[tokio::test(flavor = "current_thread")] +async fn eos_only_when_fully_replayed() { + // Test that each clone of a body is not EOS until the data has been + // fully replayed. + let initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) + .expect("body must not be too large"); + let replay = initial.clone(); + + let mut initial = crate::compat::ForwardCompatibleBody::new(initial); + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); + + // Read the initial body, show that the replay does not consider itself to have reached the + // end-of-stream. Then drop the initial body, show that the replay is still not done. + assert!(!initial.is_end_stream()); + initial + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(initial.is_end_stream()); + assert!(!replay.is_end_stream()); + drop(initial); + assert!(!replay.is_end_stream()); + + // Read the replay body. 
+ assert!(!replay.is_end_stream()); + replay + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(replay.frame().await.is_none()); + assert!(replay.is_end_stream()); + + // Even if we clone a body _after_ it has been driven to EOS, the clone must not be EOS. + let replay = replay.into_inner(); + let replay2 = replay.clone(); + assert!(!replay2.is_end_stream()); + + // Drop the first replay body to send the data to the second replay. + drop(replay); + + // Read the second replay body. + let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + replay2 + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(replay2.frame().await.is_none()); + assert!(replay2.is_end_stream()); +} + +#[tokio::test(flavor = "current_thread")] +async fn caps_buffer() { + // Test that, when the initial body is longer than the preconfigured + // cap, we allow the request to continue, but stop buffering. The + // initial body will complete, but the replay will immediately fail. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); + + // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via + // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. + // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. 
+ let (mut tx, body) = hyper::Body::channel(); + let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); + let replay = initial.clone(); + + // Send enough data to reach the cap + tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + + // Further chunks are still forwarded on the initial body + tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("bbbbbbbb".to_string())); + + drop(initial); + + // The request's replay should error, since we discarded the buffer when + // we hit the cap. + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); + let err = replay + .frame() + .await + .expect("yields a result") + .expect_err("yields an error when capped"); + assert!(err.is::()) +} + +#[tokio::test(flavor = "current_thread")] +async fn caps_across_replays() { + // Test that, when the initial body is longer than the preconfigured + // cap, we allow the request to continue, but stop buffering. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"); + + // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via + // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. + // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. + let (mut tx, body) = hyper::Body::channel(); + let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); + let mut replay = initial.clone(); + + // Send enough data to reach the cap + tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + drop(initial); + + let replay2 = replay.clone(); + + // The replay will reach the cap, but it should still return data from + // the original body. 
+ tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); + assert_eq!(chunk(&mut replay).await, Some("aaaaaaaa".to_string())); + assert_eq!(chunk(&mut replay).await, Some("bbbbbbbb".to_string())); + drop(replay); + + // The second replay will fail, though, because the buffer was discarded. + let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + let err = replay2 + .frame() + .await + .expect("yields a result") + .expect_err("yields an error when capped"); + assert!(err.is::()) +} + +#[test] +fn body_too_big() { + let max_size = 8; + let mk_body = |sz: usize| -> BoxBody { + let s = (0..sz).map(|_| "x").collect::(); + BoxBody::new(s) + }; + + assert!( + ReplayBody::try_new(BoxBody::empty(), max_size).is_ok(), + "empty body is not too big" + ); + + assert!( + ReplayBody::try_new(mk_body(max_size), max_size).is_ok(), + "body at maximum capacity is not too big" + ); + + assert!( + ReplayBody::try_new(mk_body(max_size + 1), max_size).is_err(), + "over-sized body is too big" + ); + + // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via + // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. + // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. + let (_sender, body) = hyper::Body::channel(); + assert!( + ReplayBody::try_new(body, max_size).is_ok(), + "body without size hint is not too big" + ); +} + +struct Test { + // Sends body data. + tx: Tx, + /// The "initial" body. + initial: ReplayBody, + /// Replays the initial body. + replay: ReplayBody, + /// An RAII guard for the tracing subscriber. + _trace: tracing::subscriber::DefaultGuard, +} + +struct Tx(hyper::body::Sender); + +impl Test { + fn new() -> Self { + // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via + // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. + // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. 
+ let (tx, rx) = hyper::Body::channel(); + let initial = ReplayBody::try_new(BoxBody::new(rx), 64 * 1024).expect("body too large"); + let replay = initial.clone(); + Self { + tx: Tx(tx), + initial, + replay, + _trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug").0, + } + } +} + +impl Tx { + #[tracing::instrument(skip(self))] + async fn send_data(&mut self, data: impl Into + std::fmt::Debug) { + let data = data.into(); + tracing::trace!("sending data..."); + self.0.send_data(data).await.expect("rx is not dropped"); + tracing::info!("sent data"); + } + + #[tracing::instrument(skip(self))] + async fn send_trailers(&mut self, trailers: HeaderMap) { + tracing::trace!("sending trailers..."); + self.0 + .send_trailers(trailers) + .await + .expect("rx is not dropped"); + tracing::info!("sent trailers"); + } +} + +async fn chunk(body: &mut T) -> Option +where + T: http_body::Body + Unpin, +{ + tracing::trace!("waiting for a body chunk..."); + let chunk = crate::compat::ForwardCompatibleBody::new(body) + .frame() + .await + .expect("yields a result") + .ok() + .expect("yields a frame") + .into_data() + .ok() + .map(string); + tracing::info!(?chunk); + chunk +} + +async fn body_to_string(body: B) -> (String, Option) +where + B: http_body::Body + Unpin, + B::Error: std::fmt::Debug, +{ + let mut body = crate::compat::ForwardCompatibleBody::new(body); + let mut data = String::new(); + let mut trailers = None; + + // Continue reading frames from the body until it is finished. 
+    while let Some(frame) = body
+        .frame()
+        .await
+        .transpose()
+        .expect("reading a frame succeeds")
+    {
+        match frame.into_data().map(string) {
+            Ok(ref s) => data.push_str(s),
+            Err(frame) => {
+                let trls = frame
+                    .into_trailers()
+                    .map_err(drop)
+                    .expect("test frame is either data or trailers");
+                trailers = Some(trls);
+            }
+        }
+    }
+
+    tracing::info!(?data, ?trailers, "finished reading body");
+    (data, trailers)
+}
+
+fn string(mut data: impl Buf) -> String {
+    let bytes = data.copy_to_bytes(data.remaining());
+    String::from_utf8(bytes.to_vec()).unwrap()
+}

From b28bdb14718c0b43462c2c043cd3967245a22d9c Mon Sep 17 00:00:00 2001
From: katelyn martin
Date: Tue, 28 Jan 2025 00:00:00 +0000
Subject: [PATCH 02/10] nit(http/retry): reorganize replay tests

this is a small cosmetic change reordering some test helpers.

there is a common convention of affixing a banner comment above groups
of `impl T {}` blocks, which is useful when top-level blocks are folded
in an editor.

similarly, there is a convention of defining structures at the top of a
file.

this commit reorganizes the replay body tests to follow each of these
conventions.

Signed-off-by: katelyn martin
---
 linkerd/http/retry/src/replay/tests.rs | 30 +++++++++++++++-----------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs
index a2abfbdf44..31eb9728d0 100644
--- a/linkerd/http/retry/src/replay/tests.rs
+++ b/linkerd/http/retry/src/replay/tests.rs
@@ -1,6 +1,19 @@
 use super::*;
 use http::HeaderValue;
 
+struct Test {
+    // Sends body data.
+    tx: Tx,
+    /// The "initial" body.
+    initial: ReplayBody<BoxBody>,
+    /// Replays the initial body.
+    replay: ReplayBody<BoxBody>,
+    /// An RAII guard for the tracing subscriber.
+    _trace: tracing::subscriber::DefaultGuard,
+}
+
+struct Tx(hyper::body::Sender);
+
 #[tokio::test]
 async fn replays_one_chunk() {
     let Test {
@@ -462,18 +475,7 @@ fn body_too_big() {
     );
 }
 
-struct Test {
-    // Sends body data.
- tx: Tx, - /// The "initial" body. - initial: ReplayBody, - /// Replays the initial body. - replay: ReplayBody, - /// An RAII guard for the tracing subscriber. - _trace: tracing::subscriber::DefaultGuard, -} - -struct Tx(hyper::body::Sender); +// === impl Test === impl Test { fn new() -> Self { @@ -492,6 +494,8 @@ impl Test { } } +// === impl Tx === + impl Tx { #[tracing::instrument(skip(self))] async fn send_data(&mut self, data: impl Into + std::fmt::Debug) { @@ -512,6 +516,8 @@ impl Tx { } } +// === helper functions === + async fn chunk(body: &mut T) -> Option where T: http_body::Body + Unpin, From f70482c02ace56c7a5fbb471f7aa61920952d64d Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 Jan 2025 00:00:00 +0000 Subject: [PATCH 03/10] nit(http/retry): test replays trailers twice just to be extra sure! Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay/tests.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 31eb9728d0..e20d5f4a76 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -70,6 +70,7 @@ async fn replays_trailers() { replay, _trace, } = Test::new(); + let replay2 = replay.clone(); let mut tlrs = HeaderMap::new(); tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); @@ -104,6 +105,9 @@ async fn replays_trailers() { let replay_tlrs = read_trailers(replay).await; assert_eq!(&replay_tlrs, &tlrs); + + let replay_tlrs = read_trailers(replay2).await; + assert_eq!(&replay_tlrs, &tlrs); } #[tokio::test] From 21db80ec4bf91b1c243d22eab17258bc8d2fae41 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 Jan 2025 00:00:00 +0000 Subject: [PATCH 04/10] nit(http/retry): rename `trailers_only()` test this is part of a family of other tests called `replays_one_chunk()`, `replays_several_chunks()`, and `replays_trailers()`. let's name this something that lines up with this convention. 
Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index e20d5f4a76..ccb8767a40 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -111,7 +111,7 @@ async fn replays_trailers() { } #[tokio::test] -async fn trailers_only() { +async fn replays_trailers_only() { let Test { mut tx, initial, From a7032cff9f45c4a2b1ea0a3f4283bc766d2d54c7 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 Jan 2025 00:00:00 +0000 Subject: [PATCH 05/10] feat(http/retry): add a `TestBody` type we have a unit test, called `eos_only_when_fully_replayed` that confirms `Body::end_of_stream()` reports the stream ending properly. soon, we aim to introduce additional test coverage that exercises this when a body has trailers, as well. this will be useful for assurance related to upgrading to http-body v1.x. see linkerd/linkerd2#8733 for more information. unfortunately, hyper 0.14's channel-backed body does not report itself as having reached the end of the stream. this is an unfortunate quality that prevents us from using `Test::new()`. this commit adds a `TestBody` type that we can use in place of `BoxBody::from_static(..)`, which boxes a static string, but does not send trailers. Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay/tests.rs | 49 +++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index ccb8767a40..536e5466be 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -1,5 +1,7 @@ use super::*; +use bytes::Bytes; use http::HeaderValue; +use std::collections::VecDeque; struct Test { // Sends body data. 
@@ -12,6 +14,12 @@ struct Test {
     _trace: tracing::subscriber::DefaultGuard,
 }
 
+#[derive(Debug, Default)]
+struct TestBody {
+    data: VecDeque<&'static str>,
+    trailers: Option<HeaderMap>,
+}
+
 struct Tx(hyper::body::Sender);
 
 #[tokio::test]
@@ -319,7 +327,7 @@ fn empty_body_is_always_eos() {
 async fn eos_only_when_fully_replayed() {
     // Test that each clone of a body is not EOS until the data has been
     // fully replayed.
-    let initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024)
+    let initial = ReplayBody::try_new(TestBody::one_data_frame(), 64 * 1024)
         .expect("body must not be too large");
     let replay = initial.clone();
 
@@ -520,6 +528,45 @@ impl Tx {
     }
 }
 
+// === impl TestBody ===
+
+impl TestBody {
+    /// A body that yields a single DATA frame.
+    fn one_data_frame() -> Self {
+        Self {
+            data: ["one"].into(),
+            trailers: None,
+        }
+    }
+}
+
+impl Body for TestBody {
+    type Data = <BoxBody as Body>::Data;
+    type Error = std::convert::Infallible;
+    fn poll_data(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        let Self { data, .. } = self.get_mut();
+        let next = data.pop_front().map(Bytes::from).map(Ok);
+        Poll::Ready(next)
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        let Self { trailers, .. } = self.get_mut();
+        let trailers = trailers.take().map(Ok).transpose();
+        Poll::Ready(trailers)
+    }
+
+    fn is_end_stream(&self) -> bool {
+        let Self { data, trailers } = self;
+        data.is_empty() && trailers.is_none()
+    }
+}
+
 // === helper functions ===
 
 async fn chunk<T>(body: &mut T) -> Option<String>

From 564b5f91773ad9d586b9db4529783c0166ead214 Mon Sep 17 00:00:00 2001
From: katelyn martin
Date: Tue, 28 Jan 2025 00:00:00 +0000
Subject: [PATCH 06/10] feat(http/retry): add `is_end_stream()` coverage for
 trailers

this commit introduces additional test coverage that exercises
`is_end_stream()` when a replay body is wrapping a body with trailers.

this will be useful for assurance related to upgrading to http-body
v1.x.
see linkerd/linkerd2#8733 for more information. Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay/tests.rs | 95 +++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 536e5466be..54bb46e831 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -1,6 +1,6 @@ use super::*; use bytes::Bytes; -use http::HeaderValue; +use http::{HeaderName, HeaderValue}; use std::collections::VecDeque; struct Test { @@ -382,6 +382,87 @@ async fn eos_only_when_fully_replayed() { assert!(replay2.is_end_stream()); } +#[tokio::test(flavor = "current_thread")] +async fn eos_only_when_fully_replayed_with_trailers() { + // Test that each clone of a body is not EOS until the data has been + // fully replayed. + let initial = ReplayBody::try_new(TestBody::one_data_frame().with_trailers(), 64 * 1024) + .expect("body must not be too large"); + let replay = initial.clone(); + + let mut initial = crate::compat::ForwardCompatibleBody::new(initial); + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); + + // Read the initial body, show that the replay does not consider itself to have reached the + // end-of-stream. Then drop the initial body, show that the replay is still not done. + assert!(!initial.is_end_stream()); + initial + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + initial + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_trailers() + .map_err(drop) + .expect("yields a trailers frame"); + assert!(initial.is_end_stream()); + assert!(!replay.is_end_stream()); + drop(initial); + assert!(!replay.is_end_stream()); + + // Read the replay body. 
+ assert!(!replay.is_end_stream()); + replay + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + replay + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_trailers() + .map_err(drop) + .expect("yields a trailers frame"); + assert!(replay.is_end_stream()); + + // Even if we clone a body _after_ it has been driven to EOS, the clone must not be EOS. + let replay = replay.into_inner(); + let replay2 = replay.clone(); + assert!(!replay2.is_end_stream()); + + // Drop the first replay body to send the data to the second replay. + drop(replay); + + // Read the second replay body. + let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + replay2 + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + replay2 + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_trailers() + .map_err(drop) + .expect("yields a trailers frame"); + assert!(replay2.is_end_stream()); +} + #[tokio::test(flavor = "current_thread")] async fn caps_buffer() { // Test that, when the initial body is longer than the preconfigured @@ -538,6 +619,18 @@ impl TestBody { trailers: None, } } + + /// Adds a TRAILERS frame to the body. 
+ fn with_trailers(self) -> Self { + let name = HeaderName::from_static("name"); + let value = HeaderValue::from_static("value"); + let trailers = [(name, value)].into_iter().collect(); + + Self { + trailers: Some(trailers), + ..self + } + } } impl Body for TestBody { From ef76acedc908459ba09573a2c07eb2ce1e9819a5 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 Jan 2025 00:00:00 +0000 Subject: [PATCH 07/10] feat(http/retry): add `is_capped()` test coverage Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay/tests.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 54bb46e831..1e2a0f99a6 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -477,14 +477,23 @@ async fn caps_buffer() { let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); let replay = initial.clone(); - // Send enough data to reach the cap + // The initial body isn't capped yet, and the replay body is waiting. + assert_eq!(initial.is_capped(), Some(false)); + assert_eq!(replay.is_capped(), None); + + // Send enough data to reach the cap, but do not exceed it. tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + assert_eq!(initial.is_capped(), Some(false)); // Further chunks are still forwarded on the initial body tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); assert_eq!(chunk(&mut initial).await, Some("bbbbbbbb".to_string())); + // The initial body has been capped now. 
+ assert_eq!(initial.is_capped(), Some(true)); + assert_eq!(replay.is_capped(), None); + drop(initial); // The request's replay should error, since we discarded the buffer when @@ -495,7 +504,8 @@ async fn caps_buffer() { .await .expect("yields a result") .expect_err("yields an error when capped"); - assert!(err.is::()) + assert!(err.is::()); + assert_eq!(replay.into_inner().is_capped(), Some(true)); } #[tokio::test(flavor = "current_thread")] @@ -522,7 +532,9 @@ async fn caps_across_replays() { // the original body. tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); assert_eq!(chunk(&mut replay).await, Some("aaaaaaaa".to_string())); + assert_eq!(replay.is_capped(), Some(false)); assert_eq!(chunk(&mut replay).await, Some("bbbbbbbb".to_string())); + assert_eq!(replay.is_capped(), Some(true)); drop(replay); // The second replay will fail, though, because the buffer was discarded. From dc91c0cf962516edd044c200a693ccb7db8d1063 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 Jan 2025 00:00:00 +0000 Subject: [PATCH 08/10] feat(http/retry): further refine capacity test coverage we want to show that exceeding the capacity is the point at which replays will fail. this commit defines some constants to further communicate and encode this relationship between the bytes sent, and the capacity of the replay body. further, it shortens the second frame sent so that we ensure precisely when a body becomes capped. 
Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay/tests.rs | 40 ++++++++++++++++---------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 1e2a0f99a6..10b1eff4eb 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -465,6 +465,11 @@ async fn eos_only_when_fully_replayed_with_trailers() { #[tokio::test(flavor = "current_thread")] async fn caps_buffer() { + const CAPACITY: usize = 8; + const FILL: Bytes = Bytes::from_static(b"abcdefgh"); + const OVERFLOW: Bytes = Bytes::from_static(b"i"); + debug_assert!(FILL.len() == CAPACITY, "fills the body's capacity"); + // Test that, when the initial body is longer than the preconfigured // cap, we allow the request to continue, but stop buffering. The // initial body will complete, but the replay will immediately fail. @@ -474,7 +479,8 @@ async fn caps_buffer() { // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. let (mut tx, body) = hyper::Body::channel(); - let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); + let mut initial = + ReplayBody::try_new(body, CAPACITY).expect("channel body must not be too large"); let replay = initial.clone(); // The initial body isn't capped yet, and the replay body is waiting. @@ -482,15 +488,13 @@ async fn caps_buffer() { assert_eq!(replay.is_capped(), None); // Send enough data to reach the cap, but do not exceed it. 
- tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + tx.send_data(FILL).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("abcdefgh".to_string())); assert_eq!(initial.is_capped(), Some(false)); - // Further chunks are still forwarded on the initial body - tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("bbbbbbbb".to_string())); - - // The initial body has been capped now. + // Any more bytes sent to the initial body exceeds its capacity. + tx.send_data(OVERFLOW).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("i".to_string())); assert_eq!(initial.is_capped(), Some(true)); assert_eq!(replay.is_capped(), None); @@ -510,6 +514,11 @@ async fn caps_buffer() { #[tokio::test(flavor = "current_thread")] async fn caps_across_replays() { + const CAPACITY: usize = 8; + const FILL: Bytes = Bytes::from_static(b"abcdefgh"); + const OVERFLOW: Bytes = Bytes::from_static(b"i"); + debug_assert!(FILL.len() == CAPACITY, "fills the body's capacity"); + // Test that, when the initial body is longer than the preconfigured // cap, we allow the request to continue, but stop buffering. let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"); @@ -518,22 +527,23 @@ async fn caps_across_replays() { // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. 
let (mut tx, body) = hyper::Body::channel(); - let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); + let mut initial = + ReplayBody::try_new(body, CAPACITY).expect("channel body must not be too large"); let mut replay = initial.clone(); - // Send enough data to reach the cap - tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + // Send enough data to reach the cap, but do not exceed it. + tx.send_data(FILL).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("abcdefgh".to_string())); drop(initial); let replay2 = replay.clone(); // The replay will reach the cap, but it should still return data from // the original body. - tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); - assert_eq!(chunk(&mut replay).await, Some("aaaaaaaa".to_string())); + tx.send_data(OVERFLOW).await.unwrap(); + assert_eq!(chunk(&mut replay).await, Some("abcdefgh".to_string())); assert_eq!(replay.is_capped(), Some(false)); - assert_eq!(chunk(&mut replay).await, Some("bbbbbbbb".to_string())); + assert_eq!(chunk(&mut replay).await, Some("i".to_string())); assert_eq!(replay.is_capped(), Some(true)); drop(replay); From 8dda02d374844300ab56fa4407c0995f0b563913 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 Jan 2025 00:00:00 +0000 Subject: [PATCH 09/10] feat(http/retry): add `size_hint()` test coverage Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay/tests.rs | 66 ++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 10b1eff4eb..834138e159 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -590,6 +590,72 @@ fn body_too_big() { ); } +// This test is specifically for behavior across clones, so the clippy lint +// is wrong here. 
+#[allow(clippy::redundant_clone)] +#[test] +fn size_hint_is_correct_for_empty_body() { + let initial = + ReplayBody::try_new(BoxBody::empty(), 64 * 1024).expect("empty body can't be too large"); + let size = initial.size_hint(); + assert_eq!(size.lower(), 0); + assert_eq!(size.upper(), Some(0)); + + let replay = initial.clone(); + let size = replay.size_hint(); + assert_eq!(size.lower(), 0); + assert_eq!(size.upper(), Some(0)); +} + +#[tokio::test(flavor = "current_thread")] +async fn size_hint_is_correct_across_replays() { + const CAPACITY: usize = 8; + const BODY: &str = "contents"; + const SIZE: u64 = BODY.len() as u64; + debug_assert!(SIZE as usize <= CAPACITY); + + // Create the initial body, and a replay. + let mut initial = ReplayBody::try_new(BoxBody::from_static(BODY), CAPACITY) + .expect("empty body can't be too large"); + let mut replay = initial.clone(); + + // Show that the body reports a proper size hint. + let initial_size = initial.size_hint(); + assert_eq!(initial_size.lower(), SIZE); + assert_eq!(initial_size.exact(), Some(SIZE)); + assert_eq!(initial_size.upper(), Some(SIZE)); + + // Read the body, check the size hint again. + assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY)); + debug_assert!(initial.is_end_stream()); + // TODO(kate): this currently misreports the *remaining* size of the body. + // let size = initial.size_hint(); + // assert_eq!(size.lower(), 0); + // assert_eq!(size.upper(), Some(0)); + + // The replay reports the initial size hint, before and after dropping the initial body. + let size = replay.size_hint(); + assert_eq!(size.lower(), initial_size.lower()); + assert_eq!(size.upper(), initial_size.upper()); + drop(initial); + let size = replay.size_hint(); + assert_eq!(size.lower(), initial_size.lower()); + assert_eq!(size.upper(), initial_size.upper()); + + // Drop the initial body, read the replay and check its size hint. 
+ assert_eq!(chunk(&mut replay).await.as_deref(), Some(BODY)); + // let replay = { + // // TODO(kate): the replay doesn't report ending until it has (not) yielded trailers. + // let mut body = crate::compat::ForwardCompatibleBody::new(replay); + // assert!(body.frame().await.is_none()); + // body.into_inner() + // }; + // let size = replay.size_hint(); + // debug_assert!(replay.is_end_stream()); + // assert_eq!(size.lower(), 0); + // assert_eq!(size.upper(), Some(0)); +} + // === impl Test === impl Test { From 50c2d6f788d64d581c3739d39c4287de97598e3f Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 28 Jan 2025 00:00:00 +0000 Subject: [PATCH 10/10] chore(http/retry): add todo comments concerning eos for now, a replaying body that will not yield trailers must be polled to the `None` before reporting itself as reaching the end of the stream. this isn't hugely important, but does affect some test control flow. leave two todo comments so that if/when upgrading to hyper 1.0, it is clear that these are not load-bearing or otherwise expected behavior, should this behavior be rectified. Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay/tests.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 834138e159..90c0140188 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -358,6 +358,7 @@ async fn eos_only_when_fully_replayed() { .expect("yields a frame") .into_data() .expect("yields a data frame"); + // TODO(kate): the replay doesn't report ending until it has (not) yielded trailers. assert!(replay.frame().await.is_none()); assert!(replay.is_end_stream()); @@ -378,6 +379,7 @@ async fn eos_only_when_fully_replayed() { .expect("yields a frame") .into_data() .expect("yields a data frame"); + // TODO(kate): the replay doesn't report ending until it has (not) yielded trailers. 
assert!(replay2.frame().await.is_none()); assert!(replay2.is_end_stream()); }