Skip to content

Commit

Permalink
refactor(app/integration): forward-compatible test code
Browse files Browse the repository at this point in the history
see linkerd/linkerd2#8733 for more
information.

see #3559 and
#3614 for more information
on the `ForwardCompatibleBody<B>` wrapper.

`telemetry::log_stream::collect_logs` is a function responsible for
digesting a streaming body, and deserializing each chunk into a
`serde_json::Value`, until either (a) a shutdown signal is received, or
(b) the end of the body is reached.

this commit updates test code in `linkerd-app-integration` so that it
interacts with request and response bodies via an adapter that polls for
frames in a manner consistent with the 1.0 api of `http_body`.

this allows us to limit the diff in
#3504, which will only
need to remove this adapter once using hyper 1.0.

* linkerd/linkerd2#8733
* #3671
* #3672
* #3673
* #3676

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Mar 4, 2025
1 parent 4e3119c commit 1b08122
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions linkerd/app/integration/src/tests/telemetry/log_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,26 @@ async fn query_log_stream(

/// Spawns a task to collect all the logs in a streaming body and parse them as
/// JSON.
fn collect_logs(
mut body: hyper::Body,
) -> (JoinHandle<Vec<serde_json::Value>>, oneshot::Sender<()>) {
fn collect_logs<B>(body: B) -> (JoinHandle<Vec<serde_json::Value>>, oneshot::Sender<()>)
where
B: Body<Data = Bytes> + Send + Unpin + 'static,
B::Error: std::error::Error,
{
let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body);
let (done_tx, done_rx) = oneshot::channel();
let result = tokio::spawn(async move {
let mut result = Vec::new();
let logs = &mut result;
let fut = async move {
while let Some(res) = body.data().await {
while let Some(res) = body.frame().await {
let chunk = match res {
Ok(chunk) => chunk,
Ok(frame) => {
if let Ok(data) = frame.into_data() {
data
} else {
break;
}
}
Err(e) => {
println!("body failed: {}", e);
break;
Expand Down

0 comments on commit 1b08122

Please sign in to comment.