diff --git a/.changeset/dull-geckos-protect.md b/.changeset/dull-geckos-protect.md new file mode 100644 index 000000000..55af76815 --- /dev/null +++ b/.changeset/dull-geckos-protect.md @@ -0,0 +1,5 @@ +--- +'@lagon/js-runtime': patch +--- + +Simplify streaming responses diff --git a/.changeset/wicked-islands-exercise.md b/.changeset/wicked-islands-exercise.md new file mode 100644 index 000000000..b02540f4e --- /dev/null +++ b/.changeset/wicked-islands-exercise.md @@ -0,0 +1,6 @@ +--- +'@lagon/cli': patch +'@lagon/serverless': patch +--- + +Allow ending a stream before sending the response diff --git a/crates/cli/src/commands/dev.rs b/crates/cli/src/commands/dev.rs index c973c2352..caca545a4 100644 --- a/crates/cli/src/commands/dev.rs +++ b/crates/cli/src/commands/dev.rs @@ -173,23 +173,45 @@ async fn handle_request( let bytes = Bytes::from(bytes); stream_tx.send_async(Ok(bytes)).await.unwrap_or(()); } - StreamResult::Done => panic!("Got a stream done without data"), + StreamResult::Done => { + println!( + "{}", + error("The stream was done before sending a response/data") + ); + + // Close the stream by sending empty bytes + stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(()); + } } tokio::spawn(async move { + let mut done = false; + while let Ok(result) = rx.recv_async().await { match result { RunResult::Stream(StreamResult::Start(response)) => { response_tx.send_async(response).await.unwrap_or(()); } RunResult::Stream(StreamResult::Data(bytes)) => { + if done { + println!("{}", error("Got data after stream was done")); + + // Close the stream by sending empty bytes + stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(()); + break; + } + let bytes = Bytes::from(bytes); stream_tx.send_async(Ok(bytes)).await.unwrap_or(()); } _ => { - println!("{} {:?}", error("Unexpected stream result:"), result); - // Close the stream by sending empty bytes if we receive anything - // else than StreamResult (e.g errors) + done = result == RunResult::Stream(StreamResult::Done); + + if !done { + println!("{} {:?}", error("Unexpected stream result:"), result); + } + + // Close the stream by sending empty bytes stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(()); } } diff --git a/crates/serverless/src/main.rs b/crates/serverless/src/main.rs index a4559fac6..3971b0769 100644 --- a/crates/serverless/src/main.rs +++ b/crates/serverless/src/main.rs @@ -268,25 +268,54 @@ async fn handle_request( let bytes = Bytes::from(bytes); stream_tx.send_async(Ok(bytes)).await.unwrap_or(()); } - StreamResult::Done => panic!("Got a stream done without data"), + StreamResult::Done => { + handle_error( + RunResult::Error( + "The stream was done before sending a response/data".into(), + ), + deployment_id, + &labels, + ); + + // Close the stream by sending empty bytes + stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(()); + } } let deployment_id = deployment_id.clone(); tokio::spawn(async move { + let mut done = false; + while let Ok(result) = rx.recv_async().await { match result { RunResult::Stream(StreamResult::Start(response)) => { response_tx.send_async(response).await.unwrap_or(()); } RunResult::Stream(StreamResult::Data(bytes)) => { + if done { + handle_error( + RunResult::Error("Got data after stream was done".into()), + &deployment_id, + &labels, + ); + + // Close the stream by sending empty bytes + stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(()); + break; + } + let bytes = Bytes::from(bytes); stream_tx.send_async(Ok(bytes)).await.unwrap_or(()); } _ => { - handle_error(result, &deployment_id, &labels); - // Close the stream by sending empty bytes if we receive anything - // else than StreamResult (e.g errors) + done = result == RunResult::Stream(StreamResult::Done); + + if !done { + handle_error(result, &deployment_id, &labels); + } + + // Close the stream by sending empty bytes stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(()); } } diff --git a/packages/js-runtime/src/index.ts b/packages/js-runtime/src/index.ts index c45e46fc2..db556e771 100644 --- a/packages/js-runtime/src/index.ts +++ b/packages/js-runtime/src/index.ts @@ -106,23 +106,22 @@ export async function masterHandler(request: { if (response.body && response.isStream) { const reader = response.body.getReader(); - new ReadableStream({ - start: controller => { - const push = () => { - reader.read().then(({ done, value }) => { - if (done) { - controller.close(); - Lagon.pullStream(done); - return; - } - controller.enqueue(value); - Lagon.pullStream(done, value); - push(); - }); - }; - push(); - }, - }); + const read = () => { + reader.read().then(({ done, value }) => { + if (done) { + Lagon.pullStream(done); + return; + } + + if (value.byteLength !== 0) { + Lagon.pullStream(done, value); + } + + read(); + }); + }; + + read(); } else { // @ts-expect-error we reassign body even if it's readonly response.body = await response.text();