Skip to content

Commit 6f9d0ee

Browse files
authored
fix: support HTTP responses containing multiple ZSTD frames (#2583)
Closes #2574
1 parent 44ac897 commit 6f9d0ee

File tree

2 files changed

+115
-1
lines changed

2 files changed

+115
-1
lines changed

src/async_impl/decoder.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,11 @@ impl Future for Pending {
522522
#[cfg(feature = "zstd")]
523523
DecoderType::Zstd => Poll::Ready(Ok(Inner::Zstd(Box::pin(
524524
FramedRead::new(
525-
ZstdDecoder::new(StreamReader::new(_body)),
525+
{
526+
let mut d = ZstdDecoder::new(StreamReader::new(_body));
527+
d.multiple_members(true);
528+
d
529+
},
526530
BytesCodec::new(),
527531
)
528532
.fuse(),

tests/zstd.rs

+110
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,116 @@ async fn test_non_chunked_non_fragmented_response() {
186186
assert_eq!(res.text().await.expect("text"), RESPONSE_CONTENT);
187187
}
188188

189+
// Big response can have multiple ZSTD frames in it
190+
#[tokio::test]
191+
async fn test_non_chunked_non_fragmented_multiple_frames_response() {
192+
let server = server::low_level_with_response(|_raw_request, client_socket| {
193+
Box::new(async move {
194+
// Split the content into two parts
195+
let content_bytes = RESPONSE_CONTENT.as_bytes();
196+
let mid = content_bytes.len() / 2;
197+
// Compress each part separately to create multiple ZSTD frames
198+
let compressed_part1 = zstd_crate::encode_all(&content_bytes[0..mid], 3).unwrap();
199+
let compressed_part2 = zstd_crate::encode_all(&content_bytes[mid..], 3).unwrap();
200+
// Concatenate the compressed frames
201+
let mut zstded_content = compressed_part1;
202+
zstded_content.extend_from_slice(&compressed_part2);
203+
// Set Content-Length to the total length of the concatenated frames
204+
let content_length_header =
205+
format!("Content-Length: {}\r\n\r\n", zstded_content.len()).into_bytes();
206+
let response = [
207+
COMPRESSED_RESPONSE_HEADERS,
208+
&content_length_header,
209+
&zstded_content,
210+
]
211+
.concat();
212+
213+
client_socket
214+
.write_all(response.as_slice())
215+
.await
216+
.expect("response write_all failed");
217+
client_socket.flush().await.expect("response flush failed");
218+
})
219+
});
220+
221+
let res = reqwest::Client::new()
222+
.get(format!("http://{}/", server.addr()))
223+
.send()
224+
.await
225+
.expect("response");
226+
227+
assert_eq!(res.text().await.expect("text"), RESPONSE_CONTENT);
228+
}
229+
230+
#[tokio::test]
231+
async fn test_chunked_fragmented_multiple_frames_in_one_chunk() {
232+
// Define constants for delay and timing margin
233+
const DELAY_BETWEEN_RESPONSE_PARTS: tokio::time::Duration =
234+
tokio::time::Duration::from_millis(1000); // 1-second delay
235+
const DELAY_MARGIN: tokio::time::Duration = tokio::time::Duration::from_millis(50); // Margin for timing assertions
236+
237+
// Set up a low-level server
238+
let server = server::low_level_with_response(|_raw_request, client_socket| {
239+
Box::new(async move {
240+
// Split RESPONSE_CONTENT into two parts
241+
let mid = RESPONSE_CONTENT.len() / 2;
242+
let part1 = &RESPONSE_CONTENT[0..mid];
243+
let part2 = &RESPONSE_CONTENT[mid..];
244+
245+
// Compress each part separately to create two ZSTD frames
246+
let compressed_part1 = zstd_compress(part1.as_bytes());
247+
let compressed_part2 = zstd_compress(part2.as_bytes());
248+
249+
// Concatenate the frames into a single chunk's data
250+
let chunk_data = [compressed_part1.as_slice(), compressed_part2.as_slice()].concat();
251+
252+
// Calculate the chunk size in bytes
253+
let chunk_size = chunk_data.len();
254+
255+
// Prepare the initial response part: headers + chunk size
256+
let headers = [
257+
COMPRESSED_RESPONSE_HEADERS, // e.g., "HTTP/1.1 200 OK\r\nContent-Encoding: zstd\r\n"
258+
b"Transfer-Encoding: chunked\r\n\r\n", // Indicate chunked encoding
259+
format!("{:x}\r\n", chunk_size).as_bytes(), // Chunk size in hex
260+
]
261+
.concat();
262+
263+
// Send headers + chunk size + chunk data
264+
client_socket
265+
.write_all([headers.as_slice(), &chunk_data].concat().as_slice())
266+
.await
267+
.expect("write_all failed");
268+
client_socket.flush().await.expect("flush failed");
269+
270+
// Introduce a delay to simulate fragmentation
271+
tokio::time::sleep(DELAY_BETWEEN_RESPONSE_PARTS).await;
272+
273+
// Send chunk terminator + final chunk
274+
client_socket
275+
.write_all(b"\r\n0\r\n\r\n")
276+
.await
277+
.expect("write_all failed");
278+
client_socket.flush().await.expect("flush failed");
279+
})
280+
});
281+
282+
// Record the start time for delay verification
283+
let start = tokio::time::Instant::now();
284+
285+
let res = reqwest::Client::new()
286+
.get(format!("http://{}/", server.addr()))
287+
.send()
288+
.await
289+
.expect("Failed to get response");
290+
291+
// Verify the decompressed response matches the original content
292+
assert_eq!(
293+
res.text().await.expect("Failed to read text"),
294+
RESPONSE_CONTENT
295+
);
296+
assert!(start.elapsed() >= DELAY_BETWEEN_RESPONSE_PARTS - DELAY_MARGIN);
297+
}
298+
189299
#[tokio::test]
190300
async fn test_chunked_fragmented_response_1() {
191301
const DELAY_BETWEEN_RESPONSE_PARTS: tokio::time::Duration =

0 commit comments

Comments
 (0)