Skip to content

Commit

Permalink
chore(app/inbound): address hyper deprecations in http/2 tests (#3445)
Browse files Browse the repository at this point in the history
this addresses deprecations in inbound proxy tests that should migrate
to hyper's new http/2 client connection builder.

http/1 tests will be upgraded in a follow-on commit.

the `connect_and_accept(..)` helper function is copied, and duplicated
into an http/2 version.

see <linkerd/linkerd2#8733> for more
information on upgrading to hyper 1.0.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn authored Dec 11, 2024
1 parent 307dbc4 commit 24dc5d8
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 34 deletions.
44 changes: 17 additions & 27 deletions linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async fn http1_bad_gateway_meshed_response_error_header() {
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1());
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;

// Send a request and assert that it is a BAD_GATEWAY with the expected
// header message.
Expand All @@ -209,7 +209,7 @@ async fn http1_bad_gateway_meshed_response_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand Down Expand Up @@ -380,17 +380,15 @@ async fn h2_response_meshed_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is SERVICE_UNAVAILABLE with the
// expected header message.
Expand All @@ -400,7 +398,7 @@ async fn h2_response_meshed_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand All @@ -422,17 +420,15 @@ async fn h2_response_unmeshed_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2);
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is SERVICE_UNAVAILABLE with the
// expected header message.
Expand All @@ -442,7 +438,7 @@ async fn h2_response_unmeshed_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand All @@ -466,17 +462,15 @@ async fn grpc_meshed_response_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is OK with the expected header
// message.
Expand All @@ -487,7 +481,7 @@ async fn grpc_meshed_response_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand All @@ -509,17 +503,15 @@ async fn grpc_unmeshed_response_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2);
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is OK with the expected header
// message.
Expand All @@ -530,7 +522,7 @@ async fn grpc_unmeshed_response_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand Down Expand Up @@ -560,9 +552,7 @@ async fn grpc_response_class() {
};

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
Expand All @@ -575,7 +565,7 @@ async fn grpc_response_class() {
.http_endpoint
.into_report(time::Duration::from_secs(3600));
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is OK with the expected header
// message.
Expand All @@ -587,7 +577,7 @@ async fn grpc_response_class() {
.unwrap();

let mut rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand Down
64 changes: 57 additions & 7 deletions linkerd/app/test/src/http_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ use crate::{
app_core::{svc, Error},
io, ContextError,
};
use hyper::{body::HttpBody, Body};
use http_body::Body;
use tokio::task::JoinSet;
use tower::ServiceExt;
use tracing::Instrument;

#[allow(deprecated)] // linkerd/linkerd2#8733
use hyper::client::conn::{Builder as ClientBuilder, SendRequest};

type BoxServer = svc::BoxTcp<io::DuplexStream>;

/// Connects a client and server, running a proxy between them.
Expand All @@ -18,9 +15,62 @@ type BoxServer = svc::BoxTcp<io::DuplexStream>;
/// await a response, and (2) a [`JoinSet<T>`] running background tasks.
#[allow(deprecated)] // linkerd/linkerd2#8733
pub async fn connect_and_accept(
client_settings: &mut ClientBuilder,
client_settings: &mut hyper::client::conn::Builder,
server: BoxServer,
) -> (SendRequest<Body>, JoinSet<Result<(), Error>>) {
) -> (
hyper::client::conn::SendRequest<hyper::Body>,
JoinSet<Result<(), Error>>,
) {
tracing::info!(settings = ?client_settings, "connecting client with");
let (client_io, server_io) = io::duplex(4096);

let (client, conn) = client_settings
.handshake(client_io)
.await
.expect("Client must connect");

let mut bg = tokio::task::JoinSet::new();
bg.spawn(
async move {
server
.oneshot(server_io)
.await
.map_err(ContextError::ctx("proxy background task failed"))?;
tracing::info!("proxy serve task complete");
Ok(())
}
.instrument(tracing::info_span!("proxy")),
);
bg.spawn(
async move {
conn.await
.map_err(ContextError::ctx("client background task failed"))
.map_err(Error::from)?;
tracing::info!("client background complete");
Ok(())
}
.instrument(tracing::info_span!("client_bg")),
);

(client, bg)
}

/// Connects a client and server, running a proxy between them.
///
/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and
/// await a response, and (2) a [`JoinSet<T>`] running background tasks.
pub async fn connect_and_accept_http2<B>(
client_settings: &mut hyper::client::conn::http2::Builder,
server: BoxServer,
) -> (
hyper::client::conn::http2::SendRequest<B>,
JoinSet<Result<(), Error>>,
)
where
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
tracing::info!(settings = ?client_settings, "connecting client with");
let (client_io, server_io) = io::duplex(4096);

Expand Down Expand Up @@ -58,7 +108,7 @@ pub async fn connect_and_accept(
/// Collects a request or response body, returning it as a [`String`].
pub async fn body_to_string<T>(body: T) -> Result<String, Error>
where
T: HttpBody,
T: Body,
T::Error: Into<Error>,
{
let bytes = body
Expand Down

0 comments on commit 24dc5d8

Please sign in to comment.