Skip to content

Commit

Permalink
chore: replace dashmap with clashmap (#10582)
Browse files Browse the repository at this point in the history
## Problem

Because dashmap 6 switched to the hashbrown `RawTable` API, upgrading to it
required us to write unsafe code:
#8107

## Summary of changes

Switch to clashmap, a fork of dashmap maintained by me, which removes much of
the unsafe code and ultimately switches to `HashTable` instead of `RawTable`,
so that we no longer need most of the unsafe code on our side.
  • Loading branch information
conradludgate authored Jan 31, 2025
1 parent 423e239 commit 738bf83
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 48 deletions.
53 changes: 52 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive", "env"] }
clashmap = { version = "1.0", features = ["raw-api"] }
comfy-table = "7.1"
const_format = "0.2"
crc32c = "0.6"
dashmap = { version = "5.5.0", features = ["raw-api"] }
diatomic-waker = { version = "0.2.3" }
either = "1.8"
enum-map = "2.4.2"
Expand Down
2 changes: 1 addition & 1 deletion proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
clashmap.workspace = true
compute_api.workspace = true
consumption_metrics.workspace = true
dashmap.workspace = true
env_logger.workspace = true
framed-websockets.workspace = true
futures.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions proxy/src/auth/backend/jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use clashmap::ClashMap;
use jose_jwk::crypto::KeyInfo;
use reqwest::{redirect, Client};
use reqwest_retry::policies::ExponentialBackoff;
Expand Down Expand Up @@ -64,7 +64,7 @@ pub(crate) struct AuthRule {
pub struct JwkCache {
client: reqwest_middleware::ClientWithMiddleware,

map: DashMap<(EndpointId, RoleName), Arc<JwkCacheEntryLock>>,
map: ClashMap<(EndpointId, RoleName), Arc<JwkCacheEntryLock>>,
}

pub(crate) struct JwkCacheEntry {
Expand Down Expand Up @@ -469,7 +469,7 @@ impl Default for JwkCache {

JwkCache {
client,
map: DashMap::default(),
map: ClashMap::default(),
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions proxy/src/cache/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::future::pending;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use dashmap::DashSet;
use clashmap::ClashSet;
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::{AsyncCommands, FromRedisValue, Value};
use serde::Deserialize;
Expand Down Expand Up @@ -55,9 +55,9 @@ impl TryFrom<&Value> for ControlPlaneEvent {

pub struct EndpointsCache {
config: EndpointCacheConfig,
endpoints: DashSet<EndpointIdInt>,
branches: DashSet<BranchIdInt>,
projects: DashSet<ProjectIdInt>,
endpoints: ClashSet<EndpointIdInt>,
branches: ClashSet<BranchIdInt>,
projects: ClashSet<ProjectIdInt>,
ready: AtomicBool,
limiter: Arc<Mutex<GlobalRateLimiter>>,
}
Expand All @@ -69,9 +69,9 @@ impl EndpointsCache {
config.limiter_info.clone(),
))),
config,
endpoints: DashSet::new(),
branches: DashSet::new(),
projects: DashSet::new(),
endpoints: ClashSet::new(),
branches: ClashSet::new(),
projects: ClashSet::new(),
ready: AtomicBool::new(false),
}
}
Expand Down
12 changes: 6 additions & 6 deletions proxy/src/cache/project_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use dashmap::DashMap;
use clashmap::ClashMap;
use rand::{thread_rng, Rng};
use smol_str::SmolStr;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -108,9 +108,9 @@ impl EndpointInfo {
/// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
/// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
pub struct ProjectInfoCacheImpl {
cache: DashMap<EndpointIdInt, EndpointInfo>,
cache: ClashMap<EndpointIdInt, EndpointInfo>,

project2ep: DashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
project2ep: ClashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
config: ProjectInfoCacheOptions,

start_time: Instant,
Expand Down Expand Up @@ -176,8 +176,8 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
impl ProjectInfoCacheImpl {
pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
Self {
cache: DashMap::new(),
project2ep: DashMap::new(),
cache: ClashMap::new(),
project2ep: ClashMap::new(),
config,
ttl_disabled_since_us: AtomicU64::new(u64::MAX),
start_time: Instant::now(),
Expand Down Expand Up @@ -302,7 +302,7 @@ impl ProjectInfoCacheImpl {
let mut removed = 0;
let shard = self.project2ep.shards()[shard].write();
for (_, endpoints) in shard.iter() {
for endpoint in endpoints.get() {
for endpoint in endpoints {
self.cache.remove(endpoint);
removed += 1;
}
Expand Down
8 changes: 3 additions & 5 deletions proxy/src/cancellation.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;

Expand All @@ -8,7 +9,7 @@ use pq_proto::CancelKeyData;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info};

use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo};
Expand All @@ -17,14 +18,11 @@ use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::CancelChannelSizeGuard;
use crate::metrics::{CancellationRequest, Metrics, RedisMsgKind};
use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind};
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::keys::KeyPrefix;
use crate::redis::kv_ops::RedisKVClient;
use crate::tls::postgres_rustls::MakeRustlsConnect;
use std::convert::Infallible;
use tokio::sync::oneshot;

type IpSubnetKey = IpNet;

Expand Down
8 changes: 4 additions & 4 deletions proxy/src/control_plane/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use clashmap::ClashMap;
use tokio::time::Instant;
use tracing::{debug, info};

Expand Down Expand Up @@ -148,7 +148,7 @@ impl ApiCaches {
/// Various caches for [`control_plane`](super).
pub struct ApiLocks<K> {
name: &'static str,
node_locks: DashMap<K, Arc<DynamicLimiter>>,
node_locks: ClashMap<K, Arc<DynamicLimiter>>,
config: RateLimiterConfig,
timeout: Duration,
epoch: std::time::Duration,
Expand Down Expand Up @@ -180,7 +180,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
) -> prometheus::Result<Self> {
Ok(Self {
name,
node_locks: DashMap::with_shard_amount(shards),
node_locks: ClashMap::with_shard_amount(shards),
config,
timeout,
epoch,
Expand Down Expand Up @@ -238,7 +238,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
let mut lock = shard.write();
let timer = self.metrics.reclamation_lag_seconds.start_timer();
let count = lock
.extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1)
.extract_if(|(_, semaphore)| Arc::strong_count(semaphore) == 1)
.count();
drop(lock);
self.metrics.semaphores_unregistered.inc_by(count as u64);
Expand Down
8 changes: 4 additions & 4 deletions proxy/src/rate_limiter/leaky_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};

use ahash::RandomState;
use dashmap::DashMap;
use clashmap::ClashMap;
use rand::{thread_rng, Rng};
use tokio::time::Instant;
use tracing::info;
Expand All @@ -14,7 +14,7 @@ use crate::intern::EndpointIdInt;
pub type EndpointRateLimiter = LeakyBucketRateLimiter<EndpointIdInt>;

pub struct LeakyBucketRateLimiter<Key> {
map: DashMap<Key, LeakyBucketState, RandomState>,
map: ClashMap<Key, LeakyBucketState, RandomState>,
config: utils::leaky_bucket::LeakyBucketConfig,
access_count: AtomicUsize,
}
Expand All @@ -27,7 +27,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {

pub fn new_with_shards(config: LeakyBucketConfig, shards: usize) -> Self {
Self {
map: DashMap::with_hasher_and_shard_amount(RandomState::new(), shards),
map: ClashMap::with_hasher_and_shard_amount(RandomState::new(), shards),
config: config.into(),
access_count: AtomicUsize::new(0),
}
Expand Down Expand Up @@ -58,7 +58,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
let shard = thread_rng().gen_range(0..n);
self.map.shards()[shard]
.write()
.retain(|_, value| !value.get().bucket_is_empty(now));
.retain(|(_, value)| !value.bucket_is_empty(now));
}
}

Expand Down
6 changes: 3 additions & 3 deletions proxy/src/rate_limiter/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

use anyhow::bail;
use dashmap::DashMap;
use clashmap::ClashMap;
use itertools::Itertools;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -62,7 +62,7 @@ impl GlobalRateLimiter {
pub type WakeComputeRateLimiter = BucketRateLimiter<EndpointIdInt, StdRng, RandomState>;

pub struct BucketRateLimiter<Key, Rand = StdRng, Hasher = RandomState> {
map: DashMap<Key, Vec<RateBucket>, Hasher>,
map: ClashMap<Key, Vec<RateBucket>, Hasher>,
info: Cow<'static, [RateBucketInfo]>,
access_count: AtomicUsize,
rand: Mutex<Rand>,
Expand Down Expand Up @@ -202,7 +202,7 @@ impl<K: Hash + Eq, R: Rng, S: BuildHasher + Clone> BucketRateLimiter<K, R, S> {
info!(buckets = ?info, "endpoint rate limiter");
Self {
info,
map: DashMap::with_hasher_and_shard_amount(hasher, 64),
map: ClashMap::with_hasher_and_shard_amount(hasher, 64),
access_count: AtomicUsize::new(1), // start from 1 to avoid GC on the first request
rand: Mutex::new(rand),
}
Expand Down
3 changes: 2 additions & 1 deletion proxy/src/redis/keys.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::io::ErrorKind;

use anyhow::Ok;
use pq_proto::{id_to_cancel_key, CancelKeyData};
use serde::{Deserialize, Serialize};
use std::io::ErrorKind;

pub mod keyspace {
pub const CANCEL_PREFIX: &str = "cancel";
Expand Down
1 change: 0 additions & 1 deletion proxy/src/redis/kv_ops.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use redis::{AsyncCommands, ToRedisArgs};

use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;

use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};

pub struct RedisKVClient {
Expand Down
Loading

1 comment on commit 738bf83

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7565 tests run: 7203 passed, 1 failed, 361 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_download_churn[release-pg16-github-actions-selfhosted-100-tokio-epoll-uring-30]"
Flaky tests (2)

Postgres 17

Postgres 15

Code coverage* (full report)

  • functions: 33.3% (8515 of 25538 functions)
  • lines: 49.1% (71511 of 145568 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
738bf83 at 2025-01-31T12:19:16.839Z :recycle:

Please sign in to comment.