Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set grpc-status headers on dispatch errors #416

Merged
merged 4 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ dependencies = [
"linkerd2-drain",
"linkerd2-duplex",
"linkerd2-error",
"linkerd2-error-respond",
"linkerd2-exp-backoff",
"linkerd2-fallback",
"linkerd2-http-classify",
Expand Down Expand Up @@ -786,6 +787,15 @@ dependencies = [
"futures",
]

[[package]]
name = "linkerd2-error-respond"
version = "0.1.0"
dependencies = [
"futures",
"linkerd2-error",
"tower",
]

[[package]]
name = "linkerd2-exp-backoff"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ linkerd2-dns = { path = "../../dns" }
linkerd2-drain = { path = "../../drain" }
linkerd2-duplex = { path = "../../duplex" }
linkerd2-error = { path = "../../error" }
linkerd2-error-respond = { path = "../../error-respond" }
linkerd2-exp-backoff = { path = "../../exp-backoff" }
linkerd2-fallback = { path = "../../fallback" }
linkerd2-http-classify = { path = "../../http-classify" }
Expand Down
265 changes: 151 additions & 114 deletions linkerd/app/core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,150 +1,187 @@
//! Layer to map HTTP service errors into appropriate `http::Response`s.

use crate::svc;
use futures::{Future, Poll};
use http::{header, Request, Response, StatusCode, Version};
use crate::proxy::{buffer, identity};
use http::{header::HeaderValue, StatusCode};
use linkerd2_error::Error;
use linkerd2_error_respond as respond;
use linkerd2_proxy_http::HasH2Reason;
use tracing::{debug, error, warn};

/// Layer to map HTTP service errors into appropriate `http::Response`s.
pub fn layer() -> Layer {
Layer
}

#[derive(Clone, Debug)]
pub struct Layer;
use linkerd2_router::error as router;
use tower::load_shed::error as shed;
use tower_grpc::{self as grpc, Code};
use tracing::{debug, warn};

#[derive(Clone, Debug)]
pub struct Stack<M> {
inner: M,
pub fn layer<B: Default>() -> respond::RespondLayer<NewRespond<B>> {
respond::RespondLayer::new(NewRespond(std::marker::PhantomData))
}

#[derive(Clone, Debug)]
pub struct Service<S>(S);

#[derive(Debug)]
pub struct ResponseFuture<F> {
inner: F,
is_http2: bool,
}
pub struct NewRespond<B>(std::marker::PhantomData<fn() -> B>);

#[derive(Clone, Debug)]
pub struct StatusError {
pub status: http::StatusCode,
pub message: String,
#[derive(Copy, Clone, Debug)]
pub enum Respond<B> {
Http1(std::marker::PhantomData<fn() -> B>),
Http2 { is_grpc: bool },
}

impl<M> svc::Layer<M> for Layer {
type Service = Stack<M>;

fn layer(&self, inner: M) -> Self::Service {
Stack { inner }
impl<A, B: Default> respond::NewRespond<http::Request<A>> for NewRespond<B> {
type Response = http::Response<B>;
type Respond = Respond<B>;

fn new_respond(&self, req: &http::Request<A>) -> Self::Respond {
if req.version() == http::Version::HTTP_2 {
let is_grpc = req
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok().map(|s| s.starts_with("application/grpc")))
.unwrap_or(false);
Respond::Http2 { is_grpc }
} else {
Respond::Http1(self.0)
}
}
}

impl<T, M> svc::Service<T> for Stack<M>
where
M: svc::Service<T>,
{
type Response = Service<M::Response>;
type Error = M::Error;
type Future = futures::future::Map<M::Future, fn(M::Response) -> Self::Response>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, target: T) -> Self::Future {
self.inner.call(target).map(Service)
impl<B> Clone for NewRespond<B> {
fn clone(&self) -> Self {
NewRespond(self.0)
}
}

impl<S, B1, B2> svc::Service<Request<B1>> for Service<S>
where
S: svc::Service<Request<B1>, Response = Response<B2>>,
S::Error: Into<Error>,
B2: Default,
{
type Response = S::Response;
type Error = Error;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready().map_err(Into::into)
}
impl<B: Default> respond::Respond for Respond<B> {
type Response = http::Response<B>;

fn call(&mut self, req: Request<B1>) -> Self::Future {
let is_http2 = req.version() == Version::HTTP_2;
let inner = self.0.call(req);
ResponseFuture { inner, is_http2 }
}
}
fn respond(&self, error: Error) -> Result<Self::Response, Error> {
warn!("Failed to proxy request: {}", error);

if let Respond::Http2 { is_grpc } = self {
if let Some(reset) = error.h2_reason() {
debug!(%reset, "Propagating HTTP2 reset");
return Err(error);
}

impl<F, B> Future for ResponseFuture<F>
where
F: Future<Item = Response<B>>,
F::Error: Into<Error>,
B: Default,
{
type Item = Response<B>;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(ok) => Ok(ok),
Err(err) => {
let err = err.into();

if self.is_http2 {
if err.h2_reason().is_some() {
debug!("propagating http2 response error: {:?}", err);
return Err(err);
}
}

let response = Response::builder()
.status(map_err_to_5xx(err))
.header(header::CONTENT_LENGTH, "0")
if *is_grpc {
let mut rsp = http::Response::builder()
.header(http::header::CONTENT_LENGTH, "0")
.body(B::default())
.expect("app::errors response is valid");

Ok(response.into())
let code = set_grpc_status(error, rsp.headers_mut());
debug!(?code, "Handling error with gRPC status");
return Ok(rsp);
}
}

let status = http_status(error);
debug!(%status, "Handling error with HTTP response");
Ok(http::Response::builder()
.status(status)
.header(http::header::CONTENT_LENGTH, "0")
.body(B::default())
.expect("error response must be valid"))
}
}

fn map_err_to_5xx(e: Error) -> StatusCode {
use crate::proxy::buffer;
use linkerd2_router::error as router;
use tower::load_shed::error as shed;

if let Some(ref c) = e.downcast_ref::<router::NoCapacity>() {
warn!("router at capacity ({})", c.0);
fn http_status(error: Error) -> StatusCode {
if error.is::<router::NoCapacity>() {
http::StatusCode::SERVICE_UNAVAILABLE
} else if let Some(_) = e.downcast_ref::<shed::Overloaded>() {
warn!("server overloaded, max-in-flight reached");
} else if error.is::<shed::Overloaded>() {
http::StatusCode::SERVICE_UNAVAILABLE
} else if let Some(_) = e.downcast_ref::<buffer::Aborted>() {
warn!("request aborted because it reached the configured dispatch deadline");
} else if error.is::<buffer::Aborted>() {
http::StatusCode::SERVICE_UNAVAILABLE
} else if let Some(_) = e.downcast_ref::<router::NotRecognized>() {
error!("could not recognize request");
http::StatusCode::BAD_GATEWAY
} else if let Some(err) = e.downcast_ref::<StatusError>() {
error!(%err.status, %err.message);
err.status
} else if error.is::<IdentityRequired>() {
http::StatusCode::FORBIDDEN
} else {
// we probably should have handled this before?
error!("unexpected error: {}", e);
http::StatusCode::BAD_GATEWAY
}
}

impl std::fmt::Display for StatusError {
fn set_grpc_status(error: Error, headers: &mut http::HeaderMap) -> Code {
const GRPC_STATUS: &'static str = "grpc-status";
const GRPC_MESSAGE: &'static str = "grpc-message";

if error.is::<router::NoCapacity>() {
let code = Code::Unavailable;
headers.insert(GRPC_STATUS, code_header(code));
headers.insert(
GRPC_MESSAGE,
HeaderValue::from_static("proxy router cache exhausted"),
);
code
} else if error.is::<shed::Overloaded>() {
let code = Code::Unavailable;
headers.insert(GRPC_STATUS, code_header(code));
headers.insert(
GRPC_MESSAGE,
HeaderValue::from_static("proxy max-concurrency exhausted"),
);
code
} else if error.is::<buffer::Aborted>() {
let code = Code::Unavailable;
headers.insert(GRPC_STATUS, code_header(code));
headers.insert(
GRPC_MESSAGE,
HeaderValue::from_static("proxy dispatch timed out"),
);
code
} else if error.is::<IdentityRequired>() {
let code = Code::FailedPrecondition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think anything relied on the Forbidden status code, but is this changing the error type for required tap identity? Before in require_identity_on_endpoint, if the condition failed we returned an error::StatusError with a Status = Forbidden. Now it's a gRPC status code FailedPrecondition

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still a FORBIDDEN for HTTP requests; but is now a FailedPrecondition for gRPC requests.. reading through the descriptions i thought this was the closest ft.

headers.insert(GRPC_STATUS, code_header(code));
if let Ok(msg) = HeaderValue::from_str(&error.to_string()) {
headers.insert(GRPC_MESSAGE, msg);
}
code
} else {
let code = Code::Internal;
headers.insert(GRPC_STATUS, code_header(code));
if let Ok(msg) = HeaderValue::from_str(&error.to_string()) {
headers.insert(GRPC_MESSAGE, msg);
}
code
}
}

// Copied from tonic, where it's private.
fn code_header(code: grpc::Code) -> HeaderValue {
match code {
Code::Ok => HeaderValue::from_static("0"),
Code::Cancelled => HeaderValue::from_static("1"),
Code::Unknown => HeaderValue::from_static("2"),
Code::InvalidArgument => HeaderValue::from_static("3"),
Code::DeadlineExceeded => HeaderValue::from_static("4"),
Code::NotFound => HeaderValue::from_static("5"),
Code::AlreadyExists => HeaderValue::from_static("6"),
Code::PermissionDenied => HeaderValue::from_static("7"),
Code::ResourceExhausted => HeaderValue::from_static("8"),
Code::FailedPrecondition => HeaderValue::from_static("9"),
Code::Aborted => HeaderValue::from_static("10"),
Code::OutOfRange => HeaderValue::from_static("11"),
Code::Unimplemented => HeaderValue::from_static("12"),
Code::Internal => HeaderValue::from_static("13"),
Code::Unavailable => HeaderValue::from_static("14"),
Code::DataLoss => HeaderValue::from_static("15"),
Code::Unauthenticated => HeaderValue::from_static("16"),
Code::__NonExhaustive => unreachable!("Code::__NonExhaustive"),
}
}

#[derive(Debug)]
pub struct IdentityRequired {
pub required: identity::Name,
pub found: Option<identity::Name>,
}

impl std::fmt::Display for IdentityRequired {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.message.fmt(f)
match self.found {
Some(ref found) => write!(
f,
"request required the identity '{}' but '{}' found",
self.required, found
),
None => write!(
f,
"request required the identity '{}' but no identity found",
self.required
),
}
}
}

impl std::error::Error for StatusError {}
impl std::error::Error for IdentityRequired {}
4 changes: 4 additions & 0 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl<S> Stack<S> {
self.push_pending().push(buffer::layer(bound, d))
}

pub fn push_per_make<L: Clone>(self, layer: L) -> Stack<stack::per_make::PerMake<L, S>> {
self.push(stack::per_make::layer(layer))
}

pub fn push_spawn_ready(self) -> Stack<tower_spawn_ready::MakeSpawnReady<S>> {
self.push(SpawnReadyLayer::new())
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl<A: OrigDstAddr> Config<A> {
.push(insert::layer(move || {
DispatchDeadline::after(buffer.dispatch_timeout)
}))
.push(errors::layer())
.push_per_make(errors::layer())
.push(trace::layer(|src: &tls::accept::Meta| {
info_span!(
"source",
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl<A: OrigDstAddr> Config<A> {
DispatchDeadline::after(buffer.dispatch_timeout)
}))
.push(http::insert::target::layer())
.push(errors::layer())
.push_per_make(errors::layer())
.push(trace::layer(
|src: &tls::accept::Meta| info_span!("source", target.addr = %src.addrs.target_addr()),
))
Expand Down
Loading