From 7f5a17bc2f8cb12f8882a0bea65dceba9147ef30 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Wed, 7 Feb 2024 16:09:03 +0000
Subject: [PATCH] conn pool strong typed errors

---
 proxy/src/cancellation.rs         | 14 ++++----------
 proxy/src/serverless/backend.rs   |  9 ++++-----
 proxy/src/serverless/conn_pool.rs | 31 +++++++++++++++++++++++++++++--
 3 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs
index d74e02eddda96..45283977f103e 100644
--- a/proxy/src/cancellation.rs
+++ b/proxy/src/cancellation.rs
@@ -14,8 +14,6 @@ pub struct CancelMap(DashMap<CancelKeyData, Option<CancelClosure>>);
 
 #[derive(Debug, Error)]
 pub enum CancelError {
-    #[error("query cancellation key not found: {0}")]
-    KeyNotFound(CancelKeyData),
     #[error("{0}")]
     IO(#[from] std::io::Error),
     #[error("{0}")]
@@ -25,9 +23,6 @@ pub enum CancelError {
 impl ReportableError for CancelError {
     fn get_error_type(&self) -> crate::error::ErrorKind {
         match self {
-            // not really user error, but :shrug:
-            // not really an error either... need to handle at some point to forward to other proxies
-            CancelError::KeyNotFound(_) => crate::error::ErrorKind::User,
             CancelError::IO(_) => crate::error::ErrorKind::Compute,
             CancelError::Postgres(_) => crate::error::ErrorKind::Compute,
         }
@@ -38,11 +33,10 @@ impl CancelMap {
     /// Cancel a running query for the corresponding connection.
     pub async fn cancel_session(&self, key: CancelKeyData) -> Result<(), CancelError> {
         // NB: we should immediately release the lock after cloning the token.
-        let cancel_closure = self
-            .0
-            .get(&key)
-            .and_then(|x| x.clone())
-            .ok_or(CancelError::KeyNotFound(key))?;
+        let Some(cancel_closure) = self.0.get(&key).and_then(|x| x.clone()) else {
+            tracing::warn!("query cancellation key not found: {key}");
+            return Ok(());
+        };
 
         info!("cancelling query per user's request using key {key}");
         cancel_closure.try_cancel_query().await
diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs
index 466a74f0ea757..4940a00e96d38 100644
--- a/proxy/src/serverless/backend.rs
+++ b/proxy/src/serverless/backend.rs
@@ -1,6 +1,5 @@
 use std::{sync::Arc, time::Duration};
 
-use anyhow::Context;
 use async_trait::async_trait;
 use tracing::info;
 
@@ -13,7 +12,7 @@ use crate::{
     proxy::connect_compute::ConnectMechanism,
 };
 
-use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool, APP_NAME};
+use super::conn_pool::{poll_client, Client, ConnInfo, ConnPoolError, GlobalConnPool, APP_NAME};
 
 pub struct PoolingBackend {
     pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
@@ -66,7 +65,7 @@ impl PoolingBackend {
         conn_info: ConnInfo,
         keys: ComputeCredentialKeys,
         force_new: bool,
-    ) -> anyhow::Result<Client<tokio_postgres::Client>> {
+    ) -> Result<Client<tokio_postgres::Client>, ConnPoolError> {
         let maybe_client = if !force_new {
             info!("pool: looking for an existing connection");
             self.pool.get(ctx, &conn_info).await?
@@ -90,7 +89,7 @@
         let mut node_info = backend
             .wake_compute(ctx)
             .await?
- .context("missing cache entry from wake_compute")?; + .ok_or(ConnPoolError::NoComputeInfo)?; match keys { #[cfg(any(test, feature = "testing"))] @@ -124,7 +123,7 @@ struct TokioMechanism { impl ConnectMechanism for TokioMechanism { type Connection = Client; type ConnectError = tokio_postgres::Error; - type Error = anyhow::Error; + type Error = ConnPoolError; async fn connect_once( &self, diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index a7b2c532d2ce7..ae24623535d28 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -16,12 +16,16 @@ use std::{ use tokio::time::Instant; use tokio_postgres::tls::NoTlsStream; use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket}; +use uuid::Uuid; use crate::console::messages::MetricsAuxInfo; use crate::metrics::{ENDPOINT_POOLS, GC_LATENCY, NUM_OPEN_CLIENTS_IN_HTTP_POOL}; use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS}; use crate::{ - auth::backend::ComputeUserInfo, context::RequestMonitoring, metrics::NUM_DB_CONNECTIONS_GAUGE, + auth::{backend::ComputeUserInfo, AuthError}, + console::errors::{GetAuthInfoError, WakeComputeError}, + context::RequestMonitoring, + metrics::NUM_DB_CONNECTIONS_GAUGE, DbName, EndpointCacheKey, RoleName, }; @@ -261,6 +265,29 @@ pub struct GlobalConnPoolOptions { pub max_total_conns: usize, } +#[derive(Debug, thiserror::Error)] +pub enum ConnPoolError { + #[error("pooled connection closed at inconsistent state")] + ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError), + #[error("could not connection to compute")] + ConnectionError(#[from] tokio_postgres::Error), + + #[error("could not get auth info")] + GetAuthInfo(#[from] GetAuthInfoError), + #[error("user not authenticated")] + AuthError(#[from] AuthError), + #[error("wake_compute returned error")] + WakeCompute(#[from] WakeComputeError), + #[error("wake_compute returned nothing")] + NoComputeInfo, + + // this code will go shortly :) + #[error("could not hash password")] + HashError(#[from] pbkdf2::password_hash::Error), + #[error("task panicked")] + InternalError(#[from] tokio::task::JoinError), +} + impl GlobalConnPool { pub fn new(config: &'static crate::config::HttpConfig) -> Arc { let shards = config.pool_options.pool_shards; @@ -358,7 +385,7 @@ impl GlobalConnPool { self: &Arc, ctx: &mut RequestMonitoring, conn_info: &ConnInfo, - ) -> anyhow::Result>> { + ) -> Result>, ConnPoolError> { let mut client: Option> = None; let endpoint_pool = self.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key());