Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ServeDir: reuse buf and reponse Stream, optimize the default chunk buf size #137

Merged
merged 5 commits into from
Sep 18, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions tower-http/src/services/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! File system related services.

use bytes::{Bytes, BytesMut};
use futures_core::ready;
use bytes::Bytes;
use http::{HeaderMap, Response, StatusCode};
use http_body::{combinators::BoxBody, Body, Empty};
use pin_project::pin_project;
Expand All @@ -11,11 +10,16 @@ use std::{
task::{Context, Poll},
};
use tokio::io::AsyncRead;
use tokio_util::io::poll_read_buf;

use futures_util::Stream;
use tokio_util::io::ReaderStream;

mod serve_dir;
mod serve_file;

// default capacity 64KiB
const DEFAULT_CAPACITY: usize = 65536;

pub use self::{
serve_dir::{
ResponseBody as ServeDirResponseBody, ResponseFuture as ServeDirResponseFuture, ServeDir,
Expand All @@ -31,13 +35,26 @@ pub use self::{
#[derive(Debug)]
pub struct AsyncReadBody<T> {
#[pin]
inner: T,
reader: ReaderStream<T>,
}

impl<T> AsyncReadBody<T> {
impl<T> AsyncReadBody<T>
where
T: AsyncRead,
{
/// Create a new [`AsyncReadBody`] wrapping the given reader.
fn new(read: T) -> Self {
Self { inner: read }
Self {
reader: ReaderStream::with_capacity(read, DEFAULT_CAPACITY),
}
}

/// Create a new [`AsyncReadBody`] wrapping the given reader,
/// with a specific read buffer capacity
fn with_capacity(read: T, capacity: usize) -> Self {
Self {
reader: ReaderStream::with_capacity(read, capacity),
}
}
}

Expand All @@ -52,14 +69,7 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let mut buf = BytesMut::new();
let read = ready!(poll_read_buf(self.project().inner, cx, &mut buf)?);

if read == 0 {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(buf.freeze())))
}
self.project().reader.poll_next(cx)
}

fn poll_trailers(
Expand Down
39 changes: 34 additions & 5 deletions tower-http/src/services/fs/serve_dir.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::AsyncReadBody;
use crate::services::fs::DEFAULT_CAPACITY;
use bytes::Bytes;
use futures_util::ready;
use http::{header, HeaderValue, Request, Response, StatusCode, Uri};
Expand Down Expand Up @@ -28,6 +29,7 @@ use tower_service::Service;
pub struct ServeDir {
base: PathBuf,
append_index_html_on_directories: bool,
buf_chunk_size: usize,
}

impl ServeDir {
Expand All @@ -39,6 +41,7 @@ impl ServeDir {
Self {
base,
append_index_html_on_directories: true,
buf_chunk_size: DEFAULT_CAPACITY,
}
}

Expand All @@ -51,6 +54,12 @@ impl ServeDir {
self.append_index_html_on_directories = append;
self
}

/// set custom buffer chunk size.
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_buf_chunk_size(mut self, chunk_size: usize) -> Self {
self.buf_chunk_size = chunk_size;
self
}
}

impl<ReqBody> Service<Request<ReqBody>> for ServeDir {
Expand Down Expand Up @@ -87,6 +96,7 @@ impl<ReqBody> Service<Request<ReqBody>> for ServeDir {
}

let append_index_html_on_directories = self.append_index_html_on_directories;
let buf_chunk_size = self.buf_chunk_size;
let uri = req.uri().clone();

let open_file_future = Box::pin(async move {
Expand All @@ -113,7 +123,7 @@ impl<ReqBody> Service<Request<ReqBody>> for ServeDir {
});

let file = File::open(full_path).await?;
Ok(Output::File(file, mime))
Ok(Output::File(file, mime, buf_chunk_size))
});

ResponseFuture {
Expand Down Expand Up @@ -158,7 +168,7 @@ fn append_slash_on_path(uri: Uri) -> Uri {
}

enum Output {
File(File, HeaderValue),
File(File, HeaderValue, usize),
Redirect(HeaderValue),
NotFound,
}
Expand All @@ -181,8 +191,8 @@ impl Future for ResponseFuture {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut self.inner {
Inner::Valid(open_file_future) => {
let (file, mime) = match ready!(Pin::new(open_file_future).poll(cx)) {
Ok(Output::File(file, mime)) => (file, mime),
let (file, mime, chunk_size) = match ready!(Pin::new(open_file_future).poll(cx)) {
Ok(Output::File(file, mime, chunk_size)) => (file, mime, chunk_size),

Ok(Output::Redirect(location)) => {
let res = Response::builder()
Expand All @@ -208,7 +218,7 @@ impl Future for ResponseFuture {
)
}
};
let body = AsyncReadBody::new(file).boxed();
let body = AsyncReadBody::with_capacity(file, chunk_size).boxed();
let body = ResponseBody(body);

let mut res = Response::new(body);
Expand Down Expand Up @@ -266,6 +276,25 @@ mod tests {
assert_eq!(body, contents);
}

#[tokio::test]
async fn with_custom_chunk_size() {
let svc = ServeDir::new("..").with_buf_chunk_size(1024 * 32);

let req = Request::builder()
.uri("/README.md")
.body(Body::empty())
.unwrap();
let res = svc.oneshot(req).await.unwrap();

assert_eq!(res.status(), StatusCode::OK);
assert_eq!(res.headers()["content-type"], "text/markdown");

let body = body_into_text(res.into_body()).await;

let contents = std::fs::read_to_string("../README.md").unwrap();
assert_eq!(body, contents);
}

#[tokio::test]
async fn access_to_sub_dirs() {
let svc = ServeDir::new("..");
Expand Down
39 changes: 36 additions & 3 deletions tower-http/src/services/fs/serve_file.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Service that serves a file.

use super::AsyncReadBody;
use crate::services::fs::DEFAULT_CAPACITY;
use bytes::Bytes;
use futures_util::ready;
use http::{header, HeaderValue, Response};
Expand All @@ -21,6 +22,7 @@ use tower_service::Service;
pub struct ServeFile {
path: PathBuf,
mime: HeaderValue,
buf_chunk_size: usize,
}

impl ServeFile {
Expand All @@ -38,7 +40,11 @@ impl ServeFile {

let path = path.as_ref().to_owned();

Self { path, mime }
Self {
path,
mime,
buf_chunk_size: DEFAULT_CAPACITY,
}
}

/// Create a new [`ServeFile`] with a specific mime type.
Expand All @@ -52,7 +58,17 @@ impl ServeFile {
let mime = HeaderValue::from_str(mime.as_ref()).expect("mime isn't a valid header value");
let path = path.as_ref().to_owned();

Self { path, mime }
Self {
path,
mime,
buf_chunk_size: DEFAULT_CAPACITY,
}
}

/// set custom buffer chunk size.
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_buf_chunk_size(mut self, chunk_size: usize) -> Self {
self.buf_chunk_size = chunk_size;
self
}
}

Expand All @@ -72,6 +88,7 @@ impl<R> Service<R> for ServeFile {
ResponseFuture {
open_file_future,
mime: Some(self.mime.clone()),
buf_chunk_size: self.buf_chunk_size,
}
}
}
Expand All @@ -80,6 +97,7 @@ impl<R> Service<R> for ServeFile {
pub struct ResponseFuture {
open_file_future: Pin<Box<dyn Future<Output = io::Result<File>> + Send + Sync + 'static>>,
mime: Option<HeaderValue>,
buf_chunk_size: usize,
}

impl Future for ResponseFuture {
Expand All @@ -97,7 +115,8 @@ impl Future for ResponseFuture {
}
};

let body = AsyncReadBody::new(file).boxed();
let chunk_size = self.buf_chunk_size;
let body = AsyncReadBody::with_capacity(file, chunk_size).boxed();
let body = ResponseBody(body);

let mut res = Response::new(body);
Expand Down Expand Up @@ -136,6 +155,20 @@ mod tests {
assert!(body.starts_with("# Tower HTTP"));
}

#[tokio::test]
async fn with_custom_chunk_size() {
let svc = ServeFile::new("../README.md").with_buf_chunk_size(1024 * 32);

let res = svc.oneshot(Request::new(Body::empty())).await.unwrap();

assert_eq!(res.headers()["content-type"], "text/markdown");

let body = res.into_body().data().await.unwrap().unwrap();
let body = String::from_utf8(body.to_vec()).unwrap();

assert!(body.starts_with("# Tower HTTP"));
}

#[tokio::test]
async fn returns_404_if_file_doesnt_exist() {
let svc = ServeFile::new("../this-doesnt-exist.md");
Expand Down