Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-transport): Adds stream batching functionality #2428

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions quic/s2n-quic-core/src/connection/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30);
//# received.
pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3;

pub const STREAM_BURST_SIZE: u8 = 1;
maddeleine marked this conversation as resolved.
Show resolved Hide resolved

#[non_exhaustive]
#[derive(Debug)]
pub struct ConnectionInfo<'a> {
Expand Down Expand Up @@ -74,6 +76,7 @@ pub struct Limits {
pub(crate) initial_round_trip_time: Duration,
pub(crate) migration_support: MigrationSupport,
pub(crate) anti_amplification_multiplier: u8,
pub(crate) stream_burst_size: u8,
}

impl Default for Limits {
Expand Down Expand Up @@ -120,6 +123,7 @@ impl Limits {
initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT,
migration_support: MigrationSupport::RECOMMENDED,
anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER,
stream_burst_size: STREAM_BURST_SIZE,
}
}

Expand Down Expand Up @@ -222,6 +226,7 @@ impl Limits {
max_active_connection_ids,
u64
);
setter!(with_stream_burst_size, stream_burst_size, u8);
setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8);
setter!(with_max_ack_ranges, ack_ranges_limit, u8);
setter!(
Expand Down Expand Up @@ -384,6 +389,12 @@ impl Limits {
/// Returns the configured anti-amplification multiplier for this set of limits.
pub fn anti_amplification_multiplier(&self) -> u8 {
self.anti_amplification_multiplier
}

/// Returns the configured stream burst size.
///
/// NOTE(review): per the PR this bounds how many consecutive packets a single
/// stream may lead before yielding to other streams — confirm against the
/// `StreamContainer` usage.
#[doc(hidden)]
#[inline]
pub fn stream_burst_size(&self) -> u8 {
self.stream_burst_size
}
}

/// Creates limits for a given connection
Expand Down
4 changes: 2 additions & 2 deletions quic/s2n-quic-transport/src/stream/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
connection_limits.stream_limits(),
min_rtt,
),
streams: StreamContainer::new(),
streams: StreamContainer::new(connection_limits),
next_stream_ids: StreamIdSet::initial(),
local_endpoint_type,
initial_local_limits,
Expand Down Expand Up @@ -873,7 +873,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
if transmit_result.is_err() {
StreamContainerIterationResult::BreakAndInsertAtBack
} else {
StreamContainerIterationResult::Continue
StreamContainerIterationResult::UseStreamCredit
}
},
);
Expand Down
78 changes: 78 additions & 0 deletions quic/s2n-quic-transport/src/stream/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3249,3 +3249,81 @@ fn stream_transmission_fairness_test() {
}
}
}

#[test]
fn stream_batching_test() {
    // Build a stream manager whose transmission burst size is 5.
    let batch_size = 5;
    let limits = ConnectionLimits::default()
        .with_stream_burst_size(batch_size)
        .unwrap();

    let mut manager = AbstractStreamManager::<stream::StreamImpl>::new(
        &limits,
        endpoint::Type::Server,
        create_default_initial_flow_control_limits(),
        create_default_initial_flow_control_limits(),
        DEFAULT_INITIAL_RTT,
    );

    // Open four bidirectional streams, recording their ids in creation order.
    let mut stream_ids = VecDeque::<StreamId>::new();
    for _ in 0..4 {
        let (accept_waker, _accept_wake_counter) = new_count_waker();
        let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
        let mut token = connection::OpenToken::new();

        let open_result = match manager.poll_open_local_stream(
            StreamType::Bidirectional,
            &mut token,
            &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
            &Context::from_waker(&accept_waker),
        ) {
            Poll::Ready(res) => res,
            Poll::Pending => Err(connection::Error::unspecified()),
        };
        stream_ids.push_back(open_result.unwrap());
    }

    // Restrict the write context so a single packet holds at most 50 bytes.
    let mut frame_buffer = OutgoingFrameBuffer::new();
    let max_packet_size = 50;
    frame_buffer.set_max_packet_size(Some(max_packet_size));
    let mut write_context = MockWriteContext::new(
        time::now(),
        &mut frame_buffer,
        transmission::Constraint::None,
        transmission::Mode::Normal,
        endpoint::Type::Server,
    );

    const DATA_SIZE: usize = 2000;
    let payload = [1u8; DATA_SIZE];

    // Queue far more data on every stream than one test packet can carry.
    for stream_id in &stream_ids {
        manager
            .with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
                let data_to_send = bytes::Bytes::copy_from_slice(&payload);
                stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None)
            })
            .unwrap();
    }

    // Before anything is transmitted the list must match creation order.
    assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

    // Transmit 40 packets. Each stream leads the list for "batch_size" packets
    // in a row, after which it rotates to the back of the transmission list.
    for packet_idx in 1..=40 {
        let _ = manager.on_transmit(&mut write_context);
        write_context.frame_buffer.flush();

        if packet_idx % batch_size == 0 {
            // A full burst was consumed, so the leading stream yields its slot.
            stream_ids.rotate_left(1);
        }

        assert_eq!(stream_ids, manager.streams_waiting_for_transmission());
    }
}
123 changes: 101 additions & 22 deletions quic/s2n-quic-transport/src/stream/stream_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
#![allow(unknown_lints, clippy::non_send_fields_in_send_ty)]

