Skip to content

Commit

Permalink
report errors
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Feb 8, 2024
1 parent 5a89abe commit 94fc2ac
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 39 deletions.
4 changes: 3 additions & 1 deletion proxy/src/bin/pg_sni_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
?unexpected,
"unexpected startup packet, rejecting connection"
);
stream.throw_error_str(ERR_INSECURE_CONNECTION).await?
stream
.throw_error_str(ERR_INSECURE_CONNECTION, proxy::error::ErrorKind::User)
.await?
}
}
}
Expand Down
32 changes: 28 additions & 4 deletions proxy/src/cancellation.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,48 @@
use anyhow::Context;
use dashmap::DashMap;
use pq_proto::CancelKeyData;
use std::{net::SocketAddr, sync::Arc};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::{CancelToken, NoTls};
use tracing::info;

use crate::error::ReportableError;

/// Enables serving `CancelRequest`s.
#[derive(Default)]
pub struct CancelMap(DashMap<CancelKeyData, Option<CancelClosure>>);

#[derive(Debug, Error)]
pub enum CancelError {
#[error("query cancellation key not found: {0}")]
KeyNotFound(CancelKeyData),
#[error("{0}")]
IO(#[from] std::io::Error),
#[error("{0}")]
Postgres(#[from] tokio_postgres::Error),
}

impl ReportableError for CancelError {
fn get_error_type(&self) -> crate::error::ErrorKind {
match self {
// not really user error, but :shrug:
// not really an error either... need to handle at some point to forward to other proxies
CancelError::KeyNotFound(_) => crate::error::ErrorKind::User,
CancelError::IO(_) => crate::error::ErrorKind::Compute,
CancelError::Postgres(_) => crate::error::ErrorKind::Compute,
}
}
}

impl CancelMap {
/// Cancel a running query for the corresponding connection.
pub async fn cancel_session(&self, key: CancelKeyData) -> anyhow::Result<()> {
pub async fn cancel_session(&self, key: CancelKeyData) -> Result<(), CancelError> {
// NB: we should immediately release the lock after cloning the token.
let cancel_closure = self
.0
.get(&key)
.and_then(|x| x.clone())
.with_context(|| format!("query cancellation key not found: {key}"))?;
.ok_or(CancelError::KeyNotFound(key))?;

info!("cancelling query per user's request using key {key}");
cancel_closure.try_cancel_query().await
Expand Down Expand Up @@ -81,7 +105,7 @@ impl CancelClosure {
}

/// Cancels the query running on user's compute node.
pub async fn try_cancel_query(self) -> anyhow::Result<()> {
async fn try_cancel_query(self) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
self.cancel_token.cancel_query_raw(socket, NoTls).await?;

Expand Down
14 changes: 12 additions & 2 deletions proxy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use tokio::sync::mpsc;
use uuid::Uuid;

use crate::{
console::messages::MetricsAuxInfo, error::ErrorKind, metrics::LatencyTimer, BranchId,
EndpointId, ProjectId, RoleName,
console::messages::MetricsAuxInfo,
error::ErrorKind,
metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
BranchId, EndpointId, ProjectId, RoleName,
};

pub mod parquet;
Expand Down Expand Up @@ -109,6 +111,14 @@ impl RequestMonitoring {
}

pub fn set_error_kind(&mut self, error: ErrorKind) {
ERROR_BY_KIND
.with_label_values(&[error.to_metric_label()])
.inc();
if let Some(ep) = &self.endpoint_id {
ENDPOINT_ERRORS_BY_KIND
.with_label_values(&[error.to_metric_label()])
.measure(ep);
}
self.error_kind = Some(error);
}

Expand Down
13 changes: 12 additions & 1 deletion proxy/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub trait UserFacingError: ReportableError {
}
}

#[derive(Clone)]
#[derive(Copy, Clone, Debug)]
pub enum ErrorKind {
/// Wrong password, unknown endpoint, protocol violation, etc...
User,
Expand Down Expand Up @@ -61,6 +61,17 @@ impl ErrorKind {
ErrorKind::Compute => "non-retryable compute error (or exhausted retry capacity)",
}
}

pub fn to_metric_label(&self) -> &'static str {
match self {
ErrorKind::User => "user",
ErrorKind::Disconnect => "disconnect",
ErrorKind::RateLimit => "ratelimit",
ErrorKind::Service => "service",
ErrorKind::ControlPlane => "controlplane",
ErrorKind::Compute => "compute",
}
}
}

pub trait ReportableError: fmt::Display + Send + 'static {
Expand Down
19 changes: 19 additions & 0 deletions proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,22 @@ pub static CONNECTING_ENDPOINTS: Lazy<HyperLogLogVec<32>> = Lazy::new(|| {
)
.unwrap()
});

pub static ERROR_BY_KIND: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_errors_total",
"Number of errors by a given classification",
&["type"],
)
.unwrap()
});

