Skip to content

Commit

Permalink
feat(serverless): properly clear isolates cache (#264)
Browse files Browse the repository at this point in the history
* feat(serverless): properly clear isolates cache

* fix: remove flamegraph

* fix: useless comment
  • Loading branch information
QuiiBz authored Nov 18, 2022
1 parent 6db8e71 commit e970b9d
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 21 deletions.
5 changes: 5 additions & 0 deletions .changeset/hip-boxes-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Properly clear the isolates cache once the configured number of seconds has elapsed without any incoming requests
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion packages/runtime/src/isolate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ impl<T: Clone> Isolate<T> {
this
}

/// Returns a clone of the metadata attached to this isolate's options,
/// or `None` when no metadata was configured.
pub fn get_metadata(&self) -> Option<T> {
    self.options.metadata.as_ref().cloned()
}

fn heap_limit_reached(&mut self) {
self.termination_tx
.as_ref()
Expand All @@ -244,7 +248,7 @@ impl<T: Clone> Isolate<T> {
s.clone()
}

pub fn evaluate(&mut self, request: Request) -> Option<String> {
fn evaluate(&mut self, request: Request) -> Option<String> {
let isolate_state = Isolate::<T>::state(&self.isolate);

// Reset the stream status after each `run()`
Expand Down
1 change: 0 additions & 1 deletion packages/serverless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,4 @@ axiom-rs = "0.6.0"
chrono = "0.4.22"
lazy_static = "1.4.0"
rand = { version = "0.8.5", features = ["std_rng"] }
lru_time_cache = "0.11.11"
anyhow = "1.0.66"
63 changes: 63 additions & 0 deletions packages/serverless/src/deployments/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};

use log::info;
use tokio::sync::RwLock;

use crate::ISOLATES;

const CACHE_TASK_INTERVAL: Duration = Duration::from_secs(60);

/// Spawns a background task that periodically evicts expired isolates.
///
/// Every `CACHE_TASK_INTERVAL`, the task scans `last_requests` (hostname ->
/// time of the last incoming request) and removes from the global `ISOLATES`
/// cache every isolate that has not received a request for at least
/// `LAGON_ISOLATES_CACHE_SECONDS` seconds.
///
/// # Panics
///
/// Panics at startup if the `LAGON_ISOLATES_CACHE_SECONDS` environment
/// variable is missing or is not a valid integer number of seconds.
pub fn run_cache_clear_task(last_requests: Arc<RwLock<HashMap<String, Instant>>>) {
    // Read the TTL once, before spawning, so a misconfiguration fails fast.
    let isolates_cache_seconds = Duration::from_secs(
        dotenv::var("LAGON_ISOLATES_CACHE_SECONDS")
            .expect("LAGON_ISOLATES_CACHE_SECONDS must be set")
            .parse()
            .expect("Failed to parse LAGON_ISOLATES_CACHE_SECONDS"),
    );

    tokio::spawn(async move {
        loop {
            tokio::time::sleep(CACHE_TASK_INTERVAL).await;

            let now = Instant::now();
            let mut isolates_to_clear = Vec::new();
            let last_requests_reader = last_requests.read().await;

            info!("Running cache clear task");

            // Collect expired hostnames under the read lock only.
            for (hostname, last_request) in last_requests_reader.iter() {
                if now.duration_since(*last_request) > isolates_cache_seconds {
                    isolates_to_clear.push(hostname.clone());
                }
            }

            if isolates_to_clear.is_empty() {
                continue;
            }

            // Drop the read lock because we now acquire a write lock
            drop(last_requests_reader);

            let mut thread_isolates = ISOLATES.write().await;
            let mut last_requests = last_requests.write().await;

            for hostname in isolates_to_clear {
                // Remove the bookkeeping entry once per hostname — the
                // previous version repeated this remove inside the inner
                // loop, once per worker-thread map.
                last_requests.remove(&hostname);

                // The same hostname may have an isolate cached on several
                // worker threads; evict it from every per-thread map.
                for isolates in thread_isolates.values_mut() {
                    if let Some(isolate) = isolates.remove(&hostname) {
                        let (deployment, ..) = isolate
                            .get_metadata()
                            .unwrap_or_else(|| ("Unknown".to_owned(), "".to_owned()));

                        info!(deployment = deployment; "Clearing deployment from cache due to expiration");
                    }
                }
            }
        }
    });
}
1 change: 1 addition & 0 deletions packages/serverless/src/deployments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use self::filesystem::{
};

pub mod assets;
pub mod cache;
pub mod filesystem;
pub mod pubsub;

Expand Down
22 changes: 10 additions & 12 deletions packages/serverless/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use deployments::cache::run_cache_clear_task;
use deployments::Deployment;
use hyper::body::Bytes;
use hyper::header::HOST;
Expand All @@ -11,7 +12,6 @@ use lagon_runtime::isolate::{Isolate, IsolateOptions};
use lagon_runtime::runtime::{Runtime, RuntimeOptions};
use lazy_static::lazy_static;
use log::{as_debug, error, info, warn};
use lru_time_cache::LruCache;
use metrics::{counter, histogram, increment_counter};
use metrics_exporter_prometheus::PrometheusBuilder;
use mysql::{Opts, Pool};
Expand All @@ -24,7 +24,7 @@ use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio_util::task::LocalPoolHandle;

Expand All @@ -37,17 +37,11 @@ use crate::logger::init_logger;
mod deployments;
mod logger;

type ThreadIsolates = LruCache<String, Isolate<(String, String)>>;
type ThreadIsolates = HashMap<String, Isolate<(String, String)>>;

lazy_static! {
pub static ref ISOLATES: RwLock<HashMap<usize, ThreadIsolates>> = RwLock::new(HashMap::new());
static ref X_FORWARDED_FOR: String = String::from("X-Forwarded-For");
static ref ISOLATES_CACHE_SECONDS: Duration = Duration::from_secs(
dotenv::var("LAGON_ISOLATES_CACHE_SECONDS")
.expect("LAGON_ISOLATES_CACHE_SECONDS must be set")
.parse()
.expect("Failed to parse LAGON_ISOLATES_CACHE_SECONDS")
);
}

const POOL_SIZE: usize = 8;
Expand All @@ -61,6 +55,7 @@ async fn handle_request(
pool: LocalPoolHandle,
deployments: Arc<RwLock<HashMap<String, Arc<Deployment>>>>,
thread_ids: Arc<RwLock<HashMap<String, usize>>>,
last_requests: Arc<RwLock<HashMap<String, Instant>>>,
) -> Result<HyperResponse<Body>> {
let url = req.uri().path();
// Remove the leading '/' from the url
Expand Down Expand Up @@ -126,6 +121,7 @@ async fn handle_request(

tx.send_async(run_result).await.unwrap_or(());
} else {
last_requests.write().await.insert(hostname.clone(), Instant::now());
increment_counter!("lagon_requests", &labels);

let mut request = match Request::from_hyper(req).await {
Expand All @@ -150,9 +146,7 @@ async fn handle_request(
// deployment and that the isolate should be called.
// TODO: read() then write() if not present
let mut isolates = ISOLATES.write().await;
let thread_isolates = isolates.entry(thread_id).or_insert_with(|| {
LruCache::with_expiry_duration(*ISOLATES_CACHE_SECONDS)
});
let thread_isolates = isolates.entry(thread_id).or_insert_with(HashMap::new);

let isolate = thread_isolates.entry(hostname).or_insert_with(|| {
info!(deployment = deployment.id; "Creating new isolate");
Expand Down Expand Up @@ -301,6 +295,8 @@ async fn main() -> Result<()> {

let deployments = get_deployments(conn, bucket.clone()).await?;
let redis = listen_pub_sub(bucket.clone(), Arc::clone(&deployments));
let last_requests = Arc::new(RwLock::new(HashMap::new()));
run_cache_clear_task(Arc::clone(&last_requests));

let pool = LocalPoolHandle::new(POOL_SIZE);
let thread_ids = Arc::new(RwLock::new(HashMap::new()));
Expand All @@ -309,6 +305,7 @@ async fn main() -> Result<()> {
let deployments = Arc::clone(&deployments);
let pool = pool.clone();
let thread_ids = Arc::clone(&thread_ids);
let last_requests = Arc::clone(&last_requests);

let addr = conn.remote_addr();
let ip = addr.ip().to_string();
Expand All @@ -321,6 +318,7 @@ async fn main() -> Result<()> {
pool.clone(),
Arc::clone(&deployments),
Arc::clone(&thread_ids),
Arc::clone(&last_requests),
)
}))
}
Expand Down

0 comments on commit e970b9d

Please sign in to comment.