-
Notifications
You must be signed in to change notification settings - Fork 271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Set grpc-status headers on dispatch errors #416
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,150 +1,187 @@ | ||
//! Layer to map HTTP service errors into appropriate `http::Response`s. | ||
|
||
use crate::svc; | ||
use futures::{Future, Poll}; | ||
use http::{header, Request, Response, StatusCode, Version}; | ||
use crate::proxy::{buffer, identity}; | ||
use http::{header::HeaderValue, StatusCode}; | ||
use linkerd2_error::Error; | ||
use linkerd2_error_respond as respond; | ||
use linkerd2_proxy_http::HasH2Reason; | ||
use tracing::{debug, error, warn}; | ||
|
||
/// Layer to map HTTP service errors into appropriate `http::Response`s. | ||
pub fn layer() -> Layer { | ||
Layer | ||
} | ||
|
||
#[derive(Clone, Debug)] | ||
pub struct Layer; | ||
use linkerd2_router::error as router; | ||
use tower::load_shed::error as shed; | ||
use tower_grpc::{self as grpc, Code}; | ||
use tracing::{debug, warn}; | ||
|
||
#[derive(Clone, Debug)] | ||
pub struct Stack<M> { | ||
inner: M, | ||
pub fn layer<B: Default>() -> respond::RespondLayer<NewRespond<B>> { | ||
respond::RespondLayer::new(NewRespond(std::marker::PhantomData)) | ||
} | ||
|
||
#[derive(Clone, Debug)] | ||
pub struct Service<S>(S); | ||
|
||
#[derive(Debug)] | ||
pub struct ResponseFuture<F> { | ||
inner: F, | ||
is_http2: bool, | ||
} | ||
pub struct NewRespond<B>(std::marker::PhantomData<fn() -> B>); | ||
|
||
#[derive(Clone, Debug)] | ||
pub struct StatusError { | ||
pub status: http::StatusCode, | ||
pub message: String, | ||
#[derive(Copy, Clone, Debug)] | ||
pub enum Respond<B> { | ||
Http1(std::marker::PhantomData<fn() -> B>), | ||
Http2 { is_grpc: bool }, | ||
} | ||
|
||
impl<M> svc::Layer<M> for Layer { | ||
type Service = Stack<M>; | ||
|
||
fn layer(&self, inner: M) -> Self::Service { | ||
Stack { inner } | ||
impl<A, B: Default> respond::NewRespond<http::Request<A>> for NewRespond<B> { | ||
type Response = http::Response<B>; | ||
type Respond = Respond<B>; | ||
|
||
fn new_respond(&self, req: &http::Request<A>) -> Self::Respond { | ||
if req.version() == http::Version::HTTP_2 { | ||
let is_grpc = req | ||
.headers() | ||
.get(http::header::CONTENT_TYPE) | ||
.and_then(|v| v.to_str().ok().map(|s| s.starts_with("application/grpc"))) | ||
.unwrap_or(false); | ||
Respond::Http2 { is_grpc } | ||
} else { | ||
Respond::Http1(self.0) | ||
} | ||
} | ||
} | ||
|
||
impl<T, M> svc::Service<T> for Stack<M> | ||
where | ||
M: svc::Service<T>, | ||
{ | ||
type Response = Service<M::Response>; | ||
type Error = M::Error; | ||
type Future = futures::future::Map<M::Future, fn(M::Response) -> Self::Response>; | ||
|
||
fn poll_ready(&mut self) -> Poll<(), Self::Error> { | ||
self.inner.poll_ready() | ||
} | ||
fn call(&mut self, target: T) -> Self::Future { | ||
self.inner.call(target).map(Service) | ||
impl<B> Clone for NewRespond<B> { | ||
fn clone(&self) -> Self { | ||
NewRespond(self.0) | ||
} | ||
} | ||
|
||
impl<S, B1, B2> svc::Service<Request<B1>> for Service<S> | ||
where | ||
S: svc::Service<Request<B1>, Response = Response<B2>>, | ||
S::Error: Into<Error>, | ||
B2: Default, | ||
{ | ||
type Response = S::Response; | ||
type Error = Error; | ||
type Future = ResponseFuture<S::Future>; | ||
|
||
fn poll_ready(&mut self) -> Poll<(), Self::Error> { | ||
self.0.poll_ready().map_err(Into::into) | ||
} | ||
impl<B: Default> respond::Respond for Respond<B> { | ||
type Response = http::Response<B>; | ||
|
||
fn call(&mut self, req: Request<B1>) -> Self::Future { | ||
let is_http2 = req.version() == Version::HTTP_2; | ||
let inner = self.0.call(req); | ||
ResponseFuture { inner, is_http2 } | ||
} | ||
} | ||
fn respond(&self, error: Error) -> Result<Self::Response, Error> { | ||
warn!("Failed to proxy request: {}", error); | ||
|
||
if let Respond::Http2 { is_grpc } = self { | ||
if let Some(reset) = error.h2_reason() { | ||
debug!(%reset, "Propagating HTTP2 reset"); | ||
return Err(error); | ||
} | ||
|
||
impl<F, B> Future for ResponseFuture<F> | ||
where | ||
F: Future<Item = Response<B>>, | ||
F::Error: Into<Error>, | ||
B: Default, | ||
{ | ||
type Item = Response<B>; | ||
type Error = Error; | ||
|
||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||
match self.inner.poll() { | ||
Ok(ok) => Ok(ok), | ||
Err(err) => { | ||
let err = err.into(); | ||
|
||
if self.is_http2 { | ||
if err.h2_reason().is_some() { | ||
debug!("propagating http2 response error: {:?}", err); | ||
return Err(err); | ||
} | ||
} | ||
|
||
let response = Response::builder() | ||
.status(map_err_to_5xx(err)) | ||
.header(header::CONTENT_LENGTH, "0") | ||
if *is_grpc { | ||
let mut rsp = http::Response::builder() | ||
.header(http::header::CONTENT_LENGTH, "0") | ||
.body(B::default()) | ||
.expect("app::errors response is valid"); | ||
|
||
Ok(response.into()) | ||
let code = set_grpc_status(error, rsp.headers_mut()); | ||
debug!(?code, "Handling error with gRPC status"); | ||
return Ok(rsp); | ||
} | ||
} | ||
|
||
let status = http_status(error); | ||
debug!(%status, "Handling error with HTTP response"); | ||
Ok(http::Response::builder() | ||
.status(status) | ||
.header(http::header::CONTENT_LENGTH, "0") | ||
.body(B::default()) | ||
.expect("error response must be valid")) | ||
} | ||
} | ||
|
||
fn map_err_to_5xx(e: Error) -> StatusCode { | ||
use crate::proxy::buffer; | ||
use linkerd2_router::error as router; | ||
use tower::load_shed::error as shed; | ||
|
||
if let Some(ref c) = e.downcast_ref::<router::NoCapacity>() { | ||
warn!("router at capacity ({})", c.0); | ||
fn http_status(error: Error) -> StatusCode { | ||
if error.is::<router::NoCapacity>() { | ||
http::StatusCode::SERVICE_UNAVAILABLE | ||
} else if let Some(_) = e.downcast_ref::<shed::Overloaded>() { | ||
warn!("server overloaded, max-in-flight reached"); | ||
} else if error.is::<shed::Overloaded>() { | ||
http::StatusCode::SERVICE_UNAVAILABLE | ||
} else if let Some(_) = e.downcast_ref::<buffer::Aborted>() { | ||
warn!("request aborted because it reached the configured dispatch deadline"); | ||
} else if error.is::<buffer::Aborted>() { | ||
http::StatusCode::SERVICE_UNAVAILABLE | ||
} else if let Some(_) = e.downcast_ref::<router::NotRecognized>() { | ||
error!("could not recognize request"); | ||
http::StatusCode::BAD_GATEWAY | ||
} else if let Some(err) = e.downcast_ref::<StatusError>() { | ||
error!(%err.status, %err.message); | ||
err.status | ||
} else if error.is::<IdentityRequired>() { | ||
http::StatusCode::FORBIDDEN | ||
} else { | ||
// we probably should have handled this before? | ||
error!("unexpected error: {}", e); | ||
http::StatusCode::BAD_GATEWAY | ||
} | ||
} | ||
|
||
impl std::fmt::Display for StatusError { | ||
fn set_grpc_status(error: Error, headers: &mut http::HeaderMap) -> Code { | ||
const GRPC_STATUS: &'static str = "grpc-status"; | ||
const GRPC_MESSAGE: &'static str = "grpc-message"; | ||
|
||
if error.is::<router::NoCapacity>() { | ||
let code = Code::Unavailable; | ||
headers.insert(GRPC_STATUS, code_header(code)); | ||
headers.insert( | ||
GRPC_MESSAGE, | ||
HeaderValue::from_static("proxy router cache exhausted"), | ||
); | ||
code | ||
} else if error.is::<shed::Overloaded>() { | ||
let code = Code::Unavailable; | ||
headers.insert(GRPC_STATUS, code_header(code)); | ||
headers.insert( | ||
GRPC_MESSAGE, | ||
HeaderValue::from_static("proxy max-concurrency exhausted"), | ||
); | ||
code | ||
} else if error.is::<buffer::Aborted>() { | ||
let code = Code::Unavailable; | ||
headers.insert(GRPC_STATUS, code_header(code)); | ||
headers.insert( | ||
GRPC_MESSAGE, | ||
HeaderValue::from_static("proxy dispatch timed out"), | ||
); | ||
code | ||
} else if error.is::<IdentityRequired>() { | ||
let code = Code::FailedPrecondition; | ||
headers.insert(GRPC_STATUS, code_header(code)); | ||
if let Ok(msg) = HeaderValue::from_str(&error.to_string()) { | ||
headers.insert(GRPC_MESSAGE, msg); | ||
} | ||
code | ||
} else { | ||
let code = Code::Internal; | ||
headers.insert(GRPC_STATUS, code_header(code)); | ||
if let Ok(msg) = HeaderValue::from_str(&error.to_string()) { | ||
headers.insert(GRPC_MESSAGE, msg); | ||
} | ||
code | ||
} | ||
} | ||
|
||
// Copied from tonic, where it's private. | ||
fn code_header(code: grpc::Code) -> HeaderValue { | ||
match code { | ||
Code::Ok => HeaderValue::from_static("0"), | ||
Code::Cancelled => HeaderValue::from_static("1"), | ||
Code::Unknown => HeaderValue::from_static("2"), | ||
Code::InvalidArgument => HeaderValue::from_static("3"), | ||
Code::DeadlineExceeded => HeaderValue::from_static("4"), | ||
Code::NotFound => HeaderValue::from_static("5"), | ||
Code::AlreadyExists => HeaderValue::from_static("6"), | ||
Code::PermissionDenied => HeaderValue::from_static("7"), | ||
Code::ResourceExhausted => HeaderValue::from_static("8"), | ||
Code::FailedPrecondition => HeaderValue::from_static("9"), | ||
Code::Aborted => HeaderValue::from_static("10"), | ||
Code::OutOfRange => HeaderValue::from_static("11"), | ||
Code::Unimplemented => HeaderValue::from_static("12"), | ||
Code::Internal => HeaderValue::from_static("13"), | ||
Code::Unavailable => HeaderValue::from_static("14"), | ||
Code::DataLoss => HeaderValue::from_static("15"), | ||
Code::Unauthenticated => HeaderValue::from_static("16"), | ||
Code::__NonExhaustive => unreachable!("Code::__NonExhaustive"), | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
pub struct IdentityRequired { | ||
pub required: identity::Name, | ||
pub found: Option<identity::Name>, | ||
} | ||
|
||
impl std::fmt::Display for IdentityRequired { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
self.message.fmt(f) | ||
match self.found { | ||
Some(ref found) => write!( | ||
f, | ||
"request required the identity '{}' but '{}' found", | ||
self.required, found | ||
), | ||
None => write!( | ||
f, | ||
"request required the identity '{}' but no identity found", | ||
self.required | ||
), | ||
} | ||
} | ||
} | ||
|
||
impl std::error::Error for StatusError {} | ||
impl std::error::Error for IdentityRequired {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think anything relied on the
Forbidden
status code, but is this changing the error type for required tap identity? Before inrequire_identity_on_endpoint
, if the condition failed we returned anerror::StatusError
with aStatus = Forbidden
. Now it's a gRPC status codeFailedPrecondition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's still a
FORBIDDEN
for HTTP requests; but is now aFailedPrecondition
for gRPC requests.. reading through the descriptions i thought this was the closest ft.