From aaa704b3af8a4258037b1ce885db20eb537059c1 Mon Sep 17 00:00:00 2001 From: QuiiBz Date: Tue, 7 Feb 2023 18:30:08 +0100 Subject: [PATCH 1/2] fix(serverless): send bytes event on sync responses --- .changeset/eleven-berries-tan.md | 5 +++++ .changeset/popular-parents-build.md | 5 +++++ crates/runtime_http/src/lib.rs | 21 ++++++++++++++++++++- crates/runtime_http/src/request.rs | 4 +++- crates/runtime_http/src/response.rs | 4 ++-- crates/runtime_utils/src/response.rs | 9 +++++---- crates/serverless/src/main.rs | 2 +- 7 files changed, 41 insertions(+), 9 deletions(-) create mode 100644 .changeset/eleven-berries-tan.md create mode 100644 .changeset/popular-parents-build.md diff --git a/.changeset/eleven-berries-tan.md b/.changeset/eleven-berries-tan.md new file mode 100644 index 000000000..bd8b9ab6d --- /dev/null +++ b/.changeset/eleven-berries-tan.md @@ -0,0 +1,5 @@ +--- +'@lagon/serverless': patch +--- + +Fix bytes event on sync responses diff --git a/.changeset/popular-parents-build.md b/.changeset/popular-parents-build.md new file mode 100644 index 000000000..bdcfde717 --- /dev/null +++ b/.changeset/popular-parents-build.md @@ -0,0 +1,5 @@ +--- +'@lagon/runtime': patch +--- + +Safely parse body up to 10MB diff --git a/crates/runtime_http/src/lib.rs b/crates/runtime_http/src/lib.rs index efe761ade..f863f2f2d 100644 --- a/crates/runtime_http/src/lib.rs +++ b/crates/runtime_http/src/lib.rs @@ -1,9 +1,11 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; +use std::error::Error; mod method; mod request; mod response; +use hyper::body::{to_bytes, Bytes, HttpBody}; pub use method::*; pub use request::*; pub use response::*; @@ -45,3 +47,20 @@ impl RunResult { panic!("RunResult is not an Error"); } } + +const BODY_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 10; // 10MB + +pub async fn safe_to_bytes(body: B) -> Result +where + B: HttpBody, + B::Error: Error + Send + Sync + 'static, +{ + let upper = body.size_hint().upper().unwrap_or(u64::MAX); + + if upper > BODY_MAX_SIZE_BYTES { + return Err(anyhow!("Body size is too large")); + } + + let full_body = to_bytes(body).await?; + Ok(full_body) +} diff --git a/crates/runtime_http/src/request.rs b/crates/runtime_http/src/request.rs index 90ffc3ad5..ef16eaa6c 100644 --- a/crates/runtime_http/src/request.rs +++ b/crates/runtime_http/src/request.rs @@ -10,6 +10,8 @@ use lagon_runtime_v8_utils::{ }; use std::{collections::HashMap, str::FromStr}; +use crate::safe_to_bytes; + use super::{FromV8, IntoV8, Method}; #[derive(Debug)] @@ -169,7 +171,7 @@ impl Request { .unwrap_or_default(); let url = format!("http://{}{}", host, request.uri().to_string().as_str()); - let body = body::to_bytes(request.into_body()).await?; + let body = safe_to_bytes(request.into_body()).await?; Ok(Request { headers: if !headers.is_empty() { diff --git a/crates/runtime_http/src/response.rs b/crates/runtime_http/src/response.rs index c0c032b57..c56a8a704 100644 --- a/crates/runtime_http/src/response.rs +++ b/crates/runtime_http/src/response.rs @@ -11,7 +11,7 @@ use lagon_runtime_v8_utils::{ }; use std::{collections::HashMap, str::FromStr}; -use crate::{FromV8, IntoV8}; +use crate::{safe_to_bytes, FromV8, IntoV8}; static READABLE_STREAM_STR: &[u8] = b"[object ReadableStream]"; @@ -169,7 +169,7 @@ impl Response { } let status = response.status().as_u16(); - let body = body::to_bytes(response.into_body()).await?; + let body = safe_to_bytes(response.into_body()).await?; Ok(Response { status, diff --git a/crates/runtime_utils/src/response.rs b/crates/runtime_utils/src/response.rs index 85f370ad4..a87bb9ab2 100644 --- a/crates/runtime_utils/src/response.rs +++ b/crates/runtime_utils/src/response.rs @@ -10,7 +10,7 @@ pub const PAGE_500: &str = include_str!("../public/500.html"); pub const FAVICON_URL: &str = "/favicon.ico"; pub enum ResponseEvent { - StreamData(usize), + Bytes(usize), StreamDoneNoDataError, StreamDoneDataError, UnexpectedStreamResult(RunResult), @@ -42,7 +42,7 @@ where response_tx.send_async(response).await.unwrap_or(()); } StreamResult::Data(bytes) => { - on_event(ResponseEvent::StreamData(bytes.len()), data.clone()); + on_event(ResponseEvent::Bytes(bytes.len()), data.clone()); let bytes = Bytes::from(bytes); stream_tx.send_async(Ok(bytes)).await.unwrap_or(()); @@ -64,7 +64,7 @@ where response_tx.send_async(response).await.unwrap_or(()); } RunResult::Stream(StreamResult::Data(bytes)) => { - on_event(ResponseEvent::StreamData(bytes.len()), data.clone()); + on_event(ResponseEvent::Bytes(bytes.len()), data.clone()); if done { on_event(ResponseEvent::StreamDoneDataError, data.clone()); @@ -100,8 +100,9 @@ where Ok(hyper_response) } RunResult::Response(response) => { - let hyper_response = Builder::try_from(&response)?.body(response.body.into())?; + on_event(ResponseEvent::Bytes(response.len()), data); + let hyper_response = Builder::try_from(&response)?.body(response.body.into())?; Ok(hyper_response) } RunResult::Timeout | RunResult::MemoryLimit => { diff --git a/crates/serverless/src/main.rs b/crates/serverless/src/main.rs index 9745ee3c4..90caf8988 100644 --- a/crates/serverless/src/main.rs +++ b/crates/serverless/src/main.rs @@ -272,7 +272,7 @@ async fn handle_request( rx, (deployment_id, labels), Box::new(|event, (deployment_id, labels)| match event { - ResponseEvent::StreamData(bytes) => { + ResponseEvent::Bytes(bytes) => { counter!("lagon_bytes_out", bytes as u64, &labels); } ResponseEvent::StreamDoneNoDataError => { From 96714556de126be0a8011ef43a176e06ff58c96a Mon Sep 17 00:00:00 2001 From: QuiiBz Date: Tue, 7 Feb 2023 18:38:22 +0100 Subject: [PATCH 2/2] fix: revert safe_to_bytes --- .changeset/popular-parents-build.md | 5 ----- crates/runtime_http/src/lib.rs | 21 +-------------------- crates/runtime_http/src/request.rs | 4 +--- crates/runtime_http/src/response.rs | 4 ++-- 4 files changed, 4 insertions(+), 30 deletions(-) delete mode 100644 .changeset/popular-parents-build.md diff --git a/.changeset/popular-parents-build.md b/.changeset/popular-parents-build.md deleted file mode 100644 index bdcfde717..000000000 --- a/.changeset/popular-parents-build.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@lagon/runtime': patch ---- - -Safely parse body up to 10MB diff --git a/crates/runtime_http/src/lib.rs b/crates/runtime_http/src/lib.rs index f863f2f2d..efe761ade 100644 --- a/crates/runtime_http/src/lib.rs +++ b/crates/runtime_http/src/lib.rs @@ -1,11 +1,9 @@ -use anyhow::{anyhow, Result}; -use std::error::Error; +use anyhow::Result; mod method; mod request; mod response; -use hyper::body::{to_bytes, Bytes, HttpBody}; pub use method::*; pub use request::*; pub use response::*; @@ -47,20 +45,3 @@ impl RunResult { panic!("RunResult is not an Error"); } } - -const BODY_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 10; // 10MB - -pub async fn safe_to_bytes(body: B) -> Result -where - B: HttpBody, - B::Error: Error + Send + Sync + 'static, -{ - let upper = body.size_hint().upper().unwrap_or(u64::MAX); - - if upper > BODY_MAX_SIZE_BYTES { - return Err(anyhow!("Body size is too large")); - } - - let full_body = to_bytes(body).await?; - Ok(full_body) -} diff --git a/crates/runtime_http/src/request.rs b/crates/runtime_http/src/request.rs index ef16eaa6c..90ffc3ad5 100644 --- a/crates/runtime_http/src/request.rs +++ b/crates/runtime_http/src/request.rs @@ -10,8 +10,6 @@ use lagon_runtime_v8_utils::{ }; use std::{collections::HashMap, str::FromStr}; -use crate::safe_to_bytes; - use super::{FromV8, IntoV8, Method}; #[derive(Debug)] @@ -171,7 +169,7 @@ impl Request { .unwrap_or_default(); let url = format!("http://{}{}", host, request.uri().to_string().as_str()); - let body = safe_to_bytes(request.into_body()).await?; + let body = body::to_bytes(request.into_body()).await?; Ok(Request { headers: if !headers.is_empty() { diff --git a/crates/runtime_http/src/response.rs b/crates/runtime_http/src/response.rs index c56a8a704..c0c032b57 100644 --- a/crates/runtime_http/src/response.rs +++ b/crates/runtime_http/src/response.rs @@ -11,7 +11,7 @@ use lagon_runtime_v8_utils::{ }; use std::{collections::HashMap, str::FromStr}; -use crate::{safe_to_bytes, FromV8, IntoV8}; +use crate::{FromV8, IntoV8}; static READABLE_STREAM_STR: &[u8] = b"[object ReadableStream]"; @@ -169,7 +169,7 @@ impl Response { } let status = response.status().as_u16(); - let body = safe_to_bytes(response.into_body()).await?; + let body = body::to_bytes(response.into_body()).await?; Ok(Response { status,