pub static ENDPOINT_ERRORS_BY_KIND: Lazy<HyperLogLogVec<32>> = Lazy::new(|| {
register_hll_vec!(
32,
"proxy_endpoints_affected_by_errors",
"Number of endpoints affected by errors of a given classification",
&["type"],
)
.unwrap()
});
46 changes: 38 additions & 8 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
compute,
config::{ProxyConfig, TlsConfig},
context::RequestMonitoring,
error::ReportableError,
metrics::{NUM_CLIENT_CONNECTION_GAUGE, NUM_CONNECTION_REQUESTS_GAUGE},
protocol2::WithClientIp,
proxy::handshake::{handshake, HandshakeData},
Expand All @@ -28,6 +29,7 @@ use pq_proto::{BeMessage as Be, StartupMessageParams};
use regex::Regex;
use smol_str::{format_smolstr, SmolStr};
use std::sync::Arc;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, Instrument};
Expand Down Expand Up @@ -120,9 +122,9 @@ pub async fn task_main(
match res {
Err(e) => {
// todo: log and push to ctx the error kind
// ctx.set_error_kind(e.get_error_type())
ctx.set_error_kind(e.get_error_type());
ctx.log();
Err(e)
Err(e.into())
}
Ok(None) => {
ctx.set_success();
Expand Down Expand Up @@ -190,14 +192,42 @@ impl ClientMode {
}
}

#[derive(Debug, Error)]
// almost all errors should be reported to the user, but there's a few cases where we cannot
// 1. Cancellation: we are not allowed to tell the client any cancellation statuses for security reasons
// 2. Handshake: handshake reports errors if it can, otherwise if the handshake fails due to protocol violation,
// we cannot be sure the client even understands our error message
// 3. PrepareClient: The client disconnected, so we can't tell them anyway...
pub enum ClientRequestError {
#[error("{0}")]
Cancellation(#[from] cancellation::CancelError),
#[error("{0}")]
Handshake(#[from] handshake::HandshakeError),
#[error("{0}")]
PrepareClient(#[from] std::io::Error),
#[error("{0}")]
ReportedError(#[from] crate::stream::ReportedError),
}

impl ReportableError for ClientRequestError {
fn get_error_type(&self) -> crate::error::ErrorKind {
match self {
ClientRequestError::Cancellation(e) => e.get_error_type(),
ClientRequestError::Handshake(e) => e.get_error_type(),
ClientRequestError::ReportedError(e) => e.get_error_type(),
ClientRequestError::PrepareClient(_) => crate::error::ErrorKind::Disconnect,
}
}
}

pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
cancel_map: Arc<CancelMap>,
stream: S,
mode: ClientMode,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<Option<ProxyPassthrough<S>>> {
) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
info!(
protocol = ctx.protocol,
"handling interactive connection from client"
Expand All @@ -217,10 +247,10 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let (mut stream, params) = match handshake(stream, mode.handshake_tls(tls)).await? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
return cancel_map
return Ok(cancel_map
.cancel_session(cancel_key_data)
.await
.map(|()| None)
.map(|()| None)?)
}
};
drop(pause);
Expand All @@ -246,7 +276,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
if !endpoint_rate_limiter.check(ep) {
return stream
.throw_error(auth::AuthError::too_many_connections())
.await;
.await?;
}
}

Expand All @@ -266,7 +296,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let app = params.get("application_name");
let params_span = tracing::info_span!("", ?user, ?db, ?app);

return stream.throw_error(e).instrument(params_span).await;
return stream.throw_error(e).instrument(params_span).await?;
}
};

Expand Down Expand Up @@ -307,7 +337,7 @@ async fn prepare_client_connection(
node: &compute::PostgresConnection,
session: &cancellation::Session,
stream: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> anyhow::Result<()> {
) -> Result<(), std::io::Error> {
// Register compute's query cancellation token and produce a new, unique one.
// The new token (cancel_key_data) will be sent to the client.
let cancel_key_data = session.enable_query_cancellation(node.cancel_closure.clone());
Expand Down
18 changes: 10 additions & 8 deletions proxy/src/proxy/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ pub enum HandshakeError {
#[error("protocol violation")]
ProtocolViolation,

#[error("connection is insecure (try using `sslmode=require`)")]
InsecureConnection,

#[error("missing certificate")]
MissingCertificate,

Expand All @@ -29,20 +26,26 @@ pub enum HandshakeError {

#[error("{0}")]
Io(#[from] std::io::Error),

#[error("{0}")]
ReportedError(#[from] crate::stream::ReportedError),
}

impl ReportableError for HandshakeError {
fn get_error_type(&self) -> crate::error::ErrorKind {
match self {
HandshakeError::EarlyData => crate::error::ErrorKind::User,
HandshakeError::ProtocolViolation => crate::error::ErrorKind::User,
HandshakeError::InsecureConnection => crate::error::ErrorKind::User,
HandshakeError::MissingCertificate => todo!(),
// This error should not happen, but will if we have no default certificate and
// the client sends no SNI extension.
// If they provide SNI then we can be sure there is a certificate that matches.
HandshakeError::MissingCertificate => crate::error::ErrorKind::Service,
HandshakeError::StreamUpgradeError(upgrade) => match upgrade {
StreamUpgradeError::AlreadyTls => crate::error::ErrorKind::Service,
StreamUpgradeError::Io(_) => crate::error::ErrorKind::Disconnect,
},
HandshakeError::Io(_) => crate::error::ErrorKind::Disconnect,
HandshakeError::ReportedError(e) => e.get_error_type(),
}
}
}
Expand Down Expand Up @@ -120,10 +123,9 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
// Check that the config has been consumed during upgrade
// OR we didn't provide it at all (for dev purposes).
if tls.is_some() {
stream
.write_message(&Be::ErrorResponse(ERR_INSECURE_CONNECTION, None))
return stream
.throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User)
.await?;
return Err(HandshakeError::InsecureConnection);
}

info!(session_type = "normal", "successful handshake");
Expand Down
6 changes: 3 additions & 3 deletions proxy/src/serverless/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
cancellation::CancelMap,
config::ProxyConfig,
context::RequestMonitoring,
error::io_error,
error::{io_error, ReportableError},
proxy::{handle_client, ClientMode},
rate_limiter::EndpointRateLimiter,
};
Expand Down Expand Up @@ -151,9 +151,9 @@ pub async fn serve_websocket(
match res {
Err(e) => {
// todo: log and push to ctx the error kind
// ctx.set_error_kind(e.get_error_type())
ctx.set_error_kind(e.get_error_type());
ctx.log();
Err(e)
Err(e.into())
}
Ok(None) => {
ctx.set_success();
Expand Down
Loading

0 comments on commit 94fc2ac

Please sign in to comment.