From a22d57baa57db52a8f268308a07f78c1b4433332 Mon Sep 17 00:00:00 2001
From: Appelmans
Date: Fri, 13 Dec 2024 16:14:30 -0800
Subject: [PATCH 1/3] Batching feature

---
 quic/s2n-quic-core/src/connection/limits.rs   |  11 ++
 quic/s2n-quic-transport/src/stream/manager.rs |   4 +-
 .../src/stream/manager/tests.rs               |  78 ++++++++++++
 .../src/stream/stream_container.rs            | 114 ++++++++++++++----
 4 files changed, 183 insertions(+), 24 deletions(-)

diff --git a/quic/s2n-quic-core/src/connection/limits.rs b/quic/s2n-quic-core/src/connection/limits.rs
index 565a59d2ed..299b1a6ecc 100644
--- a/quic/s2n-quic-core/src/connection/limits.rs
+++ b/quic/s2n-quic-core/src/connection/limits.rs
@@ -35,6 +35,8 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30);
 //# received.
 pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3;
 
+pub const STREAM_BURST_SIZE: u8 = 1;
+
 #[non_exhaustive]
 #[derive(Debug)]
 pub struct ConnectionInfo<'a> {
@@ -74,6 +76,7 @@ pub struct Limits {
     pub(crate) initial_round_trip_time: Duration,
     pub(crate) migration_support: MigrationSupport,
     pub(crate) anti_amplification_multiplier: u8,
+    pub(crate) stream_burst_size: u8,
 }
 
 impl Default for Limits {
@@ -120,6 +123,7 @@ impl Limits {
             initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT,
             migration_support: MigrationSupport::RECOMMENDED,
             anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER,
+            stream_burst_size: STREAM_BURST_SIZE,
         }
     }
 
@@ -222,6 +226,7 @@ impl Limits {
         max_active_connection_ids,
         u64
     );
+    setter!(with_stream_burst_size, stream_burst_size, u8);
    setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8);
    setter!(with_max_ack_ranges, ack_ranges_limit, u8);
    setter!(
@@ -384,6 +389,12 @@ impl Limits {
     pub fn anti_amplification_multiplier(&self) -> u8 {
         self.anti_amplification_multiplier
     }
+
+    #[doc(hidden)]
+    #[inline]
+    pub fn stream_burst_size(&self) -> u8 {
+        self.stream_burst_size
+    }
 }
 
 /// Creates limits for a given connection
diff --git a/quic/s2n-quic-transport/src/stream/manager.rs b/quic/s2n-quic-transport/src/stream/manager.rs
index efa396e920..152b915635 100644
--- a/quic/s2n-quic-transport/src/stream/manager.rs
+++ b/quic/s2n-quic-transport/src/stream/manager.rs
@@ -593,7 +593,7 @@ impl stream::Manager for AbstractStreamManager {
                 connection_limits.stream_limits(),
                 min_rtt,
             ),
-            streams: StreamContainer::new(),
+            streams: StreamContainer::new(connection_limits),
             next_stream_ids: StreamIdSet::initial(),
             local_endpoint_type,
             initial_local_limits,
@@ -873,7 +873,7 @@ impl stream::Manager for AbstractStreamManager {
                 if transmit_result.is_err() {
                     StreamContainerIterationResult::BreakAndInsertAtBack
                 } else {
-                    StreamContainerIterationResult::Continue
+                    StreamContainerIterationResult::UseStreamCredit
                 }
             },
         );
diff --git a/quic/s2n-quic-transport/src/stream/manager/tests.rs b/quic/s2n-quic-transport/src/stream/manager/tests.rs
index 1e078b98d6..1dbad069c2 100644
--- a/quic/s2n-quic-transport/src/stream/manager/tests.rs
+++ b/quic/s2n-quic-transport/src/stream/manager/tests.rs
@@ -3249,3 +3249,81 @@ fn stream_transmission_fairness_test() {
         }
     }
 }
