From 4bcb5fe3a9f38bf795ba57bf8cb9566091e75015 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Fri, 14 Feb 2025 00:00:00 +0000 Subject: [PATCH] refactor(app/core): prepare rescue body for http-body upgrade this commit makes some minor alterations to our error recovery body middleware. see https://github.com/linkerd/linkerd2/issues/8733 for more information. this commit removes an `assert!` statement from the implementation of ` as Body>::poll_data()`. see the documentation of `Body::poll_frame()`: > Once the end of the stream is reached, implementations should > continue to return [`Poll::Ready(None)`]. https://github.com/hyperium/http-body/commit/1090bfff26884779e026f4cab37a2a3ee165a69d#diff-33aabe8c2aaa7614022addf244245e09bbff576a67a9ae3c6938c8a868201d36R60-R61 to do this, this commit introduces a distinct terminal state `Inner::Rescued` to represent when the underlying `B`-typed body has yielded an error and been rescued. once in this state the body will yield no more data frames, instead yielding a collection of trailers describing the mid-stream error that was encountered by the underlying body. the call to `R::rescue` is also moved down into the helper function fka `grpc_trailers()`. this helps the function follow the grain of our "state machine" a little more directly. see #3615, #3614, and #3611 for pretext to this change. Signed-off-by: katelyn martin --- linkerd/app/core/src/errors/body.rs | 98 ++++++++++++++++++----------- 1 file changed, 61 insertions(+), 37 deletions(-) diff --git a/linkerd/app/core/src/errors/body.rs b/linkerd/app/core/src/errors/body.rs index b0de0516c8..975979838f 100644 --- a/linkerd/app/core/src/errors/body.rs +++ b/linkerd/app/core/src/errors/body.rs @@ -2,7 +2,7 @@ use super::{ header::{GRPC_MESSAGE, GRPC_STATUS}, respond::{HttpRescue, SyntheticHttpResponse}, }; -use http::header::HeaderValue; +use http::{header::HeaderValue, HeaderMap}; use linkerd_error::{Error, Result}; use pin_project::pin_project; use std::{ @@ -20,14 +20,18 @@ pub struct ResponseBody(#[pin] Inner); #[pin_project(project = InnerProj)] enum Inner { + /// An inert body that delegates directly down to the underlying body `B`. Passthru(#[pin] B), + /// A body that will be rescued if it yields an error. GrpcRescue { #[pin] inner: B, - trailers: Option, + /// An error response [strategy][HttpRescue]. rescue: R, emit_headers: bool, }, + /// The underlying body `B` yielded an error and was "rescued". + Rescued { trailers: Option }, } // === impl ResponseBody === @@ -44,7 +48,6 @@ impl ResponseBody { inner, rescue, emit_headers, - trailers: None, }) } } @@ -64,34 +67,27 @@ where type Error = B::Error; fn poll_data( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let ResponseBodyProj(inner) = self.project(); + let ResponseBodyProj(inner) = self.as_mut().project(); match inner.project() { InnerProj::Passthru(inner) => inner.poll_data(cx), + InnerProj::Rescued { trailers: _ } => Poll::Ready(None), InnerProj::GrpcRescue { inner, - trailers, rescue, emit_headers, - } => { - // should not be calling poll_data if we have set trailers derived from an error - assert!(trailers.is_none()); - match inner.poll_data(cx) { - Poll::Ready(Some(Err(error))) => { - let SyntheticHttpResponse { - grpc_status, - message, - .. - } = rescue.rescue(error)?; - let t = Self::grpc_trailers(grpc_status, &message, *emit_headers); - *trailers = Some(t); - Poll::Ready(None) - } - data => data, + } => match inner.poll_data(cx) { + Poll::Ready(Some(Err(error))) => { + // The inner body has yielded an error, which we will try to rescue. If so, + // store our synthetic trailers reporting the error. + let trailers = Self::rescue(error, rescue, *emit_headers)?; + self.set_rescued(trailers); + Poll::Ready(None) } - } + data => data, + }, } } @@ -103,12 +99,8 @@ where let ResponseBodyProj(inner) = self.project(); match inner.project() { InnerProj::Passthru(inner) => inner.poll_trailers(cx), - InnerProj::GrpcRescue { - inner, trailers, .. - } => match trailers.take() { - Some(t) => Poll::Ready(Ok(Some(t))), - None => inner.poll_trailers(cx), - }, + InnerProj::GrpcRescue { inner, .. } => inner.poll_trailers(cx), + InnerProj::Rescued { trailers } => Poll::Ready(Ok(trailers.take())), } } @@ -117,9 +109,8 @@ where let Self(inner) = self; match inner { Inner::Passthru(inner) => inner.is_end_stream(), - Inner::GrpcRescue { - inner, trailers, .. - } => trailers.is_none() && inner.is_end_stream(), + Inner::GrpcRescue { inner, .. } => inner.is_end_stream(), + Inner::Rescued { trailers } => trailers.is_none(), } } @@ -129,25 +120,58 @@ where match inner { Inner::Passthru(inner) => inner.size_hint(), Inner::GrpcRescue { inner, .. } => inner.size_hint(), + Inner::Rescued { .. } => http_body::SizeHint::with_exact(0), } } } -impl ResponseBody { - fn grpc_trailers(code: tonic::Code, message: &str, emit_headers: bool) -> http::HeaderMap { - debug!(grpc.status = ?code, "Synthesizing gRPC trailers"); +impl ResponseBody +where + B: http_body::Body, + R: HttpRescue, +{ + /// Maps an error yielded by the inner body to a collection of gRPC trailers. + /// + /// This function returns `Ok(trailers)` if the given [`HttpRescue`] strategy could identify + /// a cause for an error yielded by the inner `B`-typed body. + fn rescue( + error: B::Error, + rescue: &R, + emit_headers: bool, + ) -> Result { + let SyntheticHttpResponse { + grpc_status, + message, + .. + } = rescue.rescue(error)?; + + debug!(grpc.status = ?grpc_status, "Synthesizing gRPC trailers"); let mut t = http::HeaderMap::new(); - t.insert(GRPC_STATUS, super::code_header(code)); + t.insert(GRPC_STATUS, super::code_header(grpc_status)); if emit_headers { + // A gRPC message trailer is only included if instructed to emit additional headers. t.insert( GRPC_MESSAGE, - HeaderValue::from_str(message).unwrap_or_else(|error| { + HeaderValue::from_str(&message).unwrap_or_else(|error| { warn!(%error, "Failed to encode error header"); HeaderValue::from_static("Unexpected error") }), ); } - t + + Ok(t) + } +} + +impl ResponseBody { + /// Marks this body as "rescued". + /// + /// No more data frames will be yielded, and the given trailers will be returned when this + /// body is polled. + fn set_rescued(mut self: Pin<&mut Self>, trailers: HeaderMap) { + let trailers = Some(trailers); + let new = Self(Inner::Rescued { trailers }); + self.set(new); } }