@@ -296,6 +296,113 @@ async fn test_chunked_fragmented_multiple_frames_in_one_chunk() {
296
296
assert ! ( start. elapsed( ) >= DELAY_BETWEEN_RESPONSE_PARTS - DELAY_MARGIN ) ;
297
297
}
298
298
299
+ #[ tokio:: test]
300
+ async fn test_connection_reuse_with_chunked_fragmented_multiple_frames_in_one_chunk ( ) {
301
+ // Define constants for delay and timing margin
302
+ const DELAY_BETWEEN_RESPONSE_PARTS : tokio:: time:: Duration =
303
+ tokio:: time:: Duration :: from_millis ( 1000 ) ; // 1-second delay
304
+ const DELAY_MARGIN : tokio:: time:: Duration = tokio:: time:: Duration :: from_millis ( 50 ) ; // Margin for timing assertions
305
+
306
+ // We will record the peer addresses of each client request here
307
+ let peer_addrs = std:: sync:: Arc :: new ( std:: sync:: Mutex :: new ( Vec :: < std:: net:: SocketAddr > :: new ( ) ) ) ;
308
+ let peer_addrs_clone = peer_addrs. clone ( ) ;
309
+
310
+ // Set up a low-level server (it will reuse existing client connection, executing callback for each client request)
311
+ let server = server:: low_level_with_response ( move |_raw_request, client_socket| {
312
+ let peer_addrs = peer_addrs_clone. clone ( ) ;
313
+ Box :: new ( async move {
314
+ // Split RESPONSE_CONTENT into two parts
315
+ let mid = RESPONSE_CONTENT . len ( ) / 2 ;
316
+ let part1 = & RESPONSE_CONTENT [ 0 ..mid] ;
317
+ let part2 = & RESPONSE_CONTENT [ mid..] ;
318
+
319
+ // Compress each part separately to create two ZSTD frames
320
+ let compressed_part1 = zstd_compress ( part1. as_bytes ( ) ) ;
321
+ let compressed_part2 = zstd_compress ( part2. as_bytes ( ) ) ;
322
+
323
+ // Concatenate the frames into a single chunk's data
324
+ let chunk_data = [ compressed_part1. as_slice ( ) , compressed_part2. as_slice ( ) ] . concat ( ) ;
325
+
326
+ // Calculate the chunk size in bytes
327
+ let chunk_size = chunk_data. len ( ) ;
328
+
329
+ // Prepare the initial response part: headers + chunk size
330
+ let headers = [
331
+ COMPRESSED_RESPONSE_HEADERS , // e.g., "HTTP/1.1 200 OK\r\nContent-Encoding: zstd\r\n"
332
+ b"Transfer-Encoding: chunked\r \n \r \n " , // Indicate chunked encoding
333
+ format ! ( "{:x}\r \n " , chunk_size) . as_bytes ( ) , // Chunk size in hex
334
+ ]
335
+ . concat ( ) ;
336
+
337
+ // Send headers + chunk size + chunk data
338
+ client_socket
339
+ . write_all ( [ headers. as_slice ( ) , & chunk_data] . concat ( ) . as_slice ( ) )
340
+ . await
341
+ . expect ( "write_all failed" ) ;
342
+ client_socket. flush ( ) . await . expect ( "flush failed" ) ;
343
+
344
+ // Introduce a delay to simulate fragmentation
345
+ tokio:: time:: sleep ( DELAY_BETWEEN_RESPONSE_PARTS ) . await ;
346
+
347
+ peer_addrs
348
+ . lock ( )
349
+ . unwrap ( )
350
+ . push ( client_socket. peer_addr ( ) . unwrap ( ) ) ;
351
+
352
+ // Send chunk terminator + final chunk
353
+ client_socket
354
+ . write_all ( b"\r \n 0\r \n \r \n " )
355
+ . await
356
+ . expect ( "write_all failed" ) ;
357
+ client_socket. flush ( ) . await . expect ( "flush failed" ) ;
358
+ } )
359
+ } ) ;
360
+
361
+ let client = reqwest:: Client :: builder ( )
362
+ . pool_idle_timeout ( std:: time:: Duration :: from_secs ( 30 ) )
363
+ . pool_max_idle_per_host ( 1 )
364
+ . build ( )
365
+ . unwrap ( ) ;
366
+
367
+ const NUMBER_OF_REQUESTS : usize = 5 ;
368
+
369
+ for _ in 0 ..NUMBER_OF_REQUESTS {
370
+ // Record the start time for delay verification
371
+ let start = tokio:: time:: Instant :: now ( ) ;
372
+
373
+ let res = client
374
+ . get ( format ! ( "http://{}/" , server. addr( ) ) )
375
+ . send ( )
376
+ . await
377
+ . expect ( "Failed to get response" ) ;
378
+
379
+ // Verify the decompressed response matches the original content
380
+ assert_eq ! (
381
+ res. text( ) . await . expect( "Failed to read text" ) ,
382
+ RESPONSE_CONTENT
383
+ ) ;
384
+ assert ! ( start. elapsed( ) >= DELAY_BETWEEN_RESPONSE_PARTS - DELAY_MARGIN ) ;
385
+ }
386
+
387
+ drop ( client) ;
388
+
389
+ // Check that all peer addresses are the same
390
+ let peer_addrs = peer_addrs. lock ( ) . unwrap ( ) ;
391
+ assert_eq ! (
392
+ peer_addrs. len( ) ,
393
+ NUMBER_OF_REQUESTS ,
394
+ "Expected {} peer addresses, but got {}" ,
395
+ NUMBER_OF_REQUESTS ,
396
+ peer_addrs. len( )
397
+ ) ;
398
+ let first_addr = peer_addrs[ 0 ] ;
399
+ assert ! (
400
+ peer_addrs. iter( ) . all( |addr| addr == & first_addr) ,
401
+ "All peer addresses should be the same, but found differences: {:?}" ,
402
+ peer_addrs
403
+ ) ;
404
+ }
405
+
299
406
#[ tokio:: test]
300
407
async fn test_chunked_fragmented_response_1 ( ) {
301
408
const DELAY_BETWEEN_RESPONSE_PARTS : tokio:: time:: Duration =
0 commit comments