Skip to content

Commit

Permalink
perf(serverless,cli): reduce heap allocations on responses (#818)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuiiBz authored May 1, 2023
1 parent 29fb49d commit cfa7238
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 127 deletions.
6 changes: 6 additions & 0 deletions .changeset/rich-cobras-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@lagon/cli': patch
'@lagon/serverless': patch
---

Reduce head allocations on responses
54 changes: 24 additions & 30 deletions crates/cli/src/commands/dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,38 +117,32 @@ async fn handle_request(
};
}

handle_response(
rx,
(),
Box::new(|event, _| {
Box::pin(async move {
match event {
ResponseEvent::StreamDoneNoDataError => {
println!(
"{}",
error("The stream was done before sending a response/data")
);
}
ResponseEvent::UnexpectedStreamResult(result) => {
println!("{} {:?}", error("Unexpected stream result:"), result);
}
ResponseEvent::LimitsReached(result) => {
if result == RunResult::Timeout {
println!("{}", error("Function execution timed out"));
} else {
println!("{}", error("Function execution reached memory limit"));
}
}
ResponseEvent::Error(result) => {
println!("{}", error(result.as_error().as_str()));
}
_ => {}
handle_response(rx, |event| async move {
match event {
ResponseEvent::StreamDoneNoDataError => {
println!(
"{}",
error("The stream was done before sending a response/data")
);
}
ResponseEvent::UnexpectedStreamResult(result) => {
println!("{} {:?}", error("Unexpected stream result:"), result);
}
ResponseEvent::LimitsReached(result) => {
if result == RunResult::Timeout {
println!("{}", error("Function execution timed out"));
} else {
println!("{}", error("Function execution reached memory limit"));
}
}
ResponseEvent::Error(result) => {
println!("{}", error(result.as_error().as_str()));
}
_ => {}
}

Ok(())
})
}),
)
Ok(())
})
.await
}

Expand Down
1 change: 0 additions & 1 deletion crates/runtime_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub enum RunResult {
Timeout,
MemoryLimit,
Error(String),
NotFound,
}

