Skip to content

Commit

Permalink
feat(serverless): add logs for response types (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuiiBz authored Nov 17, 2022
1 parent 19718f1 commit 2a185ef
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 95 deletions.
5 changes: 5 additions & 0 deletions .changeset/clever-dots-destroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Add logs for response types
206 changes: 111 additions & 95 deletions packages/serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ async fn handle_request(
}
};

let deployments = deployments.read().await;
let deployment = match deployments.get(&hostname) {
Some(deployment) => deployment.clone(),
None => {
warn!(request = as_debug!(req); "No deployment found for hostname");

return Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?);
}
};

let deployment_id = deployment.id.clone();
let thread_ids_reader = thread_ids.read().await;

let thread_id = match thread_ids_reader.get(&hostname) {
Expand All @@ -95,101 +106,92 @@ async fn handle_request(
pool.spawn_pinned_by_idx(
move || {
async move {
let deployments = deployments.read().await;

match deployments.get(&hostname) {
Some(deployment) => {
let labels = [
("deployment", deployment.id.clone()),
("function", deployment.function_id.clone()),
];

if let Some(asset) = deployment.assets.iter().find(|asset| {
asset.replace(".html", "") == url || asset.replace("/index.html", "") == url
}) {
let run_result = match handle_asset(deployment, asset) {
Ok(response) => RunResult::Response(response),
Err(error) => {
error!(deployment = &deployment.id, asset = asset; "Error while handing asset: {}", error);

RunResult::Error("Could not retrieve asset.".into())
}
};

tx.send_async(run_result).await.unwrap_or(());
} else {
increment_counter!("lagon_requests", &labels);

let mut request = match Request::from_hyper(req).await {
Ok(request) => request,
Err(error) => {
error!(deployment = &deployment.id; "Error while parsing request: {}", error);

tx.send_async(RunResult::Error(
"Error while parsing request".into(),
))
.await
.unwrap_or(());

return;
}
};

counter!("lagon_bytes_in", request.len() as u64, &labels);
request.add_header(X_FORWARDED_FOR.to_string(), ip);

// Only acquire the lock when we are sure we have a
// deployment and that the isolate should be called.
// TODO: read() then write() if not present
let mut isolates = ISOLATES.write().await;
let thread_isolates = isolates.entry(thread_id).or_insert_with(|| {
LruCache::with_expiry_duration(*ISOLATES_CACHE_SECONDS)
});

let isolate = thread_isolates.entry(hostname).or_insert_with(|| {
info!(deployment = deployment.id; "Creating new isolate");

// TODO: handle read error
let code = get_deployment_code(deployment).unwrap_or_else(|error| {
error!(deployment = deployment.id; "Error while getting deployment code: {}", error);

"".into()
});
let options = IsolateOptions::new(code)
.with_environment_variables(
deployment.environment_variables.clone(),
)
.with_memory(deployment.memory)
.with_timeout(deployment.timeout)
.with_metadata((deployment.id.clone(), deployment.function_id.clone()))
.with_on_drop_callback(Box::new(|metadata| {
info!(deployment = metadata.unwrap().0; "Dropping isolate");
}))
.with_on_statistics_callback(Box::new(|metadata, statistics| {
let metadata = metadata.unwrap();

let labels = [
("deployment", metadata.0),
("function", metadata.1),
];

histogram!("lagon_isolate_cpu_time", statistics.cpu_time, &labels);
histogram!(
"lagon_isolate_memory_usage",
statistics.memory_usage as f64,
&labels
);
}));

Isolate::new(options)
});

isolate.run(request, tx.clone()).await;
let labels = [
("deployment", deployment.id.clone()),
("function", deployment.function_id.clone()),
];

if let Some(asset) = deployment.assets.iter().find(|asset| {
asset.replace(".html", "") == url || asset.replace("/index.html", "") == url
}) {
let run_result = match handle_asset(&deployment, asset) {
Ok(response) => RunResult::Response(response),
Err(error) => {
error!(deployment = &deployment.id, asset = asset; "Error while handing asset: {}", error);

RunResult::Error("Could not retrieve asset.".into())
}
}
None => {
tx.send_async(RunResult::NotFound).await.unwrap_or(());
}
};

tx.send_async(run_result).await.unwrap_or(());
} else {
increment_counter!("lagon_requests", &labels);

let mut request = match Request::from_hyper(req).await {
Ok(request) => request,
Err(error) => {
error!(deployment = &deployment.id; "Error while parsing request: {}", error);

tx.send_async(RunResult::Error(
"Error while parsing request".into(),
))
.await
.unwrap_or(());

return;
}
};

counter!("lagon_bytes_in", request.len() as u64, &labels);
request.add_header(X_FORWARDED_FOR.to_string(), ip);

// Only acquire the lock when we are sure we have a
// deployment and that the isolate should be called.
// TODO: read() then write() if not present
let mut isolates = ISOLATES.write().await;
let thread_isolates = isolates.entry(thread_id).or_insert_with(|| {
LruCache::with_expiry_duration(*ISOLATES_CACHE_SECONDS)
});

let isolate = thread_isolates.entry(hostname).or_insert_with(|| {
info!(deployment = deployment.id; "Creating new isolate");

// TODO: handle read error
let code = get_deployment_code(&deployment).unwrap_or_else(|error| {
error!(deployment = deployment.id; "Error while getting deployment code: {}", error);

"".into()
});
let options = IsolateOptions::new(code)
.with_environment_variables(
deployment.environment_variables.clone(),
)
.with_memory(deployment.memory)
.with_timeout(deployment.timeout)
.with_metadata((deployment.id.clone(), deployment.function_id.clone()))
.with_on_drop_callback(Box::new(|metadata| {
info!(deployment = metadata.unwrap().0; "Dropping isolate");
}))
.with_on_statistics_callback(Box::new(|metadata, statistics| {
let metadata = metadata.unwrap();

let labels = [
("deployment", metadata.0),
("function", metadata.1),
];

histogram!("lagon_isolate_cpu_time", statistics.cpu_time, &labels);
histogram!(
"lagon_isolate_memory_usage",
statistics.memory_usage as f64,
&labels
);
}));

Isolate::new(options)
});

isolate.run(request, tx.clone()).await;
}
}
},
Expand Down Expand Up @@ -242,9 +244,23 @@ async fn handle_request(
Ok(hyper_response)
}
RunResult::Timeout | RunResult::MemoryLimit => {
match result {
RunResult::Timeout => {
warn!(deployment = deployment_id; "Function execution timed out")
}
RunResult::MemoryLimit => {
warn!(deployment = deployment_id; "Function execution memory limit reached")
}
_ => {}
};

Ok(HyperResponse::builder().status(502).body(PAGE_502.into())?)
}
RunResult::Error(_) => Ok(HyperResponse::builder().status(500).body(PAGE_500.into())?),
RunResult::Error(error) => {
error!(deployment = deployment_id; "Function execution error: {}", error);

Ok(HyperResponse::builder().status(500).body(PAGE_500.into())?)
}
RunResult::NotFound => Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?),
}
}
Expand Down

0 comments on commit 2a185ef

Please sign in to comment.