Skip to content

Commit

Permalink
conn pool strong typed errors
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Feb 8, 2024
1 parent 4bc1086 commit d7be26f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
14 changes: 4 additions & 10 deletions proxy/src/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub struct CancelMap(DashMap<CancelKeyData, Option<CancelClosure>>);

#[derive(Debug, Error)]
pub enum CancelError {
#[error("query cancellation key not found: {0}")]
KeyNotFound(CancelKeyData),
#[error("{0}")]
IO(#[from] std::io::Error),
#[error("{0}")]
Expand All @@ -25,9 +23,6 @@ pub enum CancelError {
impl ReportableError for CancelError {
fn get_error_type(&self) -> crate::error::ErrorKind {
match self {
// not really user error, but :shrug:
// not really an error either... need to handle at some point to forward to other proxies
CancelError::KeyNotFound(_) => crate::error::ErrorKind::User,
CancelError::IO(_) => crate::error::ErrorKind::Compute,
CancelError::Postgres(_) => crate::error::ErrorKind::Compute,
}
Expand All @@ -38,11 +33,10 @@ impl CancelMap {
/// Cancel a running query for the corresponding connection.
pub async fn cancel_session(&self, key: CancelKeyData) -> Result<(), CancelError> {
// NB: we should immediately release the lock after cloning the token.
let cancel_closure = self
.0
.get(&key)
.and_then(|x| x.clone())
.ok_or(CancelError::KeyNotFound(key))?;
let Some(cancel_closure) = self.0.get(&key).and_then(|x| x.clone()) else {
tracing::warn!("query cancellation key not found: {key}");
return Ok(());
};

info!("cancelling query per user's request using key {key}");
cancel_closure.try_cancel_query().await
Expand Down
9 changes: 4 additions & 5 deletions proxy/src/serverless/backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{sync::Arc, time::Duration};

use anyhow::Context;
use async_trait::async_trait;
use tracing::info;

Expand All @@ -13,7 +12,7 @@ use crate::{
proxy::connect_compute::ConnectMechanism,
};

use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool, APP_NAME};
use super::conn_pool::{poll_client, Client, ConnInfo, ConnPoolError, GlobalConnPool, APP_NAME};

pub struct PoolingBackend {
pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
Expand Down Expand Up @@ -66,7 +65,7 @@ impl PoolingBackend {
conn_info: ConnInfo,
keys: ComputeCredentialKeys,
force_new: bool,
) -> anyhow::Result<Client<tokio_postgres::Client>> {
) -> Result<Client<tokio_postgres::Client>, ConnPoolError> {
let maybe_client = if !force_new {
info!("pool: looking for an existing connection");
self.pool.get(ctx, &conn_info).await?
Expand All @@ -90,7 +89,7 @@ impl PoolingBackend {
let mut node_info = backend
.wake_compute(ctx)
.await?
.context("missing cache entry from wake_compute")?;
.ok_or(ConnPoolError::NoComputeInfo)?;

match keys {
#[cfg(any(test, feature = "testing"))]
Expand Down Expand Up @@ -124,7 +123,7 @@ struct TokioMechanism {
impl ConnectMechanism for TokioMechanism {
type Connection = Client<tokio_postgres::Client>;
type ConnectError = tokio_postgres::Error;
type Error = anyhow::Error;
type Error = ConnPoolError;

async fn connect_once(
&self,
Expand Down
25 changes: 23 additions & 2 deletions proxy/src/serverless/conn_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ use std::{
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use uuid::Uuid;

use crate::console::messages::MetricsAuxInfo;
use crate::metrics::{ENDPOINT_POOLS, GC_LATENCY, NUM_OPEN_CLIENTS_IN_HTTP_POOL};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{
auth::backend::ComputeUserInfo, context::RequestMonitoring, metrics::NUM_DB_CONNECTIONS_GAUGE,
auth::{backend::ComputeUserInfo, AuthError},
console::errors::{GetAuthInfoError, WakeComputeError},
context::RequestMonitoring,
metrics::NUM_DB_CONNECTIONS_GAUGE,
DbName, EndpointCacheKey, RoleName,
};

Expand Down Expand Up @@ -261,6 +265,23 @@ pub struct GlobalConnPoolOptions {
pub max_total_conns: usize,
}

#[derive(Debug, thiserror::Error)]
pub enum ConnPoolError {
#[error("pooled connection closed at inconsistent state")]
ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<Uuid>),
#[error("could not connection to compute")]
ConnectionError(#[from] tokio_postgres::Error),

#[error("could not get auth info")]
GetAuthInfo(#[from] GetAuthInfoError),
#[error("user not authenticated")]
AuthError(#[from] AuthError),
#[error("wake_compute returned error")]
WakeCompute(#[from] WakeComputeError),
#[error("wake_compute returned nothing")]
NoComputeInfo,
}

impl<C: ClientInnerExt> GlobalConnPool<C> {
pub fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
Expand Down Expand Up @@ -358,7 +379,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
self: &Arc<Self>,
ctx: &mut RequestMonitoring,
conn_info: &ConnInfo,
) -> anyhow::Result<Option<Client<C>>> {
) -> Result<Option<Client<C>>, ConnPoolError> {
let mut client: Option<ClientInner<C>> = None;

let endpoint_pool = self.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key());
Expand Down

0 comments on commit d7be26f

Please sign in to comment.