Skip to content

Commit

Permalink
recovery: pacing of egress QUIC packets
Browse files Browse the repository at this point in the history
  • Loading branch information
Lohith Bellad authored and ghedo committed May 24, 2021
1 parent 6d070ed commit fe2e217
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 1 deletion.
4 changes: 4 additions & 0 deletions include/quiche.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,12 @@ ssize_t quiche_conn_recv(quiche_conn *conn, uint8_t *buf, size_t buf_len,
const quiche_recv_info *info);

typedef struct {
// The address the packet should be sent to.
struct sockaddr_storage to;
socklen_t to_len;

// The time to send the packet out.
struct timespec time;
} quiche_send_info;

// Writes a single QUIC packet to be sent to the peer.
Expand Down
19 changes: 19 additions & 0 deletions src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use libc::c_void;
use libc::size_t;
use libc::sockaddr;
use libc::ssize_t;
use libc::timespec;

#[cfg(not(windows))]
use libc::sockaddr_in;
Expand Down Expand Up @@ -634,6 +635,8 @@ pub extern fn quiche_conn_recv(
pub struct SendInfo {
to: sockaddr_storage,
to_len: socklen_t,

time: timespec,
}

#[no_mangle]
Expand All @@ -650,6 +653,8 @@ pub extern fn quiche_conn_send(
Ok((v, info)) => {
out_info.to_len = std_addr_to_c(&info.to, &mut out_info.to);

std_time_to_c(&info.time, &mut out_info.time);

v as ssize_t
},

Expand Down Expand Up @@ -1092,3 +1097,17 @@ fn std_addr_to_c(addr: &SocketAddr, out: &mut sockaddr_storage) -> socklen_t {
}
}
}

#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "windows")))]
fn std_time_to_c(time: &std::time::Instant, out: &mut timespec) {
unsafe {
ptr::copy_nonoverlapping(time as *const _ as *const timespec, out, 1)
}
}

#[cfg(any(target_os = "macos", target_os = "ios", target_os = "windows"))]
fn std_time_to_c(time: &std::time::Instant, out: &mut timespec) {
// TODO: implement Instant conversion for systems that don't use timespec.
out.tv_sec = 0;
out.tv_nsec = 0;
}
12 changes: 11 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,9 @@ pub struct RecvInfo {
pub struct SendInfo {
/// The address the packet should be sent to.
pub to: SocketAddr,

/// The time to send the packet out.
pub time: time::Instant,
}

/// Represents information carried by `CONNECTION_CLOSE` frames.
Expand Down Expand Up @@ -2390,7 +2393,14 @@ impl Connection {
done += pad_len;
}

let info = SendInfo { to: self.peer_addr };
let info = SendInfo {
to: self.peer_addr,

time: self
.recovery
.get_packet_send_time()
.unwrap_or_else(time::Instant::now),
};

Ok((done, info))
}
Expand Down
5 changes: 5 additions & 0 deletions src/recovery/cubic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub static CUBIC: CongestionControlOps = CongestionControlOps {
collapse_cwnd,
checkpoint,
rollback,
has_custom_pacing,
};

/// CUBIC Constants.
Expand Down Expand Up @@ -404,6 +405,10 @@ fn rollback(r: &mut Recovery) {
r.congestion_recovery_start_time = r.cubic_state.prior.epoch_start;
}