+
+#[test]
+fn stream_batching_test() {
+    // Create stream manager with a burst size of 5
+    let batch_size = 5;
+    let limits = ConnectionLimits::default()
+        .with_stream_burst_size(batch_size)
+        .unwrap();
+
+    let mut manager = AbstractStreamManager::<stream::StreamImpl>::new(
+        &limits,
+        endpoint::Type::Server,
+        create_default_initial_flow_control_limits(),
+        create_default_initial_flow_control_limits(),
+        DEFAULT_INITIAL_RTT,
+    );
+
+    // Create some open Streams
+    let mut stream_ids: VecDeque<StreamId> = (0..4)
+        .map(|_| {
+            let (accept_waker, _accept_wake_counter) = new_count_waker();
+            let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
+            let mut token = connection::OpenToken::new();
+
+            let result = match manager.poll_open_local_stream(
+                StreamType::Bidirectional,
+                &mut token,
+                &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
+                &Context::from_waker(&accept_waker),
+            ) {
+                Poll::Ready(res) => res,
+                Poll::Pending => Err(connection::Error::unspecified()),
+            };
+            result.unwrap()
+        })
+        .collect();
+
+    // Create a context that can only fit packets of size 50
+    let mut frame_buffer = OutgoingFrameBuffer::new();
+    let max_packet_size = 50;
+    frame_buffer.set_max_packet_size(Some(max_packet_size));
+    let mut write_context = MockWriteContext::new(
+        time::now(),
+        &mut frame_buffer,
+        transmission::Constraint::None,
+        transmission::Mode::Normal,
+        endpoint::Type::Server,
+    );
+
+    const DATA_SIZE: usize = 2000;
+    let array: [u8; DATA_SIZE] = [1; DATA_SIZE];
+
+    // Set up each stream to have much more data to send than can fit in our test packet
+    for stream_id in &stream_ids {
+        manager
+            .with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
+                let data_to_send = bytes::Bytes::copy_from_slice(&array);
+                stream.poll_request(&mut ops::Request::default().send(&mut [data_to_send]), None)
+            })
+            .unwrap();
+    }
+    // make sure the order matches creation order
+    assert_eq!(stream_ids, manager.streams_waiting_for_transmission());
+
+    // Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times.
+    // Then the stream gets sent to the back of the transmission list.
+    for idx in 1..=40 {
+        let _ = manager.on_transmit(&mut write_context);
+        write_context.frame_buffer.flush();
+
+        if idx % batch_size == 0 {
+            // The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets
+            stream_ids.rotate_left(1);
+        }
+
+        assert_eq!(stream_ids, manager.streams_waiting_for_transmission());
+    }
+}
diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs
index f9b3ad10bb..f992b868a6 100644
--- a/quic/s2n-quic-transport/src/stream/stream_container.rs
+++ b/quic/s2n-quic-transport/src/stream/stream_container.rs
@@ -8,8 +8,8 @@
 #![allow(unknown_lints, clippy::non_send_fields_in_send_ty)]
 
 use crate::{
-    stream,
-    stream::{stream_impl::StreamTrait, stream_interests::StreamInterests},
+    connection,
+    stream::{self, stream_impl::StreamTrait, stream_interests::StreamInterests},
     transmission,
 };
 use alloc::rc::Rc;
