Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-transport): Adds stream batching functionality #2433

Merged
merged 6 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions quic/s2n-quic-core/src/connection/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30);
//# received.
pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3;

pub const DEFAULT_STREAM_BATCH_SIZE: u8 = 1;

#[non_exhaustive]
#[derive(Debug)]
pub struct ConnectionInfo<'a> {
Expand Down Expand Up @@ -74,6 +76,7 @@ pub struct Limits {
pub(crate) initial_round_trip_time: Duration,
pub(crate) migration_support: MigrationSupport,
pub(crate) anti_amplification_multiplier: u8,
pub(crate) stream_batch_size: u8,
}

impl Default for Limits {
Expand Down Expand Up @@ -120,6 +123,7 @@ impl Limits {
initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT,
migration_support: MigrationSupport::RECOMMENDED,
anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER,
stream_batch_size: DEFAULT_STREAM_BATCH_SIZE,
}
}

Expand Down Expand Up @@ -222,6 +226,7 @@ impl Limits {
max_active_connection_ids,
u64
);
setter!(with_stream_batch_size, stream_batch_size, u8);
setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8);
setter!(with_max_ack_ranges, ack_ranges_limit, u8);
setter!(
Expand Down Expand Up @@ -384,6 +389,12 @@ impl Limits {
pub fn anti_amplification_multiplier(&self) -> u8 {
self.anti_amplification_multiplier
}

#[doc(hidden)]
#[inline]
pub fn stream_batch_size(&self) -> u8 {
self.stream_batch_size
}
}

/// Creates limits for a given connection
Expand Down
2 changes: 1 addition & 1 deletion quic/s2n-quic-core/src/packet/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ pub trait PacketEncoder<K: CryptoKey, H: HeaderKey, Payload: PacketPayloadEncode
return Err(PacketEncodingError::EmptyPayload(buffer));
}

// Ideally we would check that the `paylod_len >= minimum_payload_len`. However, the packet
// Ideally we would check that the `payload_len >= minimum_payload_len`. However, the packet
// interceptor may rewrite the packet into something smaller. Instead of preventing that
// here, we will rely on the `crate::transmission::Transmission` logic to ensure the
// padding is initially written to ensure the minimum is met before interception is applied.
Expand Down
6 changes: 3 additions & 3 deletions quic/s2n-quic-transport/src/stream/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
connection_limits.stream_limits(),
min_rtt,
),
streams: StreamContainer::new(),
streams: StreamContainer::new(connection_limits),
next_stream_ids: StreamIdSet::initial(),
local_endpoint_type,
initial_local_limits,
Expand Down Expand Up @@ -849,7 +849,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
transmission::context::RetransmissionContext::new(context);

// Prioritize retransmitting lost data
self.inner.streams.iterate_retransmission_list(
self.inner.streams.send_on_retransmission_list(
&mut self.inner.stream_controller,
|stream: &mut S| {
transmit_result = stream.on_transmit(&mut retransmission_context);
Expand All @@ -866,7 +866,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
}

if context.transmission_constraint().can_transmit() {
self.inner.streams.iterate_transmission_list(
self.inner.streams.send_on_transmission_list(
&mut self.inner.stream_controller,
|stream: &mut S| {
transmit_result = stream.on_transmit(context);
Expand Down
86 changes: 86 additions & 0 deletions quic/s2n-quic-transport/src/stream/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2530,6 +2530,11 @@ fn on_transmit_queries_streams_for_data() {
endpoint::Type::Server,
);

assert_eq!(
[stream_1, stream_2, stream_3, stream_4],
*manager.streams_waiting_for_transmission()
);

assert_eq!(
Err(OnTransmitError::CouldNotWriteFrame),
manager.on_transmit(&mut write_context)
Expand Down Expand Up @@ -3249,3 +3254,84 @@ fn stream_transmission_fairness_test() {
}
}
}

#[test]
fn stream_batching_test() {
for batch_size in 1..=10 {
dbg!(batch_size);
let limits = ConnectionLimits::default()
.with_stream_batch_size(batch_size)
.unwrap();

let mut manager = AbstractStreamManager::<stream::StreamImpl>::new(
&limits,
endpoint::Type::Server,
create_default_initial_flow_control_limits(),
create_default_initial_flow_control_limits(),
DEFAULT_INITIAL_RTT,
);

// Create some open Streams
let mut stream_ids: VecDeque<StreamId> = (0..4)
.map(|_| {
let (accept_waker, _accept_wake_counter) = new_count_waker();
let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
let mut token = connection::OpenToken::new();

let result = match manager.poll_open_local_stream(
StreamType::Bidirectional,
&mut token,
&mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
&Context::from_waker(&accept_waker),
) {
Poll::Ready(res) => res,
Poll::Pending => Err(connection::Error::unspecified()),
};
result.unwrap()
})
.collect();

// Create a context that can only fit packets of size 50
let mut frame_buffer = OutgoingFrameBuffer::new();
let max_packet_size = 50;
frame_buffer.set_max_packet_size(Some(max_packet_size));
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);

const DATA_SIZE: usize = 2000;
let array: [u8; DATA_SIZE] = [1; DATA_SIZE];

// Set up each stream to have much more data to send than can fit in our test packet
for stream_id in &stream_ids {
manager
.with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
let data_to_send = bytes::Bytes::copy_from_slice(&array);
stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None)
})
.unwrap();
}
// make sure the order matches creation order
assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

// Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times.
// Then the stream gets sent to the back of the transmission list.
for idx in 1..=40 {
dbg!(idx);
let _ = manager.on_transmit(&mut write_context);

assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

write_context.frame_buffer.flush();

if idx % batch_size == 0 {
// The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets
stream_ids.rotate_left(1);
}
}
}
}
Loading
Loading