From 1b081220efc501eebe37216ca3de3060058bb5d1 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 4 Mar 2025 00:00:00 +0000 Subject: [PATCH] refactor(app/integration): forward-compatible test code see https://github.com/linkerd/linkerd2/issues/8733 for more information. see https://github.com/linkerd/linkerd2-proxy/pull/3559 and https://github.com/linkerd/linkerd2-proxy/pull/3614 for more information on the `ForwardCompatibleBody` wrapper. `telemetry::log_stream::collect_logs` is a function responsible for digesting a streaming body, and deserializing each chunk into a `serde_json::Value`, until either (a) a shutdown signal is received, or (b) the end of the body is reached. this commit updates test code in `linkerd-app-integration` so that it interacts with request and response bodies via an adapter that polls for frames in a manner consistent with the 1.0 api of `http_body`. this allows us to limit the diff in https://github.com/linkerd/linkerd2-proxy/pull/3504, which will only need to remove this adapter once using hyper 1.0. * https://github.com/linkerd/linkerd2/issues/8733 * https://github.com/linkerd/linkerd2-proxy/pull/3671 * https://github.com/linkerd/linkerd2-proxy/pull/3672 * https://github.com/linkerd/linkerd2-proxy/pull/3673 * https://github.com/linkerd/linkerd2-proxy/pull/3676 Signed-off-by: katelyn martin --- .../src/tests/telemetry/log_stream.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/linkerd/app/integration/src/tests/telemetry/log_stream.rs b/linkerd/app/integration/src/tests/telemetry/log_stream.rs index 0e2e962cdf..40eeee3008 100644 --- a/linkerd/app/integration/src/tests/telemetry/log_stream.rs +++ b/linkerd/app/integration/src/tests/telemetry/log_stream.rs @@ -210,17 +210,26 @@ async fn query_log_stream( /// Spawns a task to collect all the logs in a streaming body and parse them as /// JSON. -fn collect_logs( - mut body: hyper::Body, -) -> (JoinHandle>, oneshot::Sender<()>) { +fn collect_logs(body: B) -> (JoinHandle>, oneshot::Sender<()>) +where + B: Body + Send + Unpin + 'static, + B::Error: std::error::Error, +{ + let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body); let (done_tx, done_rx) = oneshot::channel(); let result = tokio::spawn(async move { let mut result = Vec::new(); let logs = &mut result; let fut = async move { - while let Some(res) = body.data().await { + while let Some(res) = body.frame().await { let chunk = match res { - Ok(chunk) => chunk, + Ok(frame) => { + if let Ok(data) = frame.into_data() { + data + } else { + break; + } + } Err(e) => { println!("body failed: {}", e); break;