@@ -145,10 +145,17 @@ struct InterestLists {
     /// stream flow control window to increase
     waiting_for_stream_flow_control_credits:
         LinkedList<WaitingForStreamFlowControlCreditsAdapter<S>>,
+    transmission_counter: u8,
+    transmission_limit: u8,
+}
+
+struct Outcome {
+    done: bool,
+    push_back_remaining: bool,
 }
 
 impl<S: StreamTrait> InterestLists<S> {
-    fn new() -> Self {
+    fn new(connection_limits: &connection::Limits) -> Self {
         Self {
             done_streams: LinkedList::new(DoneStreamsAdapter::new()),
             waiting_for_frame_delivery: LinkedList::new(WaitingForFrameDeliveryAdapter::new()),
@@ -160,6 +167,8 @@ impl InterestLists {
             waiting_for_stream_flow_control_credits: LinkedList::new(
                 WaitingForStreamFlowControlCreditsAdapter::new(),
             ),
+            transmission_counter: 0,
+            transmission_limit: connection_limits.stream_burst_size(),
         }
     }
 
@@ -169,19 +178,68 @@ impl InterestLists {
         node: &Rc<StreamNode<S>>,
         interests: StreamInterests,
         result: StreamContainerIterationResult,
-    ) -> bool {
+    ) -> Outcome {
         // Note that all comparisons start by checking whether the stream is
         // already part of the given list. This is required in order for the
         // following operation to be safe. Inserting an element in a list while
        // it is already part of a (different) list can panic. Trying to remove
        // an element from a list while it is not actually part of the list
        // is undefined.
+        let mut outcome = Outcome {
+            done: false,
+            push_back_remaining: true,
+        };
+
+        macro_rules! sync_trans_interests {
+            ($interest:expr, $link_name:ident, $list_name:ident) => {
+                if $interest != node.$link_name.is_linked() {
+                    if $interest {
+                        if matches!(result, StreamContainerIterationResult::UseStreamCredit) {
+                            self.$list_name.push_back(node.clone());
+                            self.transmission_counter += 1;
+                        } else if matches!(result, StreamContainerIterationResult::Continue) {
+                            self.$list_name.push_back(node.clone());
+                        } else if matches!(
+                            result,
+                            StreamContainerIterationResult::BreakAndInsertAtBack
+                        ) && !(self.transmission_counter < self.transmission_limit)
+                        {
+                            // In this case the node at the front of the list has no more sending
+                            // credits. Therefore we push the current node to the front of the list
+                            // and push the remaining nodes behind the current node, which effectively
+                            // moves the front node to the back of the list.
+                            self.$list_name.push_front(node.clone());
+                            self.transmission_counter = 0;
+                            outcome.push_back_remaining = false;
+                        } else {
+                            // In this case the node at the front of the list still has sending credits.
+                            // Therefore we want to preserve the order of the list by pushing each
+                            // node back.
+                            self.$list_name.push_back(node.clone());
+                            outcome.push_back_remaining = true;
+                        }
+                    } else {
+                        // Safety: We know that the node is only ever part of this list.
+                        // While elements are in temporary lists, they always get unlinked
+                        // from those temporary lists while their interest is updated.
+                        let mut cursor = unsafe {
+                            self.$list_name
+                                .cursor_mut_from_ptr(node.deref() as *const StreamNode<S>)
+                        };
+                        cursor.remove();
+                    }
+                }
+                debug_assert_eq!($interest, node.$link_name.is_linked());
+            };
+        }
 
         macro_rules! sync_interests {
             ($interest:expr, $link_name:ident, $list_name:ident) => {
                 if $interest != node.$link_name.is_linked() {
                     if $interest {
-                        if matches!(result, StreamContainerIterationResult::Continue) {
+                        if matches!(result, StreamContainerIterationResult::Continue)
+                            || matches!(result, StreamContainerIterationResult::UseStreamCredit)
+                        {
                             self.$list_name.push_back(node.clone());
                         } else {
                             self.$list_name.push_front(node.clone());
@@ -206,7 +264,7 @@ impl InterestLists {
             waiting_for_frame_delivery_link,
             waiting_for_frame_delivery
         );
-        sync_interests!(
+        sync_trans_interests!(
             matches!(interests.transmission, transmission::Interest::NewData),
             waiting_for_transmission_link,
             waiting_for_transmission
@@ -233,10 +291,11 @@ impl InterestLists {
             } else {
                 panic!("Done streams should never report not done later");
             }
-            true
+            outcome.done = true;
         } else {
-            false
+            outcome.done = false;
         }
+        outcome
     }
 }
 
@@ -308,34 +367,43 @@ macro_rules! iterate_interruptible {
 
             // Update the interests after the interaction
             let interests = mut_stream.get_stream_interests();
-            $sel.interest_lists
+            let outcome = $sel
+                .interest_lists
                 .update_interests(&stream, interests, result);
 
             match result {
                 StreamContainerIterationResult::BreakAndInsertAtBack => {
-                    $sel.interest_lists
-                        .$list_name
-                        .front_mut()
-                        .splice_after(extracted_list);
-                    break;
+                    if outcome.push_back_remaining {
+                        $sel.interest_lists
+                            .$list_name
+                            .back_mut()
+                            .splice_after(extracted_list);
+                        break;
+                    } else {
+                        $sel.interest_lists
+                            .$list_name
+                            .front_mut()
+                            .splice_after(extracted_list);
+                        break;
+                    }
                 }
-                StreamContainerIterationResult::Continue => {}
+                _ => {}
             }
-        }
 
-        if !$sel.interest_lists.done_streams.is_empty() {
-            $sel.finalize_done_streams($controller);
+            if !$sel.interest_lists.done_streams.is_empty() {
+                $sel.finalize_done_streams($controller);
+            }
         }
     };
 }
 
 impl<S: StreamTrait> StreamContainer<S> {
     /// Creates a new `StreamContainer`
-    pub fn new() -> Self {
+    pub fn new(connection_limits: &connection::Limits) -> Self {
         Self {
             stream_map: RBTree::new(StreamTreeAdapter::new()),
             nr_active_streams: 0,
-            interest_lists: InterestLists::new(),
+            interest_lists: InterestLists::new(connection_limits),
         }
     }
 
@@ -414,11 +482,12 @@ impl StreamContainer {
 
         // Update the interest lists after the interactions and then remove
        // all finalized streams
-        if self.interest_lists.update_interests(
+        let outcome = self.interest_lists.update_interests(
             &node_ptr,
             interests,
             StreamContainerIterationResult::Continue,
-        ) {
+        );
+        if outcome.done {
             self.finalize_done_streams(controller);
         }
 
@@ -659,6 +728,7 @@ impl transmission::interest::Provider for StreamContainer {
 pub enum StreamContainerIterationResult {
     /// Continue iteration over the list
     Continue,
+    UseStreamCredit,
     /// Aborts the iteration over a list and add the remaining items at the
     /// back of the list
     BreakAndInsertAtBack,

From 31931f5f3b5deef166060f8b39c4f8e497344f97 Mon Sep 17 00:00:00 2001
From: Appelmans
Date: Tue, 17 Dec 2024 11:11:45 -0800
Subject: [PATCH 2/3] clippy

---
 quic/s2n-quic-transport/src/stream/manager/tests.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/quic/s2n-quic-transport/src/stream/manager/tests.rs b/quic/s2n-quic-transport/src/stream/manager/tests.rs
index 1dbad069c2..3aabe7b6fa 100644
--- a/quic/s2n-quic-transport/src/stream/manager/tests.rs
+++ b/quic/s2n-quic-transport/src/stream/manager/tests.rs
@@ -3306,7 +3306,7 @@ fn stream_batching_test() {
         manager
             .with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
                 let data_to_send = bytes::Bytes::copy_from_slice(&array);
-                stream.poll_request(&mut ops::Request::default().send(&mut [data_to_send]), None)
+                stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None)
             })
             .unwrap();
     }

From 337afe0ca07f75d309696807ac735840005577f5 Mon Sep 17 00:00:00 2001
From: Appelmans
Date: Tue, 17 Dec 2024 17:16:30 -0800
Subject: [PATCH 3/3] Fixing deduplicate test

---
 .../src/stream/stream_container.rs | 45 +++++++++++--------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs
index f992b868a6..dae268e69b 100644
--- a/quic/s2n-quic-transport/src/stream/stream_container.rs
+++ b/quic/s2n-quic-transport/src/stream/stream_container.rs
@@ -194,29 +194,38 @@ impl InterestLists {
             ($interest:expr, $link_name:ident, $list_name:ident) => {
                 if $interest != node.$link_name.is_linked() {
                     if $interest {
-                        if matches!(result, StreamContainerIterationResult::UseStreamCredit) {
-                            self.$list_name.push_back(node.clone());
-                            self.transmission_counter += 1;
-                        } else if matches!(result, StreamContainerIterationResult::Continue) {
+                        let stream_credits = self.transmission_counter < self.transmission_limit;
+                        if matches!(result, StreamContainerIterationResult::UseStreamCredit)
+                        {
                             self.$list_name.push_back(node.clone());
+                            if stream_credits {
+                                self.transmission_counter += 1;
+                            } else {
+                                self.transmission_counter = 0;
+                            }
                         } else if matches!(
                             result,
                             StreamContainerIterationResult::BreakAndInsertAtBack
-                        ) && !(self.transmission_counter < self.transmission_limit)
-                        {
-                            // In this case the node at the front of the list has no more sending
-                            // credits. Therefore we push the current node to the front of the list
-                            // and push the remaining nodes behind the current node, which effectively
-                            // moves the front node to the back of the list.
-                            self.$list_name.push_front(node.clone());
-                            self.transmission_counter = 0;
-                            outcome.push_back_remaining = false;
-                        } else {
-                            // In this case the node at the front of the list still has sending credits.
-                            // Therefore we want to preserve the order of the list by pushing each
-                            // node back.
+                        ){
+                            if stream_credits {
+                                // In this case the node at the front of the list still has sending credits.
+                                // Therefore we want to preserve the order of the list by pushing each
+                                // node back.
+                                self.$list_name.push_back(node.clone());
+                                outcome.push_back_remaining = true;
+                            } else {
+                                // In this case the node at the front of the list has no more sending
+                                // credits. Therefore we push the current node to the front of the list
+                                // and push the remaining nodes behind the current node, which effectively
+                                // moves the front node to the back of the list.
+                                self.$list_name.push_front(node.clone());
+                                self.transmission_counter = 0;
+                                outcome.push_back_remaining = false;
+                            }
+                        } else if matches!(result, StreamContainerIterationResult::Continue) {
+                            // In this case we are simply iterating through the list and don't
+                            // need to increment credits
                             self.$list_name.push_back(node.clone());
-                            outcome.push_back_remaining = true;
                         }
                     } else {
                         // Safety: We know that the node is only ever part of this list.