use crate::{
stream,
stream::{stream_impl::StreamTrait, stream_interests::StreamInterests},
connection,
stream::{self, stream_impl::StreamTrait, stream_interests::StreamInterests},
transmission,
};
use alloc::rc::Rc;
Expand Down Expand Up @@ -145,10 +145,17 @@ struct InterestLists<S> {
/// stream flow control window to increase
waiting_for_stream_flow_control_credits:
LinkedList<WaitingForStreamFlowControlCreditsAdapter<S>>,
transmission_counter: u8,
transmission_limit: u8,
Comment on lines +148 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be a better place for these, since this struct is containing only lists

}

/// Result of a single `update_interests` pass over one stream node.
struct Outcome {
// True when the stream reported itself as done and was moved to the
// done-streams list, so the caller should finalize done streams.
done: bool,
// Controls where the remaining extracted nodes are spliced back:
// true  => splice behind the current node at the back (order preserved);
// false => the current node was pushed to the front, so splicing the rest
//          after it effectively rotates the old front node to the back.
push_back_remaining: bool,
}

impl<S: StreamTrait> InterestLists<S> {
fn new() -> Self {
fn new(connection_limits: &connection::Limits) -> Self {
Self {
done_streams: LinkedList::new(DoneStreamsAdapter::new()),
waiting_for_frame_delivery: LinkedList::new(WaitingForFrameDeliveryAdapter::new()),
Expand All @@ -160,6 +167,8 @@ impl<S: StreamTrait> InterestLists<S> {
waiting_for_stream_flow_control_credits: LinkedList::new(
WaitingForStreamFlowControlCreditsAdapter::new(),
),
transmission_counter: 0,
transmission_limit: connection_limits.stream_burst_size(),
}
}

Expand All @@ -169,19 +178,77 @@ impl<S: StreamTrait> InterestLists<S> {
node: &Rc<StreamNode<S>>,
interests: StreamInterests,
result: StreamContainerIterationResult,
) -> bool {
) -> Outcome {
// Note that all comparisons start by checking whether the stream is
// already part of the given list. This is required in order for the
// following operation to be safe. Inserting an element in a list while
// it is already part of a (different) list can panic. Trying to remove
// an element from a list while it is not actually part of the list
// is undefined.
let mut outcome = Outcome {
done: false,
push_back_remaining: true,
};

macro_rules! sync_trans_interests {
($interest:expr, $link_name:ident, $list_name:ident) => {
if $interest != node.$link_name.is_linked() {
if $interest {
let stream_credits = self.transmission_counter < self.transmission_limit;
if matches!(result, StreamContainerIterationResult::UseStreamCredit)
{
self.$list_name.push_back(node.clone());
if stream_credits {
self.transmission_counter += 1;
} else {
self.transmission_counter = 0;
}
} else if matches!(
result,
StreamContainerIterationResult::BreakAndInsertAtBack
){
if stream_credits {
// In this case the node at the front of the list still has sending credits.
// Therefore we want to preserve the order of the list by pushing each
// node back.
self.$list_name.push_back(node.clone());
outcome.push_back_remaining = true;
} else {
// In this case the node at the front of the list has no more sending
// credits. Therefore we push the current node to the front of the list
// and push the remaining nodes behind the current node, which effectively
// moves the front node to the back of the list.
self.$list_name.push_front(node.clone());
self.transmission_counter = 0;
outcome.push_back_remaining = false;
}
} else if matches!(result, StreamContainerIterationResult::Continue) {
// In this case we are simply iterating through the list and don't
// need to increment credits
self.$list_name.push_back(node.clone());
}
} else {
// Safety: We know that the node is only ever part of this list.
// While elements are in temporary lists, they always get unlinked
// from those temporary lists while their interest is updated.
let mut cursor = unsafe {
self.$list_name
.cursor_mut_from_ptr(node.deref() as *const StreamNode<S>)
};
cursor.remove();
}
}
debug_assert_eq!($interest, node.$link_name.is_linked());
};
}

macro_rules! sync_interests {
($interest:expr, $link_name:ident, $list_name:ident) => {
if $interest != node.$link_name.is_linked() {
if $interest {
if matches!(result, StreamContainerIterationResult::Continue) {
if matches!(result, StreamContainerIterationResult::Continue)
|| matches!(result, StreamContainerIterationResult::UseStreamCredit)
{
self.$list_name.push_back(node.clone());
} else {
self.$list_name.push_front(node.clone());
Expand All @@ -206,7 +273,7 @@ impl<S: StreamTrait> InterestLists<S> {
waiting_for_frame_delivery_link,
waiting_for_frame_delivery
);
sync_interests!(
sync_trans_interests!(
maddeleine marked this conversation as resolved.
Show resolved Hide resolved
matches!(interests.transmission, transmission::Interest::NewData),
waiting_for_transmission_link,
waiting_for_transmission
Expand All @@ -233,10 +300,11 @@ impl<S: StreamTrait> InterestLists<S> {
} else {
panic!("Done streams should never report not done later");
}
true
outcome.done = true;
} else {
false
outcome.done = false;
}
outcome
}
}

Expand Down Expand Up @@ -308,34 +376,43 @@ macro_rules! iterate_interruptible {

// Update the interests after the interaction
let interests = mut_stream.get_stream_interests();
$sel.interest_lists
let outcome = $sel
.interest_lists
.update_interests(&stream, interests, result);

match result {
StreamContainerIterationResult::BreakAndInsertAtBack => {
$sel.interest_lists
.$list_name
.front_mut()
.splice_after(extracted_list);
break;
if outcome.push_back_remaining {
$sel.interest_lists
.$list_name
.back_mut()
.splice_after(extracted_list);
break;
} else {
$sel.interest_lists
.$list_name
.front_mut()
.splice_after(extracted_list);
break;
}
}
StreamContainerIterationResult::Continue => {}
_ => {}
}
}

if !$sel.interest_lists.done_streams.is_empty() {
$sel.finalize_done_streams($controller);
if !$sel.interest_lists.done_streams.is_empty() {
$sel.finalize_done_streams($controller);
}
}
};
}

impl<S: StreamTrait> StreamContainer<S> {
/// Creates a new `StreamContainer`
pub fn new() -> Self {
pub fn new(connection_limits: &connection::Limits) -> Self {
Self {
stream_map: RBTree::new(StreamTreeAdapter::new()),
nr_active_streams: 0,
interest_lists: InterestLists::new(),
interest_lists: InterestLists::new(connection_limits),
}
}

Expand Down Expand Up @@ -414,11 +491,12 @@ impl<S: StreamTrait> StreamContainer<S> {

// Update the interest lists after the interactions and then remove
// all finalized streams
if self.interest_lists.update_interests(
let outcome = self.interest_lists.update_interests(
&node_ptr,
interests,
StreamContainerIterationResult::Continue,
) {
);
if outcome.done {
self.finalize_done_streams(controller);
}

Expand Down Expand Up @@ -659,6 +737,7 @@ impl<S: StreamTrait> transmission::interest::Provider for StreamContainer<S> {
pub enum StreamContainerIterationResult {
/// Continue iteration over the list
Continue,
UseStreamCredit,
/// Aborts the iteration over a list and add the remaining items at the
/// back of the list
BreakAndInsertAtBack,
Expand Down
Loading