Skip to content

Commit

Permalink
fix(runtime,serverless,cli): properly handle errors and timeouts whil…
Browse files Browse the repository at this point in the history
…e streaming (#416)

* fix(runtime,serverless,cli): properly handle errors and timeouts while streaming

* fix(wpt): skip tests throwing errors
  • Loading branch information
QuiiBz authored Dec 26, 2022
1 parent b0cfd82 commit c3bbdb3
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 56 deletions.
6 changes: 6 additions & 0 deletions .changeset/heavy-kings-float.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@lagon/cli': patch
'@lagon/serverless': patch
---

Stop streaming and log errors when we have errors/timeouts/memory limits
5 changes: 5 additions & 0 deletions .changeset/hip-mails-shop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/runtime': patch
---

Add promise_reject_callback to always throw errors
5 changes: 5 additions & 0 deletions .changeset/rude-items-brush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Handle termination results (timeouts and memory limit) before processing streaming to avoid hanging
20 changes: 13 additions & 7 deletions packages/cli/src/commands/dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use tokio::sync::Mutex;
use tokio_util::task::LocalPoolHandle;

use crate::utils::{
bundle_function, info, input, success, validate_code_file, validate_public_dir, warn, Assets,
bundle_function, error, info, input, success, validate_code_file, validate_public_dir, warn,
Assets,
};

use log::{
Expand Down Expand Up @@ -72,7 +73,7 @@ fn parse_environment_variables(env: Option<PathBuf>) -> Result<HashMap<String, S

// This function is similar to packages/serverless/src/main.rs,
// expect that we don't have multiple deployments and such multiple
// threads to manage.
// threads to manage, and we don't manager logs and metrics.
async fn handle_request(
req: HyperRequest<Body>,
ip: String,
Expand Down Expand Up @@ -177,16 +178,21 @@ async fn handle_request(
}

tokio::spawn(async move {
while let Ok(RunResult::Stream(stream_result)) = rx.recv_async().await {
match stream_result {
StreamResult::Start(response) => {
while let Ok(result) = rx.recv_async().await {
match result {
RunResult::Stream(StreamResult::Start(response)) => {
response_tx.send_async(response).await.unwrap_or(());
}
StreamResult::Data(bytes) => {
RunResult::Stream(StreamResult::Data(bytes)) => {
let bytes = Bytes::from(bytes);
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
_ => {}
_ => {
println!("{} {:?}", error("Unexpected stream result:"), result);
// Close the stream by sending empty bytes if we receive anything
// else than StreamResult (e.g errors)
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
}
}
}
});
Expand Down
59 changes: 44 additions & 15 deletions packages/runtime/src/isolate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ use std::{
};

use futures::{future::poll_fn, stream::FuturesUnordered, Future, StreamExt};
use v8::PromiseState;

use crate::{
http::{FromV8, IntoV8, Request, Response, RunResult, StreamResult},
runtime::{get_runtime_code, POOL},
utils::{v8_boolean, v8_string, v8_uint8array},
utils::{extract_v8_string, v8_boolean, v8_string, v8_uint8array},
};

use self::bindings::{BindingResult, PromiseResult};
Expand All @@ -39,6 +38,23 @@ extern "C" fn heap_limit_callback(
current_heap_limit * 2
}

extern "C" fn promise_reject_callback(message: v8::PromiseRejectMessage) {
if message.get_event() == v8::PromiseRejectEvent::PromiseRejectWithNoHandler {
let scope = &mut unsafe { v8::CallbackScope::new(&message) };
let message = message.get_value().map_or_else(
|| "Unknown promise rejected error".to_owned(),
|value| {
extract_v8_string(value, scope)
.unwrap_or_else(|_| "Failed to extract promise reject message".to_owned())
},
);

let isolate = Isolate::state(scope);
let mut state = isolate.borrow_mut();
state.promise_rejected_message = Some(message);
}
}

// We don't allow imports at all, so we return None and throw an error
// so it can be catched later. As the error message suggests, all code
// should be bundled into a single file.
Expand Down Expand Up @@ -72,6 +88,7 @@ struct IsolateState {
handler_result: Option<v8::Global<v8::Promise>>,
stream_sender: flume::Sender<StreamResult>,
metadata: Rc<Metadata>,
promise_rejected_message: Option<String>,
}

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -198,6 +215,9 @@ impl Isolate {
// }

let mut isolate = v8::Isolate::new(params);
isolate.set_capture_stack_trace_for_uncaught_exceptions(true, 4);
isolate.set_promise_reject_callback(promise_reject_callback);

let (stream_sender, stream_receiver) = flume::unbounded();

let state: IsolateState = {
Expand All @@ -211,6 +231,7 @@ impl Isolate {
handler_result: None,
stream_sender,
metadata: Rc::clone(&options.metadata),
promise_rejected_message: None,
}
};

Expand Down Expand Up @@ -374,9 +395,6 @@ impl Isolate {

if !isolate_state.promises.is_empty() {
promises = Some(Vec::new());
}

if !isolate_state.promises.is_empty() {
self.running_promises.store(true, Ordering::SeqCst);

while let Poll::Ready(Some(BindingResult { id, result })) =
Expand Down Expand Up @@ -447,21 +465,31 @@ impl Isolate {
self.resolve_promises(cx);
self.poll_stream(tx);

// Handle termination results like timeouts and memory limit before
// checking the streaming status and promise state.
if let Ok(run_result) = self.termination_rx.as_ref().unwrap().try_recv() {
tx.send(run_result).unwrap_or(());
return Poll::Ready(());
}

let isolate_state = Isolate::state(&self.isolate);
let state = isolate_state.borrow();

if let Some(promise_rejected_message) = &state.promise_rejected_message {
tx.send(RunResult::Error(promise_rejected_message.to_string()))
.unwrap_or(());
return Poll::Ready(());
}

if self.stream_response_sent {
if self.stream_status.is_done() {
return Poll::Ready(());
}

cx.waker().wake_by_ref();
return Poll::Pending;
}

if let Ok(run_result) = self.termination_rx.as_ref().unwrap().try_recv() {
tx.send(run_result).unwrap_or(());
return Poll::Ready(());
}

let isolate_state = Isolate::state(&self.isolate);
let state = isolate_state.borrow();
let global = state.global.0.clone();
let scope = &mut v8::HandleScope::with_context(&mut self.isolate, global);
let try_catch = &mut v8::TryCatch::new(scope);
Expand All @@ -470,7 +498,7 @@ impl Isolate {
let promise = promise.open(try_catch);

match promise.state() {
PromiseState::Fulfilled => {
v8::PromiseState::Fulfilled => {
let response = promise.result(try_catch);

let run_result = match Response::from_v8(try_catch, response) {
Expand All @@ -490,6 +518,7 @@ impl Isolate {
return if self.stream_status.is_done() {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
};
}
Expand All @@ -498,7 +527,7 @@ impl Isolate {
tx.send(run_result).unwrap_or(());
return Poll::Ready(());
}
PromiseState::Rejected => {
v8::PromiseState::Rejected => {
let exception = promise.result(try_catch);

tx.send(RunResult::Error(get_exception_message(
Expand All @@ -507,7 +536,7 @@ impl Isolate {
.unwrap_or(());
return Poll::Ready(());
}
PromiseState::Pending => {}
v8::PromiseState::Pending => {}
};
}

Expand Down
8 changes: 6 additions & 2 deletions packages/runtime/tests/disallow_codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ async fn disallow_eval() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught EvalError: Code generation from strings disallowed for this context, at:\n const result = eval('1 + 1')".into())
RunResult::Error(
"EvalError: Code generation from strings disallowed for this context".into()
)
);
}

Expand All @@ -48,6 +50,8 @@ async fn disallow_function() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught EvalError: Code generation from strings disallowed for this context, at:\n const result = new Function('return 1 + 1')".into())
RunResult::Error(
"EvalError: Code generation from strings disallowed for this context".into()
)
);
}
6 changes: 3 additions & 3 deletions packages/runtime/tests/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn no_handler() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught Error: Handler function is not defined or is not a function, at:\n throw new Error(\"Handler function is not defined or is not a function\");".into())
RunResult::Error("Error: Handler function is not defined or is not a function".into())
);
}

Expand All @@ -37,7 +37,7 @@ async fn handler_not_function() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught Error: Handler function is not defined or is not a function, at:\n throw new Error(\"Handler function is not defined or is not a function\");".into())
RunResult::Error("Error: Handler function is not defined or is not a function".into())
);
}

Expand All @@ -55,7 +55,7 @@ async fn handler_reject() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught Error: Rejected, at:\n throw new Error('Rejected');".into())
RunResult::Error("Error: Rejected".into())
);
}

Expand Down
10 changes: 2 additions & 8 deletions packages/runtime/tests/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,7 @@ async fn throw_invalid_url() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error(
"Uncaught Error: client requires absolute-form URIs, at:\n throw new Error(error);"
.into()
)
RunResult::Error("Error: client requires absolute-form URIs".into())
);
}

Expand All @@ -354,10 +351,7 @@ async fn throw_invalid_header() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error(
"Uncaught Error: failed to parse header value, at:\n throw new Error(error);"
.into()
)
RunResult::Error("Error: failed to parse header value".into())
);
}

Expand Down
94 changes: 94 additions & 0 deletions packages/runtime/tests/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,97 @@ async fn response_before_write() {
);
assert!(rx.recv_async().await.is_err());
}

#[tokio::test(flavor = "multi_thread")]
async fn timeout_infinite_streaming() {
setup();
let mut isolate = Isolate::new(IsolateOptions::new(
"export function handler() {
const { readable } = new TransformStream()
return new Response(readable);
}"
.to_owned(),
));
let (tx, rx) = flume::unbounded();
isolate.run(Request::default(), tx).await;

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Stream(StreamResult::Start(Response::from(
"[object ReadableStream]"
)))
);
assert_eq!(rx.recv_async().await.unwrap(), RunResult::Timeout);
}

#[tokio::test(flavor = "multi_thread")]
async fn promise_reject_callback() {
setup();
let mut isolate = Isolate::new(IsolateOptions::new(
"export function handler() {
const { readable } = new TransformStream()
async function trigger() {
doesNotExists();
}
trigger();
return new Response(readable);
}"
.to_owned(),
));
let (tx, rx) = flume::unbounded();
isolate.run(Request::default(), tx).await;

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("ReferenceError: doesNotExists is not defined".to_owned())
);
assert!(rx.recv_async().await.is_err());
}

#[tokio::test(flavor = "multi_thread")]
async fn promise_reject_callback_after_response() {
setup();
let mut isolate = Isolate::new(IsolateOptions::new(
"export function handler() {
const output = new TextEncoder().encode('This is rendered as binary stream with non-ASCII chars 😊');
const { readable, writable } = new TransformStream();
async function stream() {
// Just to delay a bit
await fetch('https://google.com');
const writer = writable.getWriter();
for (let i = 0; i < output.length; i++) {
await new Promise(resolve => {
doesNotExists(resolve, 0);
});
writer.write(new Uint8Array([output[i]]));
}
}
stream();
return new Response(readable);
}"
.to_owned(),
));
let (tx, rx) = flume::unbounded();
isolate.run(Request::default(), tx).await;

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Stream(StreamResult::Start(Response::from(
"[object ReadableStream]"
)))
);
assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("ReferenceError: doesNotExists is not defined".to_owned())
);
assert!(rx.recv_async().await.is_err());
}
Loading

0 comments on commit c3bbdb3

Please sign in to comment.