
Commit

perf(serverless,runtime,cli): use DashMap & with_capacity where possible (#663)

* perf(serverless,runtime,cli): use DashMap & with_capacity where possible

* perf(serverless): use ThreadRng instead of StdRng
QuiiBz authored Mar 12, 2023
1 parent 76f334a commit 822db09
Showing 14 changed files with 88 additions and 93 deletions.
5 changes: 5 additions & 0 deletions .changeset/large-rivers-cough.md
@@ -0,0 +1,5 @@
+---
+'@lagon/serverless': patch
+---
+
+Use ThreadRng instead of creating a new StdRng every time
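A minimal sketch of the tradeoff under rand 0.8 (the version this crate pins): `StdRng::from_entropy()` pulls fresh OS entropy on every construction, while `rand::thread_rng()` returns a handle to a lazily seeded, thread-local generator that is reused across calls.

```rust
use rand::{rngs::StdRng, Rng, SeedableRng};

fn main() {
    // Before: constructing a StdRng seeds it from the OS every time.
    let mut std_rng = StdRng::from_entropy();
    let _pick: usize = std_rng.gen_range(0..10);

    // After: thread_rng() reuses one thread-local generator, so repeated
    // calls skip the per-construction seeding cost.
    let mut rng = rand::thread_rng();
    let _pick: usize = rng.gen_range(0..10);
}
```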
5 changes: 5 additions & 0 deletions .changeset/loud-seahorses-yell.md
@@ -0,0 +1,5 @@
+---
+'@lagon/serverless': patch
+---
+
+Use DashMap instead of RwLock<HashMap>
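A rough sketch of what the swap buys (types here are illustrative, not the exact ones used in this commit): DashMap shards the map and locks per shard, so calls take `&self` directly and there is no single `RwLock` to `.read().await` or `.write().await` behind.

```rust
use std::sync::Arc;
use dashmap::DashMap;

fn main() {
    let deployments: Arc<DashMap<String, u32>> = Arc::new(DashMap::new());

    // insert and get take &self; each call locks only the shard that
    // owns the key, instead of the whole map.
    deployments.insert("deploy-1".into(), 1);
    if let Some(entry) = deployments.get("deploy-1") {
        println!("{} -> {}", entry.key(), entry.value());
    }
}
```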
6 changes: 6 additions & 0 deletions .changeset/swift-tables-grow.md
@@ -0,0 +1,6 @@
+---
+'@lagon/cli': patch
+'@lagon/runtime': patch
+---
+
+Create HashMap with_capacity to avoid reallocations
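The idea in isolation, as a minimal sketch: `HashMap::with_capacity(n)` reserves room for at least `n` entries up front, so inserting up to `n` items never triggers a grow-and-rehash cycle.

```rust
use std::collections::HashMap;

fn main() {
    // Capacity for 16 entries is allocated immediately...
    let mut headers: HashMap<String, Vec<String>> = HashMap::with_capacity(16);
    assert!(headers.capacity() >= 16);

    // ...so inserts below that count reuse the existing table.
    headers.insert("content-type".into(), vec!["text/html".into()]);
}
```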
14 changes: 8 additions & 6 deletions Cargo.lock


10 changes: 9 additions & 1 deletion crates/runtime_http/src/request.rs
@@ -158,7 +158,15 @@ impl Request {
     }

     pub async fn from_hyper(request: HyperRequest<Body>) -> Result<Self> {
-        let mut headers = HashMap::<String, Vec<String>>::new();
+        Self::from_hyper_with_capacity(request, 0).await
+    }
+
+    pub async fn from_hyper_with_capacity(
+        request: HyperRequest<Body>,
+        capacity: usize,
+    ) -> Result<Self> {
+        let mut headers =
+            HashMap::<String, Vec<String>>::with_capacity(request.headers().keys_len() + capacity);

         for (key, value) in request.headers().iter() {
             if key != X_LAGON_ID {
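`from_hyper` now delegates to `from_hyper_with_capacity` with zero headroom; the `capacity` argument is for callers that know they will insert extra headers after the conversion. A standalone sketch of the sizing logic (the helper name is made up): hyper's `HeaderMap::keys_len()` counts distinct header names, which matches a map keyed by name with multi-valued entries.

```rust
use std::collections::HashMap;

// Hypothetical helper: size the header map from the distinct-name count
// plus whatever headroom the caller wants for headers added later.
fn sized_headers(source: &hyper::HeaderMap, extra: usize) -> HashMap<String, Vec<String>> {
    HashMap::with_capacity(source.keys_len() + extra)
}
```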
3 changes: 2 additions & 1 deletion crates/runtime_http/src/response.rs
@@ -159,7 +159,8 @@ impl Response {
     }

     pub async fn from_hyper(response: HyperResponse<Body>) -> Result<Self> {
-        let mut headers = HashMap::<String, Vec<String>>::new();
+        let mut headers =
+            HashMap::<String, Vec<String>>::with_capacity(response.headers().keys_len());

         for (key, value) in response.headers().iter() {
             headers
2 changes: 1 addition & 1 deletion crates/runtime_utils/src/assets.rs
@@ -39,7 +39,7 @@ pub fn handle_asset(root: PathBuf, asset: &String) -> Result<Response> {
         },
     );

-    let mut headers = HashMap::new();
+    let mut headers = HashMap::with_capacity(1);
     headers.insert("content-type".into(), vec![content_type.into()]);

     Ok(Response {
5 changes: 3 additions & 2 deletions crates/runtime_v8_utils/src/lib.rs
@@ -32,10 +32,11 @@ pub fn extract_v8_headers_object(
     let map = unsafe { v8::Local::<v8::Map>::cast(value) };

     if map.size() > 0 {
-        let mut headers = HashMap::new();
         let headers_keys = map.as_array(scope);
+        let length = headers_keys.length();
+        let mut headers = HashMap::with_capacity((length / 2) as usize);

-        for mut index in 0..headers_keys.length() {
+        for mut index in 0..length {
             if index % 2 != 0 {
                 continue;
             }
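`v8::Map::as_array` flattens a JS Map into an array laid out as `[key, value, key, value, ...]`, so the entry count is half the array length, hence the `with_capacity((length / 2) as usize)` above. The same sizing logic in plain Rust, with made-up data:

```rust
use std::collections::HashMap;

fn main() {
    // Alternating [key, value, key, value, ...] layout, mirroring what
    // v8::Map::as_array produces.
    let flat = ["content-type", "text/html", "x-custom", "1"];

    // One map entry per key/value pair: capacity is half the array length.
    let mut headers: HashMap<&str, &str> = HashMap::with_capacity(flat.len() / 2);
    for pair in flat.chunks(2) {
        headers.insert(pair[0], pair[1]);
    }
    assert_eq!(headers.len(), 2);
}
```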
1 change: 1 addition & 0 deletions crates/serverless/Cargo.toml
@@ -26,6 +26,7 @@ rand = { version = "0.8.5", features = ["std_rng"] }
 anyhow = "1.0.69"
 tokio-cron-scheduler = "0.9.4"
 uuid = "1.3.0"
+dashmap = "5.4.0"

 [build-dependencies]
 lagon-runtime = { path = "../runtime" }
19 changes: 5 additions & 14 deletions crates/serverless/src/deployments/cache.rs
@@ -1,22 +1,18 @@
 use std::{
-    collections::HashMap,
     env,
     sync::Arc,
     time::{Duration, Instant},
 };

-use tokio::sync::RwLock;
+use dashmap::DashMap;

 use crate::worker::Workers;

 use super::pubsub::clear_deployment_cache;

 const CACHE_TASK_INTERVAL: Duration = Duration::from_secs(1);

-pub fn run_cache_clear_task(
-    last_requests: Arc<RwLock<HashMap<String, Instant>>>,
-    workers: Workers,
-) {
+pub fn run_cache_clear_task(last_requests: Arc<DashMap<String, Instant>>, workers: Workers) {
     let isolates_cache_seconds = Duration::from_secs(
         env::var("LAGON_ISOLATES_CACHE_SECONDS")
             .expect("LAGON_ISOLATES_CACHE_SECONDS is not set")
@@ -30,10 +26,11 @@ pub fn run_cache_clear_task(
     loop {
         tokio::time::sleep(CACHE_TASK_INTERVAL).await;

-        let last_requests_reader = last_requests.read().await;
         let now = Instant::now();

-        for (deployment_id, last_request) in last_requests_reader.iter() {
+        for last_request in last_requests.iter() {
+            let (deployment_id, last_request) = last_request.pair();
+
             if now.duration_since(*last_request) > isolates_cache_seconds {
                 deployments_to_clear.push(deployment_id.clone());
             }
@@ -43,12 +40,6 @@ pub fn run_cache_clear_task(
             continue;
         }

-        // Drop the read lock because we now acquire a write lock
-        drop(last_requests_reader);
-
-        // Clear everything
-        let mut last_requests = last_requests.write().await;
-
         for deployment_id in &deployments_to_clear {
             last_requests.remove(deployment_id);

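Note the shape the loop keeps: stale deployment IDs are collected while iterating and only removed afterwards. That ordering matters with DashMap, because removing entries while an iterator still holds a shard lock can deadlock. A standalone sketch of the same pattern, with illustrative names:

```rust
use std::time::{Duration, Instant};
use dashmap::DashMap;

fn clear_stale(last_requests: &DashMap<String, Instant>, max_age: Duration) {
    let now = Instant::now();

    // Collect stale keys first: mutating the map while an iterator holds
    // a shard lock risks a deadlock, so removal waits until iteration ends.
    let stale: Vec<String> = last_requests
        .iter()
        .filter(|entry| now.duration_since(*entry.value()) > max_age)
        .map(|entry| entry.key().clone())
        .collect();

    for key in &stale {
        last_requests.remove(key);
    }
}
```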
10 changes: 6 additions & 4 deletions crates/serverless/src/deployments/mod.rs
@@ -6,11 +6,12 @@ use std::{
 };

 use anyhow::{anyhow, Result};
+use dashmap::DashMap;
 use lagon_runtime_utils::Deployment;
 use log::{error, info, warn};
 use mysql::{prelude::Queryable, PooledConn};
 use s3::Bucket;
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::Mutex;

 use crate::{cronjob::Cronjob, REGION};

@@ -20,6 +21,8 @@ pub mod cache;
 pub mod filesystem;
 pub mod pubsub;

+pub type Deployments = Arc<DashMap<String, Arc<Deployment>>>;
+
 pub async fn download_deployment(deployment: &Deployment, bucket: &Bucket) -> Result<()> {
     match bucket.get_object(deployment.id.clone() + ".js").await {
         Ok(object) => {
@@ -65,8 +68,8 @@ pub async fn get_deployments(
     mut conn: PooledConn,
     bucket: Bucket,
     cronjob: Arc<Mutex<Cronjob>>,
-) -> Result<Arc<RwLock<HashMap<String, Arc<Deployment>>>>> {
-    let deployments = Arc::new(RwLock::new(HashMap::new()));
+) -> Result<Deployments> {
+    let deployments = Arc::new(DashMap::new());

     let mut deployments_list: HashMap<String, Deployment> = HashMap::new();

@@ -163,7 +166,6 @@ OR
 }

 {
-    let mut deployments = deployments.write().await;
     let mut cronjob = cronjob.lock().await;

     for deployment in deployments_list {
24 changes: 10 additions & 14 deletions crates/serverless/src/deployments/pubsub.rs
@@ -5,21 +5,20 @@ use log::{error, info, warn};
 use metrics::increment_counter;
 use s3::Bucket;
 use serde_json::Value;
-use tokio::{
-    sync::{Mutex, RwLock},
-    task::JoinHandle,
-};
+use tokio::{sync::Mutex, task::JoinHandle};

 use crate::{
     cronjob::Cronjob,
     worker::{WorkerEvent, Workers},
     REGION,
 };

-use super::{download_deployment, filesystem::rm_deployment, Deployment};
+use super::{download_deployment, filesystem::rm_deployment, Deployment, Deployments};

 pub async fn clear_deployment_cache(deployment_id: String, workers: Workers, reason: String) {
-    for (sender, _) in workers.read().await.values() {
+    for worker in workers.iter() {
+        let sender = &worker.0;
+
         sender
             .send_async(WorkerEvent::Drop {
                 deployment_id: deployment_id.clone(),
@@ -32,9 +31,9 @@ pub async fn clear_deployment_cache(deployment_id: String, workers: Workers, rea

 async fn run(
     bucket: &Bucket,
-    deployments: &Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
+    deployments: Deployments,
     workers: Workers,
-    cronjob: &Arc<Mutex<Cronjob>>,
+    cronjob: Arc<Mutex<Cronjob>>,
     client: &redis::Client,
 ) -> Result<()> {
     let mut conn = client.get_connection()?;
@@ -109,7 +108,6 @@ async fn run(
         "region" => REGION.clone(),
     );

-    let mut deployments = deployments.write().await;
     let domains = deployment.get_domains();
     let deployment = Arc::new(deployment);

@@ -159,7 +157,6 @@ async fn run(
         "region" => REGION.clone(),
     );

-    let mut deployments = deployments.write().await;
     let domains = deployment.get_domains();

     for domain in &domains {
@@ -202,7 +199,6 @@ async fn run(
     );

     let previous_id = value["previousDeploymentId"].as_str().unwrap();
-    let mut deployments = deployments.write().await;

     if let Some(deployment) = deployments.get(previous_id) {
         let mut unpromoted_deployment = deployment.as_ref().clone();
@@ -245,7 +241,7 @@

 pub fn listen_pub_sub(
     bucket: Bucket,
-    deployments: Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
+    deployments: Deployments,
     workers: Workers,
     cronjob: Arc<Mutex<Cronjob>>,
 ) -> JoinHandle<Result<()>> {
@@ -256,9 +252,9 @@ pub fn listen_pub_sub(
         loop {
             if let Err(error) = run(
                 &bucket,
-                &deployments,
+                Arc::clone(&deployments),
                 Arc::clone(&workers),
-                &cronjob,
+                Arc::clone(&cronjob),
                 &client,
             )
             .await
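`run` now takes `Deployments` and the cronjob handle by value, so each retry iteration passes fresh `Arc::clone`s instead of references. A small sketch of why that stays cheap (types are illustrative): cloning an `Arc` only bumps a reference count; the map itself is never copied.

```rust
use std::sync::Arc;
use dashmap::DashMap;

fn main() {
    let deployments: Arc<DashMap<String, u32>> = Arc::new(DashMap::new());

    // A clone is a refcount increment; both handles share one DashMap.
    let handle = Arc::clone(&deployments);
    handle.insert("deploy-1".into(), 1);
    assert_eq!(*deployments.get("deploy-1").unwrap(), 1);
}
```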