refactor(serverless): concurrent requests (#616)
* refactor(serverless): concurrent workers

* fix: clean

* fix: remove unwrap

* perf: create single vec in cache clear task

* feat: add LAGON_WORKERS env variable

* chore: add changesets
QuiiBz authored Feb 25, 2023
1 parent 0e4e2b3 commit 16b0a43
Showing 7 changed files with 323 additions and 218 deletions.
5 changes: 5 additions & 0 deletions .changeset/late-pugs-cheat.md
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Add `LAGON_WORKERS` environment variable to customize the size of the worker pool
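
For reference, a variable like this is typically read once at startup, with a fallback when it is unset or malformed. The helper below is only a sketch: the default of 4 mirrors the value added to `.env.example` further down, and the function name is made up here rather than taken from the codebase.

```rust
use std::env;

// Sketch only: read LAGON_WORKERS, falling back to 4 (the value suggested
// in .env.example) when the variable is missing or not a valid number.
fn workers_count() -> usize {
    env::var("LAGON_WORKERS")
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
        .unwrap_or(4)
}

fn main() {
    println!("worker pool size: {}", workers_count());
}
```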
5 changes: 5 additions & 0 deletions .changeset/many-carrots-clean.md
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Allow concurrent requests of isolates
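
In practice this means a worker no longer blocks on one request at a time. The worker implementation lives in the new `crate::worker` module, which is not part of the visible diff, so the snippet below is only a toy illustration of the general pattern, assuming a flume channel feeding a single-threaded tokio `LocalSet` that spawns one local task per request:

```rust
use tokio::task::LocalSet;

// Toy illustration only: a single-threaded "worker" that receives request
// ids over a channel and handles them concurrently with spawn_local, so a
// slow request does not hold up the ones queued behind it.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, rx) = flume::unbounded::<u32>();
    let local = LocalSet::new();

    local.spawn_local(async move {
        while let Ok(request_id) = rx.recv_async().await {
            // Each request becomes its own task on this worker's thread.
            tokio::task::spawn_local(async move {
                println!("handling request {request_id}");
            });
        }
    });

    for id in 0..3 {
        tx.send_async(id).await.unwrap();
    }
    drop(tx); // closing the channel lets the receive loop finish

    local.await; // drive all local tasks to completion
}
```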
1 change: 1 addition & 0 deletions crates/serverless/.env.example
@@ -2,6 +2,7 @@ LAGON_ROOT_DOMAIN=lagon.dev
LAGON_REGION=
LAGON_ISOLATES_CACHE_SECONDS=60
LAGON_LISTEN_ADDR=0.0.0.0:4000
LAGON_WORKERS=4

DATABASE_URL=mysql://root:mysql@localhost:3306/lagon
REDIS_URL=redis://localhost:6379
31 changes: 20 additions & 11 deletions crates/serverless/src/deployments/cache.rs
@@ -6,15 +6,16 @@ use std::{
};

use tokio::sync::RwLock;
use tokio_util::task::LocalPoolHandle;

use super::pubsub::clear_deployments_cache;
use crate::worker::Workers;

use super::pubsub::clear_deployment_cache;

const CACHE_TASK_INTERVAL: Duration = Duration::from_secs(1);

pub fn run_cache_clear_task(
last_requests: Arc<RwLock<HashMap<String, Instant>>>,
pool: LocalPoolHandle,
workers: Workers,
) {
let isolates_cache_seconds = Duration::from_secs(
env::var("LAGON_ISOLATES_CACHE_SECONDS")
@@ -24,20 +25,21 @@ pub fn run_cache_clear_task(
);

tokio::spawn(async move {
let mut deployments_to_clear = Vec::new();

loop {
tokio::time::sleep(CACHE_TASK_INTERVAL).await;

let now = Instant::now();
let mut hostnames_to_clear = Vec::new();
let last_requests_reader = last_requests.read().await;
let now = Instant::now();

for (hostname, last_request) in last_requests_reader.iter() {
for (deployment_id, last_request) in last_requests_reader.iter() {
if now.duration_since(*last_request) > isolates_cache_seconds {
hostnames_to_clear.push(hostname.clone());
deployments_to_clear.push(deployment_id.clone());
}
}

if hostnames_to_clear.is_empty() {
if deployments_to_clear.is_empty() {
continue;
}

@@ -47,11 +49,18 @@
// Clear everything
let mut last_requests = last_requests.write().await;

for hostname in &hostnames_to_clear {
last_requests.remove(hostname);
for deployment_id in &deployments_to_clear {
last_requests.remove(deployment_id);

clear_deployment_cache(
deployment_id.clone(),
Arc::clone(&workers),
String::from("expiration"),
)
.await;
}

clear_deployments_cache(hostnames_to_clear, &pool, "expiration").await;
deployments_to_clear.clear();
}
});
}
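
The `perf: create single vec in cache clear task` bullet is visible in this hunk: `deployments_to_clear` is now allocated once before the loop and emptied with `clear()` at the end of each pass, so the buffer keeps its capacity instead of being reallocated every second. A standalone sketch of that pattern, with made-up data:

```rust
fn main() {
    // Allocate the scratch buffer once, outside the loop.
    let mut to_clear: Vec<String> = Vec::new();

    for tick in 0..3 {
        // Fill the buffer for this pass.
        to_clear.push(format!("deployment-{tick}"));

        for id in &to_clear {
            println!("clearing {id}");
        }

        // clear() drops the elements but keeps the allocation, so the next
        // pass reuses the same capacity instead of allocating again.
        to_clear.clear();
    }
}
```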
94 changes: 40 additions & 54 deletions crates/serverless/src/deployments/pubsub.rs
@@ -9,66 +9,31 @@ use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use tokio_util::task::LocalPoolHandle;

use crate::{cronjob::Cronjob, ISOLATES, POOL_SIZE, REGION};
use crate::{
cronjob::Cronjob,
worker::{WorkerEvent, Workers},
REGION,
};

use super::{download_deployment, filesystem::rm_deployment, Deployment};

// The isolate is implicitely dropped when the block after `remove()` ends
//
// An isolate must be dropped (which will call `exit()` and terminate the
// execution) in the same thread as it was created in
pub async fn clear_deployments_cache(
hostnames: Vec<String>,
pool: &LocalPoolHandle,
reason: &'static str,
) {
for thread_id in 0..POOL_SIZE {
let hostnames = hostnames.clone();

// Spawn early the task and loop the hostnames inside, instead
// of looping outside and spawning hostnames * threads tasks
match pool
.spawn_pinned_by_idx(
move || async move {
let mut thread_isolates = ISOLATES.write().await;

for hostname in hostnames {
// We might not have yet created the isolates map for this thread
if let Some(thread_isolates) = thread_isolates.get_mut(&thread_id) {
if let Some(isolate) = thread_isolates.remove(&hostname) {
let metadata = isolate.get_metadata();

if let Some((deployment, function)) = metadata.as_ref() {
info!(
deployment = deployment,
function = function,
hostname = hostname;
"Clearing deployment from cache due to {}",
reason
);
}
}
}
}
},
thread_id,
)
pub async fn clear_deployment_cache(deployment_id: String, workers: Workers, reason: String) {
for (sender, _) in workers.read().await.values() {
sender
.send_async(WorkerEvent::Drop {
deployment_id: deployment_id.clone(),
reason: reason.clone(),
})
.await
{
Ok(_) => {}
Err(err) => {
error!("Failed to clear deployments from cache: {}", err);
}
};
.unwrap_or(());
}
}

async fn run(
bucket: &Bucket,
deployments: &Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
pool: &LocalPoolHandle,
workers: Workers,
cronjob: &Arc<Mutex<Cronjob>>,
client: &redis::Client,
) -> Result<()> {
@@ -128,6 +93,8 @@ async fn run(
cron,
};

let workers = Arc::clone(&workers);

match channel {
"deploy" => {
match download_deployment(&deployment, bucket).await {
@@ -148,7 +115,12 @@
deployments.insert(domain.clone(), Arc::clone(&deployment));
}

clear_deployments_cache(domains, pool, "deployment").await;
clear_deployment_cache(
deployment.id.clone(),
workers,
String::from("deployment"),
)
.await;

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
@@ -192,7 +164,12 @@
deployments.remove(domain);
}

clear_deployments_cache(domains, pool, "undeployment").await;
clear_deployment_cache(
deployment.id.clone(),
workers,
String::from("undeployment"),
)
.await;

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
@@ -247,7 +224,8 @@
deployments.insert(domain.clone(), Arc::clone(&deployment));
}

clear_deployments_cache(domains, pool, "promotion").await;
clear_deployment_cache(deployment.id.clone(), workers, String::from("promotion"))
.await;

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
@@ -266,15 +244,23 @@
pub fn listen_pub_sub(
bucket: Bucket,
deployments: Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
pool: LocalPoolHandle,
workers: Workers,
cronjob: Arc<Mutex<Cronjob>>,
) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let client = redis::Client::open(url)?;

loop {
if let Err(error) = run(&bucket, &deployments, &pool, &cronjob, &client).await {
if let Err(error) = run(
&bucket,
&deployments,
Arc::clone(&workers),
&cronjob,
&client,
)
.await
{
error!("Pub/sub error: {}", error);
}
}
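
`Workers` and `WorkerEvent` come from the new `crate::worker` module, one of the seven changed files but not loaded in this view. Judging from the usage above (`workers.read().await.values()` yielding `(sender, _)` pairs and a flume-style `send_async`), a plausible shape is sketched below; the key type, the second tuple element, and the choice of `flume` are assumptions, not taken from the actual file.

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::{sync::RwLock, task::JoinHandle};

// Assumed event type: the diff above only shows the Drop variant being sent,
// used to evict a deployment's isolate from a worker's cache.
pub enum WorkerEvent {
    Drop {
        deployment_id: String,
        reason: String,
    },
}

// Assumed alias: a shared map of per-worker channels, matching the
// `workers.read().await.values()` / `sender.send_async(..)` calls above.
// The worker id key and the JoinHandle in the tuple are guesses.
pub type Workers = Arc<RwLock<HashMap<usize, (flume::Sender<WorkerEvent>, JoinHandle<()>)>>>;
```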
