Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(serverless): concurrent requests #616

Merged
merged 6 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/late-pugs-cheat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Add `LAGON_WORKERS` environment variable to customize the size of the worker's pool
5 changes: 5 additions & 0 deletions .changeset/many-carrots-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Allow concurrent requests of isolates
1 change: 1 addition & 0 deletions crates/serverless/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ LAGON_ROOT_DOMAIN=lagon.dev
LAGON_REGION=
LAGON_ISOLATES_CACHE_SECONDS=60
LAGON_LISTEN_ADDR=0.0.0.0:4000
LAGON_WORKERS=4

DATABASE_URL=mysql://root:mysql@localhost:3306/lagon
REDIS_URL=redis://localhost:6379
Expand Down
31 changes: 20 additions & 11 deletions crates/serverless/src/deployments/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use std::{
};

use tokio::sync::RwLock;
use tokio_util::task::LocalPoolHandle;

use super::pubsub::clear_deployments_cache;
use crate::worker::Workers;

use super::pubsub::clear_deployment_cache;

const CACHE_TASK_INTERVAL: Duration = Duration::from_secs(1);

pub fn run_cache_clear_task(
last_requests: Arc<RwLock<HashMap<String, Instant>>>,
pool: LocalPoolHandle,
workers: Workers,
) {
let isolates_cache_seconds = Duration::from_secs(
env::var("LAGON_ISOLATES_CACHE_SECONDS")
Expand All @@ -24,20 +25,21 @@ pub fn run_cache_clear_task(
);

tokio::spawn(async move {
let mut deployments_to_clear = Vec::new();

loop {
tokio::time::sleep(CACHE_TASK_INTERVAL).await;

let now = Instant::now();
let mut hostnames_to_clear = Vec::new();
let last_requests_reader = last_requests.read().await;
let now = Instant::now();

for (hostname, last_request) in last_requests_reader.iter() {
for (deployment_id, last_request) in last_requests_reader.iter() {
if now.duration_since(*last_request) > isolates_cache_seconds {
hostnames_to_clear.push(hostname.clone());
deployments_to_clear.push(deployment_id.clone());
}
}

if hostnames_to_clear.is_empty() {
if deployments_to_clear.is_empty() {
continue;
}

Expand All @@ -47,11 +49,18 @@ pub fn run_cache_clear_task(
// Clear everything
let mut last_requests = last_requests.write().await;

for hostname in &hostnames_to_clear {
last_requests.remove(hostname);
for deployment_id in &deployments_to_clear {
last_requests.remove(deployment_id);

clear_deployment_cache(
deployment_id.clone(),
Arc::clone(&workers),
String::from("expiration"),
)
.await;
}

clear_deployments_cache(hostnames_to_clear, &pool, "expiration").await;
deployments_to_clear.clear();
}
});
}
94 changes: 40 additions & 54 deletions crates/serverless/src/deployments/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,66 +9,31 @@ use tokio::{
sync::{Mutex, RwLock},
task::JoinHandle,
};
use tokio_util::task::LocalPoolHandle;

use crate::{cronjob::Cronjob, ISOLATES, POOL_SIZE, REGION};
use crate::{
cronjob::Cronjob,
worker::{WorkerEvent, Workers},
REGION,
};

use super::{download_deployment, filesystem::rm_deployment, Deployment};

// The isolate is implicitely dropped when the block after `remove()` ends
//
// An isolate must be dropped (which will call `exit()` and terminate the
// execution) in the same thread as it was created in
pub async fn clear_deployments_cache(
hostnames: Vec<String>,
pool: &LocalPoolHandle,
reason: &'static str,
) {
for thread_id in 0..POOL_SIZE {
let hostnames = hostnames.clone();

// Spawn early the task and loop the hostnames inside, instead
// of looping outside and spawning hostnames * threads tasks
match pool
.spawn_pinned_by_idx(
move || async move {
let mut thread_isolates = ISOLATES.write().await;

for hostname in hostnames {
// We might not have yet created the isolates map for this thread
if let Some(thread_isolates) = thread_isolates.get_mut(&thread_id) {
if let Some(isolate) = thread_isolates.remove(&hostname) {
let metadata = isolate.get_metadata();

if let Some((deployment, function)) = metadata.as_ref() {
info!(
deployment = deployment,
function = function,
hostname = hostname;
"Clearing deployment from cache due to {}",
reason
);
}
}
}
}
},
thread_id,
)
pub async fn clear_deployment_cache(deployment_id: String, workers: Workers, reason: String) {
for (sender, _) in workers.read().await.values() {
sender
.send_async(WorkerEvent::Drop {
deployment_id: deployment_id.clone(),
reason: reason.clone(),
})
.await
{
Ok(_) => {}
Err(err) => {
error!("Failed to clear deployments from cache: {}", err);
}
};
.unwrap_or(());
}
}

async fn run(
bucket: &Bucket,
deployments: &Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
pool: &LocalPoolHandle,
workers: Workers,
cronjob: &Arc<Mutex<Cronjob>>,
client: &redis::Client,
) -> Result<()> {
Expand Down Expand Up @@ -128,6 +93,8 @@ async fn run(
cron,
};

let workers = Arc::clone(&workers);

match channel {
"deploy" => {
match download_deployment(&deployment, bucket).await {
Expand All @@ -148,7 +115,12 @@ async fn run(
deployments.insert(domain.clone(), Arc::clone(&deployment));
}

clear_deployments_cache(domains, pool, "deployment").await;
clear_deployment_cache(
deployment.id.clone(),
workers,
String::from("deployment"),
)
.await;

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
Expand Down Expand Up @@ -192,7 +164,12 @@ async fn run(
deployments.remove(domain);
}

clear_deployments_cache(domains, pool, "undeployment").await;
clear_deployment_cache(
deployment.id.clone(),
workers,
String::from("undeployment"),
)
.await;

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
Expand Down Expand Up @@ -247,7 +224,8 @@ async fn run(
deployments.insert(domain.clone(), Arc::clone(&deployment));
}

clear_deployments_cache(domains, pool, "promotion").await;
clear_deployment_cache(deployment.id.clone(), workers, String::from("promotion"))
.await;

if deployment.should_run_cron() {
let mut cronjob = cronjob.lock().await;
Expand All @@ -266,15 +244,23 @@ async fn run(
pub fn listen_pub_sub(
bucket: Bucket,
deployments: Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
pool: LocalPoolHandle,
workers: Workers,
cronjob: Arc<Mutex<Cronjob>>,
) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let client = redis::Client::open(url)?;

loop {
if let Err(error) = run(&bucket, &deployments, &pool, &cronjob, &client).await {
if let Err(error) = run(
&bucket,
&deployments,
Arc::clone(&workers),
&cronjob,
&client,
)
.await
{
error!("Pub/sub error: {}", error);
}
}
Expand Down
Loading