fn has_custom_pacing() -> bool {
false
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
208 changes: 208 additions & 0 deletions src/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ pub struct Recovery {

bytes_acked_ca: usize,

bytes_sent: usize,

congestion_recovery_start_time: Option<Instant>,

max_datagram_size: usize,
Expand All @@ -129,6 +131,11 @@ pub struct Recovery {

// HyStart++.
hystart: hystart::Hystart,

// Pacing.
pacing_rate: u64,

last_packet_scheduled_time: Option<Instant>,
}

impl Recovery {
Expand Down Expand Up @@ -185,6 +192,8 @@ impl Recovery {

bytes_acked_ca: 0,

bytes_sent: 0,

congestion_recovery_start_time: None,

max_datagram_size: config.max_send_udp_payload_size,
Expand All @@ -198,6 +207,10 @@ impl Recovery {
app_limited: false,

hystart: hystart::Hystart::new(config.hystart),

pacing_rate: 0,

last_packet_scheduled_time: None,
}
}

Expand Down Expand Up @@ -240,13 +253,64 @@ impl Recovery {
self.hystart.start_round(pkt_num);
}

// Pacing: Set the pacing rate if CC doesn't do its own.
if !(self.cc_ops.has_custom_pacing)() {
if let Some(srtt) = self.smoothed_rtt {
let rate = (self.congestion_window as u64 * 1000000) /
srtt.as_micros() as u64;
self.set_pacing_rate(rate);
}
}

self.schedule_next_packet(epoch, now, sent_bytes);

self.bytes_sent += sent_bytes;
trace!("{} {:?}", trace_id, self);
}

fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
(self.cc_ops.on_packet_sent)(self, sent_bytes, now);
}

pub fn set_pacing_rate(&mut self, rate: u64) {
if rate != 0 {
self.pacing_rate = rate;
}
}

pub fn get_packet_send_time(&self) -> Option<Instant> {
self.last_packet_scheduled_time
}

fn schedule_next_packet(
&mut self, epoch: packet::Epoch, now: Instant, packet_size: usize,
) {
// Don't pace in any of these cases:
// * Packet epoch is not EPOCH_APPLICATION.
// * Packet contains only ACK frames.
// * The start of the connection.
if epoch != packet::EPOCH_APPLICATION ||
packet_size == 0 ||
self.bytes_sent <= self.congestion_window ||
self.pacing_rate == 0
{
self.last_packet_scheduled_time = Some(now);
return;
}

self.last_packet_scheduled_time = match self.last_packet_scheduled_time {
Some(last_scheduled_time) => {
let interval: u64 =
(packet_size as u64 * 1000000) / self.pacing_rate;
let interval = Duration::from_micros(interval);
let next_schedule_time = last_scheduled_time + interval;
Some(cmp::max(now, next_schedule_time))
},

None => Some(now),
};
}

pub fn on_ack_received(
&mut self, ranges: &ranges::RangeSet, ack_delay: u64,
epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
Expand Down Expand Up @@ -847,6 +911,8 @@ pub struct CongestionControlOps {
pub checkpoint: fn(r: &mut Recovery),

pub rollback: fn(r: &mut Recovery),

pub has_custom_pacing: fn() -> bool,
}

impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
Expand Down Expand Up @@ -893,6 +959,12 @@ impl std::fmt::Debug for Recovery {
self.congestion_recovery_start_time
)?;
write!(f, "{:?} ", self.delivery_rate)?;
write!(f, "pacing_rate={:?}", self.pacing_rate)?;
write!(
f,
"last_packet_scheduled_time={:?}",
self.last_packet_scheduled_time
)?;

if self.hystart.enabled() {
write!(f, "hystart={:?} ", self.hystart)?;
Expand Down Expand Up @@ -1556,6 +1628,142 @@ mod tests {
// Spurious loss.
assert_eq!(r.lost_count, 1);
}

#[test]
fn pacing() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);

let mut r = Recovery::new(&cfg);

let mut now = Instant::now();

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);

// send out first packet.
let p = Sent {
pkt_num: 0,
frames: vec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: 6500,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
recent_delivered_packet_sent_time: now,
is_app_limited: false,
has_data: false,
};

r.on_packet_sent(
p,
packet::EPOCH_APPLICATION,
HandshakeStatus::default(),
now,
"",
);

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
assert_eq!(r.bytes_in_flight, 6500);

// First packet will be sent out immidiately.
assert_eq!(r.pacing_rate, 0);
assert_eq!(r.get_packet_send_time().unwrap(), now);

// Wait 50ms for ACK.
now += Duration::from_millis(50);

let mut acked = ranges::RangeSet::default();
acked.insert(0..1);

assert_eq!(
r.on_ack_received(
&acked,
10,
packet::EPOCH_APPLICATION,
HandshakeStatus::default(),
now,
""
),
Ok(())
);

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
assert_eq!(r.bytes_in_flight, 0);
assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));

// Send out second packet.
let p = Sent {
pkt_num: 1,
frames: vec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: 6500,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
recent_delivered_packet_sent_time: now,
is_app_limited: false,
has_data: false,
};

r.on_packet_sent(
p,
packet::EPOCH_APPLICATION,
HandshakeStatus::default(),
now,
"",
);

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
assert_eq!(r.bytes_in_flight, 6500);

// Pacing is not done during intial phase of connection.
assert_eq!(r.get_packet_send_time().unwrap(), now);

// Send the third packet out.
let p = Sent {
pkt_num: 2,
frames: vec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: 6500,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
recent_delivered_packet_sent_time: now,
is_app_limited: false,
has_data: false,
};

r.on_packet_sent(
p,
packet::EPOCH_APPLICATION,
HandshakeStatus::default(),
now,
"",
);

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
assert_eq!(r.bytes_in_flight, 13000);
assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));

// We pace this outgoing packet. as all conditions for pacing
// are passed.
assert_eq!(r.pacing_rate, (12000.0 / 0.05) as u64);
assert_eq!(
r.get_packet_send_time().unwrap(),
now + Duration::from_micros(
(6500 * 1000000) / (12000.0 / 0.05) as u64
)
);
}
}

mod cubic;
Expand Down
5 changes: 5 additions & 0 deletions src/recovery/reno.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub static RENO: CongestionControlOps = CongestionControlOps {
collapse_cwnd,
checkpoint,
rollback,
has_custom_pacing,
};

pub fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, _now: Instant) {
Expand Down Expand Up @@ -162,6 +163,10 @@ fn checkpoint(_r: &mut Recovery) {}

fn rollback(_r: &mut Recovery) {}

fn has_custom_pacing() -> bool {
false
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit fe2e217

Please sign in to comment.