impl RunResult {
Expand Down
57 changes: 20 additions & 37 deletions crates/runtime_utils/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Result;
use flume::Receiver;
use hyper::{body::Bytes, http::response::Builder, Body, Response as HyperResponse};
use lagon_runtime_http::{RunResult, StreamResult};
use std::{future::Future, pin::Pin};
use std::future::Future;

pub const PAGE_404: &str = include_str!("../public/404.html");
pub const PAGE_403: &str = include_str!("../public/403.html");
Expand All @@ -19,16 +19,12 @@ pub enum ResponseEvent {
Error(RunResult),
}

type OnEventReturnType = Pin<Box<(dyn Future<Output = Result<()>> + Send + Sync)>>;
type OnEvent<D> = Box<dyn Fn(ResponseEvent, D) -> OnEventReturnType + Send + Sync>;

pub async fn handle_response<D>(
pub async fn handle_response<F>(
rx: Receiver<RunResult>,
data: D,
on_event: OnEvent<D>,
on_event: impl Fn(ResponseEvent) -> F + Send + Sync + 'static,
) -> Result<HyperResponse<Body>>
where
D: Send + Sync + Clone + 'static,
F: Future<Output = Result<()>> + Send,
{
let result = rx.recv_async().await?;

Expand All @@ -51,7 +47,7 @@ where
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
StreamResult::Done(_) => {
on_event(ResponseEvent::StreamDoneNoDataError, data.clone()).await?;
on_event(ResponseEvent::StreamDoneNoDataError).await?;

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
Expand All @@ -71,20 +67,17 @@ where
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
RunResult::Stream(StreamResult::Done(elapsed)) => {
on_event(
ResponseEvent::Bytes(total_bytes, Some(elapsed.as_micros())),
data.clone(),
)
.await
.expect("Failed to send event");
on_event(ResponseEvent::Bytes(total_bytes, Some(elapsed.as_micros())))
.await
.unwrap_or(());

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
}
_ => {
on_event(ResponseEvent::UnexpectedStreamResult(result), data.clone())
on_event(ResponseEvent::UnexpectedStreamResult(result))
.await
.expect("Failed to send event");
.unwrap_or(());

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
Expand All @@ -100,25 +93,24 @@ where
Ok(hyper_response)
}
RunResult::Response(response, elapsed) => {
on_event(
ResponseEvent::Bytes(response.len(), elapsed.map(|duration| duration.as_micros())),
data,
)
.await?;
let event =
ResponseEvent::Bytes(response.len(), elapsed.map(|duration| duration.as_micros()));
on_event(event).await?;

Ok(Builder::try_from(&response)?.body(response.body.into())?)
}
RunResult::Timeout | RunResult::MemoryLimit => {
on_event(ResponseEvent::LimitsReached(result), data).await?;
let event = ResponseEvent::LimitsReached(result);
on_event(event).await?;

Ok(HyperResponse::builder().status(502).body(PAGE_502.into())?)
}
RunResult::Error(_) => {
on_event(ResponseEvent::Error(result), data).await?;
let event = ResponseEvent::Error(result);
on_event(event).await?;

Ok(HyperResponse::builder().status(500).body(PAGE_500.into())?)
}
RunResult::NotFound => Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?),
}
}

Expand All @@ -136,10 +128,7 @@ mod tests {
let (tx, rx) = flume::unbounded::<RunResult>();

let handle = tokio::spawn(async move {
let mut response =
handle_response(rx, (), Box::new(|_, _| Box::pin(async move { Ok(()) })))
.await
.unwrap();
let mut response = handle_response(rx, |_| async { Ok(()) }).await.unwrap();

assert_eq!(response.status(), 200);
assert_eq!(
Expand All @@ -160,10 +149,7 @@ mod tests {
let (tx, rx) = flume::unbounded::<RunResult>();

let handle = tokio::spawn(async move {
let mut response =
handle_response(rx, (), Box::new(|_, _| Box::pin(async move { Ok(()) })))
.await
.unwrap();
let mut response = handle_response(rx, |_| async { Ok(()) }).await.unwrap();

assert_eq!(response.status(), 200);
assert_eq!(
Expand Down Expand Up @@ -200,10 +186,7 @@ mod tests {
let (tx, rx) = flume::unbounded::<RunResult>();

let handle = tokio::spawn(async move {
let mut response =
handle_response(rx, (), Box::new(|_, _| Box::pin(async move { Ok(()) })))
.await
.unwrap();
let mut response = handle_response(rx, |_| async { Ok(()) }).await.unwrap();

assert_eq!(response.status(), 200);
assert_eq!(
Expand Down
111 changes: 52 additions & 59 deletions crates/serverless/src/serverless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,9 @@ async fn handle_request(

let function_id = deployment.function_id.clone();
let deployment_id = deployment.id.clone();
let mut bytes_in = 0;

let request_id_handle = request_id.clone();

let (sender, receiver) = flume::unbounded();
let mut bytes_in = 0;

let labels = [
("deployment", deployment.id.clone()),
Expand All @@ -145,7 +143,7 @@ async fn handle_request(
let run_result = match handle_asset(root, asset) {
Ok(response) => RunResult::Response(response, None),
Err(error) => {
error!(deployment = &deployment.id, asset = asset, request = request_id; "Error while handing asset: {}", error);
error!(deployment = &deployment.id, asset = asset, request = request_id_handle; "Error while handing asset: {}", error);

RunResult::Error("Could not retrieve asset.".into())
}
Expand Down Expand Up @@ -187,10 +185,10 @@ async fn handle_request(
std::thread::Builder::new().name(String::from("isolate-") + deployment.id.as_str()).spawn(move || {
handle.block_on(async move {
increment_gauge!("lagon_isolates", 1.0, &labels);
info!(deployment = deployment.id, function = deployment.function_id, request = request_id; "Creating new isolate");
info!(deployment = deployment.id, function = deployment.function_id, request = request_id_handle; "Creating new isolate");

let code = deployment.get_code().unwrap_or_else(|error| {
error!(deployment = deployment.id, request = request_id; "Error while getting deployment code: {}", error);
error!(deployment = deployment.id, request = request_id_handle; "Error while getting deployment code: {}", error);

"".into()
});
Expand Down Expand Up @@ -255,7 +253,7 @@ async fn handle_request(
.unwrap_or(());
}
Err(error) => {
error!(deployment = &deployment.id, request = request_id; "Error while parsing request: {}", error);
error!(deployment = &deployment.id, request = request_id_handle; "Error while parsing request: {}", error);

sender
.send_async(RunResult::Error("Error while parsing request".into()))
Expand All @@ -265,58 +263,53 @@ async fn handle_request(
}
}

handle_response(
receiver,
(
function_id,
deployment_id,
bytes_in,
request_id_handle,
labels,
inserters,
),
Box::new(
|event, (function_id, deployment_id, bytes_in, request_id, labels, inserters)| {
Box::pin(async move {
match event {
ResponseEvent::Bytes(bytes, cpu_time_micros) => {
inserters
.lock()
.await
.0
.write(&RequestRow {
function_id,
deployment_id,
region: REGION.clone(),
bytes_in,
bytes_out: bytes as u32,
cpu_time_micros,
timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs() as u32,
})
.await?;
}
ResponseEvent::StreamDoneNoDataError => {
handle_error(
RunResult::Error(
"The stream was done before sending a response/data".into(),
),
&deployment_id,
&request_id,
&labels,
);
}
ResponseEvent::UnexpectedStreamResult(result)
| ResponseEvent::LimitsReached(result)
| ResponseEvent::Error(result) => {
handle_error(result, &deployment_id, &request_id, &labels);
}
}

Ok(())
})
},
),
)
handle_response(receiver, move |event| {
let inserters = Arc::clone(&inserters);
let function_id = function_id.clone();
let deployment_id = deployment_id.clone();
let request_id = request_id.clone();
let labels = labels.clone();

async move {
match event {
ResponseEvent::Bytes(bytes, cpu_time_micros) => {
inserters
.lock()
.await
.0
.write(&RequestRow {
function_id,
deployment_id,
region: REGION.clone(),
bytes_in,
bytes_out: bytes as u32,
cpu_time_micros,
timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs() as u32,
})
.await
.unwrap_or(());
}
ResponseEvent::StreamDoneNoDataError => {
handle_error(
RunResult::Error(
"The stream was done before sending a response/data".into(),
),
&deployment_id,
&request_id,
&labels,
);
}
ResponseEvent::UnexpectedStreamResult(result) => {
handle_error(result, &deployment_id, &request_id, &labels);
}
ResponseEvent::LimitsReached(result) | ResponseEvent::Error(result) => {
handle_error(result, &deployment_id, &request_id, &labels);
}
}

Ok(())
}
})
.await
}

Expand Down

2 comments on commit cfa7238

@vercel
Copy link

@vercel vercel bot commented on cfa7238 May 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

docs – ./packages/docs

docs-git-main-lagon.vercel.app
docs-lagon.vercel.app
lagon-docs.vercel.app
docs.lagon.app

@vercel
Copy link

@vercel vercel bot commented on cfa7238 May 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

storybook – ./packages/ui

storybook-swart-eight.vercel.app
storybook-lagon.vercel.app
storybook-git-main-lagon.vercel.app
ui.lagon.app

Please sign in to comment.