Skip to content

Commit

Permalink
feat(serverless,cli): allow ending a stream before sending the respon…
Browse files Browse the repository at this point in the history
…se (#458)
  • Loading branch information
QuiiBz authored Jan 6, 2023
1 parent 9c00ad1 commit 083f639
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .changeset/dull-geckos-protect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@lagon/js-runtime': patch
---

Simplify streaming responses
6 changes: 6 additions & 0 deletions .changeset/wicked-islands-exercise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@lagon/cli': patch
'@lagon/serverless': patch
---

Allow ending a stream before sending the response
30 changes: 26 additions & 4 deletions crates/cli/src/commands/dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,45 @@ async fn handle_request(
let bytes = Bytes::from(bytes);
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
StreamResult::Done => panic!("Got a stream done without data"),
StreamResult::Done => {
println!(
"{}",
error("The stream was done before sending a response/data")
);

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
}
}

tokio::spawn(async move {
let mut done = false;

while let Ok(result) = rx.recv_async().await {
match result {
RunResult::Stream(StreamResult::Start(response)) => {
response_tx.send_async(response).await.unwrap_or(());
}
RunResult::Stream(StreamResult::Data(bytes)) => {
if done {
println!("{}", error("Got data after stream was done"));

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
break;
}

let bytes = Bytes::from(bytes);
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
_ => {
println!("{} {:?}", error("Unexpected stream result:"), result);
// Close the stream by sending empty bytes if we receive anything
// else than StreamResult (e.g errors)
done = result == RunResult::Stream(StreamResult::Done);

if !done {
println!("{} {:?}", error("Unexpected stream result:"), result);
}

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
}
}
Expand Down
37 changes: 33 additions & 4 deletions crates/serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,25 +268,54 @@ async fn handle_request(
let bytes = Bytes::from(bytes);
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
StreamResult::Done => panic!("Got a stream done without data"),
StreamResult::Done => {
handle_error(
RunResult::Error(
"The stream was done before sending a response/data".into(),
),
deployment_id,
&labels,
);

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
}
}

let deployment_id = deployment_id.clone();

tokio::spawn(async move {
let mut done = false;

while let Ok(result) = rx.recv_async().await {
match result {
RunResult::Stream(StreamResult::Start(response)) => {
response_tx.send_async(response).await.unwrap_or(());
}
RunResult::Stream(StreamResult::Data(bytes)) => {
if done {
handle_error(
RunResult::Error("Got data after stream was done".into()),
&deployment_id,
&labels,
);

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
break;
}

let bytes = Bytes::from(bytes);
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
_ => {
handle_error(result, &deployment_id, &labels);
// Close the stream by sending empty bytes if we receive anything
// else than StreamResult (e.g errors)
done = result == RunResult::Stream(StreamResult::Done);

if !done {
handle_error(result, &deployment_id, &labels);
}

// Close the stream by sending empty bytes
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
}
}
Expand Down
33 changes: 16 additions & 17 deletions packages/js-runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,22 @@ export async function masterHandler(request: {
if (response.body && response.isStream) {
const reader = response.body.getReader();

new ReadableStream({
start: controller => {
const push = () => {
reader.read().then(({ done, value }) => {
if (done) {
controller.close();
Lagon.pullStream(done);
return;
}
controller.enqueue(value);
Lagon.pullStream(done, value);
push();
});
};
push();
},
});
const read = () => {
reader.read().then(({ done, value }) => {
if (done) {
Lagon.pullStream(done);
return;
}

if (value.byteLength !== 0) {
Lagon.pullStream(done, value);
}

read();
});
};

read();
} else {
// @ts-expect-error we reassign body even if it's readonly
response.body = await response.text();
Expand Down

0 comments on commit 083f639

Please sign in to comment.