From 24dc5d8a1b5853daf653be7fe3368e3027da52a0 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Wed, 11 Dec 2024 11:37:25 -0500 Subject: [PATCH] chore(app/inbound): address hyper deprecations in http/2 tests (#3445) this addresses deprecations in inbound proxy tests that should migrate to hyper's new http/2 client connection builder. http/1 tests will be upgraded in a follow-on commit. the `connect_and_accept(..)` helper function is copied, and duplicated into an http/2 version. see for more information on upgrading to hyper 1.0. Signed-off-by: katelyn martin --- linkerd/app/inbound/src/http/tests.rs | 44 +++++++----------- linkerd/app/test/src/http_util.rs | 64 ++++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 34 deletions(-) diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index aea44630b0..524971713d 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -199,7 +199,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -209,7 +209,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -380,9 +380,7 @@ async fn h2_response_meshed_error_header() { let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error()); // Build a client using the connect that always errors. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); + let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -390,7 +388,7 @@ async fn h2_response_meshed_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2()); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await; // Send a request and assert that it is SERVICE_UNAVAILABLE with the // expected header message. @@ -400,7 +398,7 @@ async fn h2_response_meshed_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -422,9 +420,7 @@ async fn h2_response_unmeshed_error_header() { let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error()); // Build a client using the connect that always errors. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); + let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -432,7 +428,7 @@ async fn h2_response_unmeshed_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await; // Send a request and assert that it is SERVICE_UNAVAILABLE with the // expected header message. @@ -442,7 +438,7 @@ async fn h2_response_unmeshed_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -466,9 +462,7 @@ async fn grpc_meshed_response_error_header() { let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error()); // Build a client using the connect that always errors. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); + let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -476,7 +470,7 @@ async fn grpc_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2()); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await; // Send a request and assert that it is OK with the expected header // message. @@ -487,7 +481,7 @@ async fn grpc_meshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -509,9 +503,7 @@ async fn grpc_unmeshed_response_error_header() { let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error()); // Build a client using the connect that always errors. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); + let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -519,7 +511,7 @@ async fn grpc_unmeshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await; // Send a request and assert that it is OK with the expected header // message. @@ -530,7 +522,7 @@ async fn grpc_unmeshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -560,9 +552,7 @@ async fn grpc_response_class() { }; // Build a client using the connect that always errors. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); + let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -575,7 +565,7 @@ async fn grpc_response_class() { .http_endpoint .into_report(time::Duration::from_secs(3600)); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2()); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await; // Send a request and assert that it is OK with the expected header // message. @@ -587,7 +577,7 @@ async fn grpc_response_class() { .unwrap(); let mut rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 91bb4f53d5..f675263b27 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -2,14 +2,11 @@ use crate::{ app_core::{svc, Error}, io, ContextError, }; -use hyper::{body::HttpBody, Body}; +use http_body::Body; use tokio::task::JoinSet; use tower::ServiceExt; use tracing::Instrument; -#[allow(deprecated)] // linkerd/linkerd2#8733 -use hyper::client::conn::{Builder as ClientBuilder, SendRequest}; - type BoxServer = svc::BoxTcp; /// Connects a client and server, running a proxy between them. @@ -18,9 +15,62 @@ type BoxServer = svc::BoxTcp; /// await a response, and (2) a [`JoinSet`] running background tasks. #[allow(deprecated)] // linkerd/linkerd2#8733 pub async fn connect_and_accept( - client_settings: &mut ClientBuilder, + client_settings: &mut hyper::client::conn::Builder, server: BoxServer, -) -> (SendRequest, JoinSet>) { +) -> ( + hyper::client::conn::SendRequest, + JoinSet>, +) { + tracing::info!(settings = ?client_settings, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client_settings + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server + .oneshot(server_io) + .await + .map_err(ContextError::ctx("proxy background task failed"))?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await + .map_err(ContextError::ctx("client background task failed")) + .map_err(Error::from)?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) +} + +/// Connects a client and server, running a proxy between them. +/// +/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and +/// await a response, and (2) a [`JoinSet`] running background tasks. +pub async fn connect_and_accept_http2( + client_settings: &mut hyper::client::conn::http2::Builder, + server: BoxServer, +) -> ( + hyper::client::conn::http2::SendRequest, + JoinSet>, +) +where + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, +{ tracing::info!(settings = ?client_settings, "connecting client with"); let (client_io, server_io) = io::duplex(4096); @@ -58,7 +108,7 @@ pub async fn connect_and_accept( /// Collects a request or response body, returning it as a [`String`]. pub async fn body_to_string(body: T) -> Result where - T: HttpBody, + T: Body, T::Error: Into, { let bytes = body