diff --git a/linkerd/app/integration/src/tests/telemetry/log_stream.rs b/linkerd/app/integration/src/tests/telemetry/log_stream.rs index 0e2e962cdf..40eeee3008 100644 --- a/linkerd/app/integration/src/tests/telemetry/log_stream.rs +++ b/linkerd/app/integration/src/tests/telemetry/log_stream.rs @@ -210,17 +210,26 @@ async fn query_log_stream( /// Spawns a task to collect all the logs in a streaming body and parse them as /// JSON. -fn collect_logs( - mut body: hyper::Body, -) -> (JoinHandle>, oneshot::Sender<()>) { +fn collect_logs(body: B) -> (JoinHandle>, oneshot::Sender<()>) +where + B: Body + Send + Unpin + 'static, + B::Error: std::error::Error, +{ + let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body); let (done_tx, done_rx) = oneshot::channel(); let result = tokio::spawn(async move { let mut result = Vec::new(); let logs = &mut result; let fut = async move { - while let Some(res) = body.data().await { + while let Some(res) = body.frame().await { let chunk = match res { - Ok(chunk) => chunk, + Ok(frame) => { + if let Ok(data) = frame.into_data() { + data + } else { + break; + } + } Err(e) => { println!("body failed: {}", e); break;