Skip to content

Commit 3099f30

Browse files
authored
feat: Add a Form::into_stream() method on async multipart forms. (#2525)
The async equivalent of the change in #2524. An example use case is compressing multipart form data with zstd. The entire contents of the payload have to be compressed together, which requires accessing the contents of the Form. The existing stream functionality mostly implements what we need, but just isn't publicly accessible. This PR adds a pub method for it.
1 parent f7d929f commit 3099f30

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

src/async_impl/multipart.rs

+14-2
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,23 @@ impl Form {
140140
}
141141

142142
/// Consume this instance and transform into an instance of Body for use in a request.
143-
pub(crate) fn stream(mut self) -> Body {
143+
pub(crate) fn stream(self) -> Body {
144144
if self.inner.fields.is_empty() {
145145
return Body::empty();
146146
}
147147

148+
Body::stream(self.into_stream())
149+
}
150+
151+
/// Produce a stream of the bytes in this `Form`, consuming it.
152+
pub fn into_stream(mut self) -> impl Stream<Item = Result<Bytes, crate::Error>> + Send + Sync {
153+
if self.inner.fields.is_empty() {
154+
let empty_stream: Pin<
155+
Box<dyn Stream<Item = Result<Bytes, crate::Error>> + Send + Sync>,
156+
> = Box::pin(futures_util::stream::empty());
157+
return empty_stream;
158+
}
159+
148160
// create initial part to init reduce chain
149161
let (name, part) = self.inner.fields.remove(0);
150162
let start = Box::pin(self.part_stream(name, part))
@@ -161,7 +173,7 @@ impl Form {
161173
let last = stream::once(future::ready(Ok(
162174
format!("--{}--\r\n", self.boundary()).into()
163175
)));
164-
Body::stream(stream.chain(last))
176+
Box::pin(stream.chain(last))
165177
}
166178

167179
/// Generate a hyper::Body stream for a single Part instance of a Form request.

0 commit comments

Comments
 (0)