From 822db09957b439cf548dd5bac85e7325e6a468c8 Mon Sep 17 00:00:00 2001 From: Tom Lienard Date: Sun, 12 Mar 2023 12:10:20 +0100 Subject: [PATCH] perf(serverless,runtime,cli): use DashMap & with_capacity where possible (#663) * perf(serverless,runtime,cli): use DashMap & with_capacity where possible * perf(serverless): use ThreadRng instead of StdRng --- .changeset/large-rivers-cough.md | 5 +++ .changeset/loud-seahorses-yell.md | 5 +++ .changeset/swift-tables-grow.md | 6 +++ Cargo.lock | 14 ++++--- crates/runtime_http/src/request.rs | 10 ++++- crates/runtime_http/src/response.rs | 3 +- crates/runtime_utils/src/assets.rs | 2 +- crates/runtime_v8_utils/src/lib.rs | 5 ++- crates/serverless/Cargo.toml | 1 + crates/serverless/src/deployments/cache.rs | 19 +++------- crates/serverless/src/deployments/mod.rs | 10 +++-- crates/serverless/src/deployments/pubsub.rs | 24 +++++------- crates/serverless/src/main.rs | 36 +++++++----------- crates/serverless/src/worker.rs | 41 +++++++-------------- 14 files changed, 88 insertions(+), 93 deletions(-) create mode 100644 .changeset/large-rivers-cough.md create mode 100644 .changeset/loud-seahorses-yell.md create mode 100644 .changeset/swift-tables-grow.md diff --git a/.changeset/large-rivers-cough.md b/.changeset/large-rivers-cough.md new file mode 100644 index 000000000..76e751aa3 --- /dev/null +++ b/.changeset/large-rivers-cough.md @@ -0,0 +1,5 @@ +--- +'@lagon/serverless': patch +--- + +Use ThreadRng instead of creating a new StdRng every time diff --git a/.changeset/loud-seahorses-yell.md b/.changeset/loud-seahorses-yell.md new file mode 100644 index 000000000..5565c652b --- /dev/null +++ b/.changeset/loud-seahorses-yell.md @@ -0,0 +1,5 @@ +--- +'@lagon/serverless': patch +--- + +Use DashMap instead of RwLock diff --git a/.changeset/swift-tables-grow.md b/.changeset/swift-tables-grow.md new file mode 100644 index 000000000..4fb077410 --- /dev/null +++ b/.changeset/swift-tables-grow.md @@ -0,0 +1,6 @@ +--- +'@lagon/cli': patch +'@lagon/runtime': patch +--- + +Create HashMap with_capacity to avoid reallocations diff --git a/Cargo.lock b/Cargo.lock index 3458acb70..79cb246ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -610,13 +610,14 @@ checksum = "b365fabc795046672053e29c954733ec3b05e4be654ab130fe8f1f94d7051f35" [[package]] name = "dashmap" -version = "5.3.4" +version = "5.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" dependencies = [ "cfg-if", "hashbrown", "lock_api", + "once_cell", "parking_lot_core", ] @@ -1493,6 +1494,7 @@ name = "lagon-serverless" version = "0.1.0" dependencies = [ "anyhow", + "dashmap", "dotenv", "flume", "hyper", @@ -1662,9 +1664,9 @@ checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" [[package]] name = "lock_api" -version = "0.4.7" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" dependencies = [ "autocfg", "scopeguard", @@ -2076,9 +2078,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.13.0" +version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" +checksum = 
"b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" [[package]] name = "opaque-debug" diff --git a/crates/runtime_http/src/request.rs b/crates/runtime_http/src/request.rs index 570bf75fa..749667272 100644 --- a/crates/runtime_http/src/request.rs +++ b/crates/runtime_http/src/request.rs @@ -158,7 +158,15 @@ impl Request { } pub async fn from_hyper(request: HyperRequest) -> Result { - let mut headers = HashMap::>::new(); + Self::from_hyper_with_capacity(request, 0).await + } + + pub async fn from_hyper_with_capacity( + request: HyperRequest, + capacity: usize, + ) -> Result { + let mut headers = + HashMap::>::with_capacity(request.headers().keys_len() + capacity); for (key, value) in request.headers().iter() { if key != X_LAGON_ID { diff --git a/crates/runtime_http/src/response.rs b/crates/runtime_http/src/response.rs index 6c212af09..e3f00e72a 100644 --- a/crates/runtime_http/src/response.rs +++ b/crates/runtime_http/src/response.rs @@ -159,7 +159,8 @@ impl Response { } pub async fn from_hyper(response: HyperResponse) -> Result { - let mut headers = HashMap::>::new(); + let mut headers = + HashMap::>::with_capacity(response.headers().keys_len()); for (key, value) in response.headers().iter() { headers diff --git a/crates/runtime_utils/src/assets.rs b/crates/runtime_utils/src/assets.rs index 4051d5c99..48db4df74 100644 --- a/crates/runtime_utils/src/assets.rs +++ b/crates/runtime_utils/src/assets.rs @@ -39,7 +39,7 @@ pub fn handle_asset(root: PathBuf, asset: &String) -> Result { }, ); - let mut headers = HashMap::new(); + let mut headers = HashMap::with_capacity(1); headers.insert("content-type".into(), vec![content_type.into()]); Ok(Response { diff --git a/crates/runtime_v8_utils/src/lib.rs b/crates/runtime_v8_utils/src/lib.rs index 57fbf868b..6e256e6d9 100644 --- a/crates/runtime_v8_utils/src/lib.rs +++ b/crates/runtime_v8_utils/src/lib.rs @@ -32,10 +32,11 @@ pub fn extract_v8_headers_object( let map = unsafe { v8::Local::::cast(value) }; if map.size() > 0 { - let mut headers = HashMap::new(); let headers_keys = map.as_array(scope); + let length = headers_keys.length(); + let mut headers = HashMap::with_capacity((length / 2) as usize); - for mut index in 0..headers_keys.length() { + for mut index in 0..length { if index % 2 != 0 { continue; } diff --git a/crates/serverless/Cargo.toml b/crates/serverless/Cargo.toml index c08842a05..7ee875a37 100644 --- a/crates/serverless/Cargo.toml +++ b/crates/serverless/Cargo.toml @@ -26,6 +26,7 @@ rand = { version = "0.8.5", features = ["std_rng"] } anyhow = "1.0.69" tokio-cron-scheduler = "0.9.4" uuid = "1.3.0" +dashmap = "5.4.0" [build-dependencies] lagon-runtime = { path = "../runtime" } diff --git a/crates/serverless/src/deployments/cache.rs b/crates/serverless/src/deployments/cache.rs index 1534ff084..98281c1c3 100644 --- a/crates/serverless/src/deployments/cache.rs +++ b/crates/serverless/src/deployments/cache.rs @@ -1,11 +1,10 @@ use std::{ - collections::HashMap, env, sync::Arc, time::{Duration, Instant}, }; -use tokio::sync::RwLock; +use dashmap::DashMap; use crate::worker::Workers; @@ -13,10 +12,7 @@ use super::pubsub::clear_deployment_cache; const CACHE_TASK_INTERVAL: Duration = Duration::from_secs(1); -pub fn run_cache_clear_task( - last_requests: Arc>>, - workers: Workers, -) { +pub fn run_cache_clear_task(last_requests: Arc>, workers: Workers) { let isolates_cache_seconds = Duration::from_secs( env::var("LAGON_ISOLATES_CACHE_SECONDS") .expect("LAGON_ISOLATES_CACHE_SECONDS is not set") @@ -30,10 +26,11 @@ pub fn 
         loop {
             tokio::time::sleep(CACHE_TASK_INTERVAL).await;
 
-            let last_requests_reader = last_requests.read().await;
             let now = Instant::now();
 
-            for (deployment_id, last_request) in last_requests_reader.iter() {
+            for last_request in last_requests.iter() {
+                let (deployment_id, last_request) = last_request.pair();
+
                 if now.duration_since(*last_request) > isolates_cache_seconds {
                     deployments_to_clear.push(deployment_id.clone());
                 }
@@ -43,12 +40,6 @@ pub fn run_cache_clear_task(
                 continue;
             }
 
-            // Drop the read lock because we now acquire a write lock
-            drop(last_requests_reader);
-
-            // Clear everything
-            let mut last_requests = last_requests.write().await;
-
             for deployment_id in &deployments_to_clear {
                 last_requests.remove(deployment_id);
 
diff --git a/crates/serverless/src/deployments/mod.rs b/crates/serverless/src/deployments/mod.rs
index 4b0acba43..6b56c6325 100644
--- a/crates/serverless/src/deployments/mod.rs
+++ b/crates/serverless/src/deployments/mod.rs
@@ -6,11 +6,12 @@ use std::{
 };
 
 use anyhow::{anyhow, Result};
+use dashmap::DashMap;
 use lagon_runtime_utils::Deployment;
 use log::{error, info, warn};
 use mysql::{prelude::Queryable, PooledConn};
 use s3::Bucket;
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::Mutex;
 
 use crate::{cronjob::Cronjob, REGION};
 
@@ -20,6 +21,8 @@ pub mod cache;
 pub mod filesystem;
 pub mod pubsub;
 
+pub type Deployments = Arc<DashMap<String, Arc<Deployment>>>;
+
 pub async fn download_deployment(deployment: &Deployment, bucket: &Bucket) -> Result<()> {
     match bucket.get_object(deployment.id.clone() + ".js").await {
         Ok(object) => {
@@ -65,8 +68,8 @@ pub async fn get_deployments(
     mut conn: PooledConn,
     bucket: Bucket,
     cronjob: Arc<Mutex<Cronjob>>,
-) -> Result<Arc<RwLock<HashMap<String, Arc<Deployment>>>>> {
-    let deployments = Arc::new(RwLock::new(HashMap::new()));
+) -> Result<Deployments> {
+    let deployments = Arc::new(DashMap::new());
 
     let mut deployments_list: HashMap<String, Deployment> = HashMap::new();
 
@@ -163,7 +166,6 @@ OR
     }
 
     {
-        let mut deployments = deployments.write().await;
         let mut cronjob = cronjob.lock().await;
 
         for deployment in deployments_list {
diff --git a/crates/serverless/src/deployments/pubsub.rs b/crates/serverless/src/deployments/pubsub.rs
index ac2b02851..924a7dc27 100644
--- a/crates/serverless/src/deployments/pubsub.rs
+++ b/crates/serverless/src/deployments/pubsub.rs
@@ -5,10 +5,7 @@ use log::{error, info, warn};
 use metrics::increment_counter;
 use s3::Bucket;
 use serde_json::Value;
-use tokio::{
-    sync::{Mutex, RwLock},
-    task::JoinHandle,
-};
+use tokio::{sync::Mutex, task::JoinHandle};
 
 use crate::{
     cronjob::Cronjob,
@@ -16,10 +13,12 @@ use crate::{
     REGION,
 };
 
-use super::{download_deployment, filesystem::rm_deployment, Deployment};
+use super::{download_deployment, filesystem::rm_deployment, Deployment, Deployments};
 
 pub async fn clear_deployment_cache(deployment_id: String, workers: Workers, reason: String) {
-    for (sender, _) in workers.read().await.values() {
+    for worker in workers.iter() {
+        let sender = &worker.0;
+
         sender
             .send_async(WorkerEvent::Drop {
                 deployment_id: deployment_id.clone(),
@@ -32,9 +31,9 @@ pub async fn clear_deployment_cache(deployment_id: String, workers: Workers, rea
 
 async fn run(
     bucket: &Bucket,
-    deployments: &Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
+    deployments: Deployments,
     workers: Workers,
-    cronjob: &Arc<Mutex<Cronjob>>,
+    cronjob: Arc<Mutex<Cronjob>>,
     client: &redis::Client,
 ) -> Result<()> {
     let mut conn = client.get_connection()?;
@@ -109,7 +108,6 @@ async fn run(
                         "region" => REGION.clone(),
                     );
 
-                    let mut deployments = deployments.write().await;
                     let domains = deployment.get_domains();
 
                     let deployment = Arc::new(deployment);
@@ -159,7 +157,6 @@ async fn run(
"region" => REGION.clone(), ); - let mut deployments = deployments.write().await; let domains = deployment.get_domains(); for domain in &domains { @@ -202,7 +199,6 @@ async fn run( ); let previous_id = value["previousDeploymentId"].as_str().unwrap(); - let mut deployments = deployments.write().await; if let Some(deployment) = deployments.get(previous_id) { let mut unpromoted_deployment = deployment.as_ref().clone(); @@ -245,7 +241,7 @@ async fn run( pub fn listen_pub_sub( bucket: Bucket, - deployments: Arc>>>, + deployments: Deployments, workers: Workers, cronjob: Arc>, ) -> JoinHandle> { @@ -256,9 +252,9 @@ pub fn listen_pub_sub( loop { if let Err(error) = run( &bucket, - &deployments, + Arc::clone(&deployments), Arc::clone(&workers), - &cronjob, + Arc::clone(&cronjob), &client, ) .await diff --git a/crates/serverless/src/main.rs b/crates/serverless/src/main.rs index 3716c3f3c..9e413663e 100644 --- a/crates/serverless/src/main.rs +++ b/crates/serverless/src/main.rs @@ -1,6 +1,8 @@ use anyhow::Result; use cronjob::Cronjob; +use dashmap::DashMap; use deployments::cache::run_cache_clear_task; +use deployments::Deployments; use hyper::header::HOST; use hyper::http::response::Builder; use hyper::server::conn::AddrStream; @@ -11,11 +13,8 @@ use lagon_runtime_http::{ Request, Response, RunResult, X_FORWARDED_FOR, X_LAGON_ID, X_LAGON_REGION, X_REAL_IP, }; use lagon_runtime_isolate::CONSOLE_SOURCE; +use lagon_runtime_utils::assets::{find_asset, handle_asset}; use lagon_runtime_utils::response::{handle_response, ResponseEvent, FAVICON_URL, PAGE_404}; -use lagon_runtime_utils::{ - assets::{find_asset, handle_asset}, - Deployment, -}; use lagon_serverless_logger::init_logger; use lazy_static::lazy_static; use log::{as_debug, error, info, warn}; @@ -28,14 +27,13 @@ use s3::creds::Credentials; use s3::Bucket; #[cfg(not(debug_assertions))] use std::borrow::Cow; -use std::collections::HashMap; use std::convert::Infallible; use std::env; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use std::time::Instant; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::Mutex; use worker::{create_workers, start_workers, Workers}; use crate::deployments::get_deployments; @@ -82,9 +80,9 @@ fn handle_error( async fn handle_request( req: HyperRequest, ip: String, - deployments: Arc>>>, - thread_ids: Arc>>, - last_requests: Arc>>, + deployments: Deployments, + thread_ids: Arc>, + last_requests: Arc>, workers: Workers, ) -> Result> { let url = req.uri().to_string(); @@ -108,9 +106,8 @@ async fn handle_request( } }; - let deployments = deployments.read().await; let deployment = match deployments.get(&hostname) { - Some(deployment) => Arc::clone(deployment), + Some(entry) => Arc::clone(entry.value()), None => { increment_counter!( "lagon_ignored_requests", @@ -175,14 +172,11 @@ async fn handle_request( .await .unwrap_or(()); } else { - last_requests - .write() - .await - .insert(deployment_id.clone(), Instant::now()); + last_requests.insert(deployment_id.clone(), Instant::now()); increment_counter!("lagon_isolate_requests", &labels); - match Request::from_hyper(req).await { + match Request::from_hyper_with_capacity(req, 2).await { Ok(mut request) => { counter!("lagon_bytes_in", request.len() as u64, &labels); @@ -200,9 +194,7 @@ async fn handle_request( request.set_header(X_FORWARDED_FOR.to_string(), ip); request.set_header(X_LAGON_REGION.to_string(), REGION.to_string()); - let thread_id = get_thread_id(thread_ids, &hostname).await; - - let workers = workers.read().await; + let thread_id = 
+                let thread_id = get_thread_id(thread_ids, hostname);
                 let worker = workers.get(&thread_id).unwrap();
 
                 worker
@@ -313,10 +305,10 @@ async fn main() -> Result<()> {
     let cronjob = Arc::new(Mutex::new(Cronjob::new().await));
     let deployments = get_deployments(conn, bucket.clone(), Arc::clone(&cronjob)).await?;
 
-    let last_requests = Arc::new(RwLock::new(HashMap::new()));
-    let thread_ids = Arc::new(RwLock::new(HashMap::new()));
+    let last_requests = Arc::new(DashMap::new());
+    let thread_ids = Arc::new(DashMap::new());
 
-    let workers = create_workers().await;
+    let workers = create_workers();
     start_workers(Arc::clone(&workers)).await;
 
     let redis = listen_pub_sub(
diff --git a/crates/serverless/src/worker.rs b/crates/serverless/src/worker.rs
index c234c6fcf..d9e74023e 100644
--- a/crates/serverless/src/worker.rs
+++ b/crates/serverless/src/worker.rs
@@ -1,18 +1,17 @@
 use std::{collections::HashMap, sync::Arc, time::Duration};
 
+use dashmap::DashMap;
 use lagon_runtime_http::{Request, RunResult};
 use lagon_runtime_isolate::{options::IsolateOptions, Isolate};
 use lagon_runtime_utils::Deployment;
 use log::{error, info};
 use metrics::{decrement_gauge, histogram, increment_gauge};
-use rand::{Rng, SeedableRng};
-use tokio::sync::RwLock;
+use rand::{thread_rng, Rng};
 use tokio_util::task::LocalPoolHandle;
 
 use crate::{REGION, SNAPSHOT_BLOB, WORKERS};
 
-pub type Workers =
-    Arc<RwLock<HashMap<usize, (flume::Sender<WorkerEvent>, flume::Receiver<WorkerEvent>)>>>;
+pub type Workers = Arc<DashMap<usize, (flume::Sender<WorkerEvent>, flume::Receiver<WorkerEvent>)>>;
 
 pub enum WorkerEvent {
     Request {
@@ -28,43 +27,29 @@ pub enum WorkerEvent {
     },
 }
 
-pub async fn create_workers() -> Workers {
-    let workers = Arc::new(RwLock::new(HashMap::new()));
+pub fn create_workers() -> Workers {
+    let workers = Arc::new(DashMap::new());
 
     for i in 0..*WORKERS {
         let worker = flume::unbounded();
 
-        workers.write().await.insert(i, worker);
+        workers.insert(i, worker);
     }
 
     workers
 }
 
-pub async fn get_thread_id(
-    thread_ids: Arc<RwLock<HashMap<String, usize>>>,
-    hostname: &String,
-) -> usize {
-    let thread_ids_reader = thread_ids.read().await;
-
-    let thread_id = match thread_ids_reader.get(hostname) {
-        Some(thread_id) => *thread_id,
-        None => {
-            let mut rng = rand::rngs::StdRng::from_entropy();
-            let id = rng.gen_range(0..*WORKERS);
-
-            drop(thread_ids_reader);
-
-            thread_ids.write().await.insert(hostname.clone(), id);
-            id
-        }
-    };
-
-    thread_id
+pub fn get_thread_id(thread_ids: Arc<DashMap<String, usize>>, hostname: String) -> usize {
+    *thread_ids
+        .entry(hostname)
+        .or_insert_with(|| thread_rng().gen_range(0..*WORKERS))
+        .value()
 }
 
 pub async fn start_workers(workers: Workers) {
     let pool = LocalPoolHandle::new(*WORKERS);
 
-    for (id, worker) in workers.read().await.iter() {
+    for worker in workers.iter() {
+        let (id, worker) = worker.pair();
         let id = *id;
         let receiver = worker.1.clone();
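The heart of the serverless changes above is swapping `tokio::sync::RwLock<HashMap<..>>` for `dashmap::DashMap`, which locks per shard instead of per map and never holds a guard across an `.await`. A minimal, self-contained sketch of that access pattern; it reuses the `last_requests` shape from the patch but is not part of it, and only assumes `dashmap` as a dependency:

```rust
use std::{sync::Arc, thread, time::Instant};

use dashmap::DashMap;

fn main() {
    // Shared "last request per deployment" map, mirroring `last_requests` in the patch.
    let last_requests: Arc<DashMap<String, Instant>> = Arc::new(DashMap::new());

    // Writers only lock the shard holding their key, so there is no map-wide
    // write().await as there was with tokio::sync::RwLock<HashMap<..>>.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let map = Arc::clone(&last_requests);
            thread::spawn(move || {
                map.insert(format!("deployment-{i}"), Instant::now());
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Iteration yields guarded entries; pair() gives (&K, &V), as the cache task uses.
    for entry in last_requests.iter() {
        let (deployment_id, last_request) = entry.pair();
        println!("{deployment_id}: {:?}", last_request.elapsed());
    }
}
```

Because entries can be inserted and removed through a shared reference, the cache.rs hunks can also drop the "drop the read lock, then take the write lock" dance that the old code needed.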
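The new `get_thread_id` combines two of the changesets: the `DashMap` entry API and `ThreadRng`. The sketch below mirrors the patched function in a standalone program; the `WORKERS` constant stands in for the crate's `lazy_static` of the same name and the `main` is only illustrative:

```rust
use std::sync::Arc;

use dashmap::DashMap;
use rand::{thread_rng, Rng};

// Stand-in for the `WORKERS` lazy_static read from the environment in the crate.
const WORKERS: usize = 4;

// Same shape as the patched get_thread_id: entry() locks only the shard holding
// `hostname`, and the closure runs only the first time a hostname is seen.
// thread_rng() reuses a thread-local generator instead of seeding a fresh StdRng
// from the OS on every call.
fn get_thread_id(thread_ids: &Arc<DashMap<String, usize>>, hostname: String) -> usize {
    *thread_ids
        .entry(hostname)
        .or_insert_with(|| thread_rng().gen_range(0..WORKERS))
        .value()
}

fn main() {
    let thread_ids = Arc::new(DashMap::new());
    let first = get_thread_id(&thread_ids, "example.lagon.dev".into());
    // A second lookup for the same hostname returns the cached worker id.
    assert_eq!(first, get_thread_id(&thread_ids, "example.lagon.dev".into()));
}
```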
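The `with_capacity` changes all follow the same idea: size the headers map from `keys_len()` plus however many headers the caller will add afterwards (the proxy adds two, `X-Forwarded-For` and `X-Lagon-Region`, hence `from_hyper_with_capacity(req, 2)`), so inserts never trigger a rehash. A hypothetical, dependency-free helper showing the pattern; `collect_headers` does not exist in the codebase:

```rust
use std::collections::HashMap;

// raw.len() plays the role of `request.headers().keys_len()` in the patch, and
// `extra` the role of the capacity the caller reserves for headers it adds later.
fn collect_headers(raw: &[(&str, &str)], extra: usize) -> HashMap<String, Vec<String>> {
    let mut headers: HashMap<String, Vec<String>> = HashMap::with_capacity(raw.len() + extra);

    for (key, value) in raw {
        headers
            .entry((*key).to_lowercase())
            .or_insert_with(Vec::new)
            .push((*value).to_string());
    }

    headers
}

fn main() {
    let headers = collect_headers(&[("Host", "example.com"), ("Accept", "*/*")], 2);
    assert_eq!(headers["host"], vec!["example.com".to_string()]);
}
```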