From d53b9e593f5675da5599888d75e84d8aa60b18dc Mon Sep 17 00:00:00 2001 From: Lohith Bellad Date: Mon, 19 Oct 2020 12:15:10 -0700 Subject: [PATCH] Pacing: Pacing of egress QUIC packets --- include/quiche.h | 5 +++ src/ffi.rs | 65 ++++++++++++++++++++++++++++++++++ src/lib.rs | 76 ++++++++++++++++++++++++++++++++++++++++ src/recovery/mod.rs | 85 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 231 insertions(+) diff --git a/include/quiche.h b/include/quiche.h index d638ebb9fb..f7690ba16e 100644 --- a/include/quiche.h +++ b/include/quiche.h @@ -254,6 +254,11 @@ ssize_t quiche_conn_recv(quiche_conn *conn, uint8_t *buf, size_t buf_len); // Writes a single QUIC packet to be sent to the peer. ssize_t quiche_conn_send(quiche_conn *conn, uint8_t *out, size_t out_len); +// Writes a single QUIC packet to be sent to the peer and fills in the send_time +// of the packet. +ssize_t quiche_conn_send_at(quiche_conn *conn, uint8_t *out, size_t out_len, + struct timespec *send_time); + // Buffer holding data at a specific offset. typedef struct RangeBuf quiche_rangebuf; diff --git a/src/ffi.rs b/src/ffi.rs index 64f288430e..63b2a4af7e 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -31,6 +31,7 @@ use std::sync::atomic; #[cfg(unix)] use std::os::unix::io::FromRawFd; +use libc::timespec; use libc::c_char; use libc::c_int; @@ -241,6 +242,11 @@ pub extern fn quiche_config_enable_hystart(config: &mut Config, v: bool) { config.enable_hystart(v); } +#[no_mangle] +pub extern fn quiche_config_enable_pacing(config: &mut Config, v: bool) { + config.enable_pacing(v); +} + #[no_mangle] pub extern fn quiche_config_enable_dgram( config: &mut Config, enabled: bool, recv_queue_len: size_t, @@ -543,6 +549,32 @@ pub extern fn quiche_conn_send( } } +#[no_mangle] +#[cfg(unix)] +pub extern fn quiche_conn_send_at( + conn: &mut Connection, out: *mut u8, out_len: size_t, + send_time: &mut timespec, +) -> ssize_t { + if out_len > ::max_value() as usize { + panic!("The provided buffer is too large"); + } + + let out = unsafe { slice::from_raw_parts_mut(out, out_len) }; + let mut schedule_time = time::Instant::now(); + + let written = match conn.send_at(out, &mut schedule_time) { + Ok(v) => v as ssize_t, + + Err(e) => e.to_c(), + }; + + if written > 0 { + *send_time = instant_to_timespec(schedule_time); + } + + written +} + #[no_mangle] pub extern fn quiche_conn_stream_recv( conn: &mut Connection, stream_id: u64, out: *mut u8, out_len: size_t, @@ -841,3 +873,36 @@ pub extern fn quiche_conn_dgram_purge_outgoing( pub extern fn quiche_conn_free(conn: *mut Connection) { unsafe { Box::from_raw(conn) }; } + +#[cfg(unix)] +fn instant_to_timespec(instant: std::time::Instant) -> timespec { + use std::hash::{ + Hash, + Hasher, + }; + + struct ToTimeSpec(Vec); + impl Hasher for ToTimeSpec { + fn finish(&self) -> u64 { + unimplemented!(); + } + + fn write(&mut self, _: &[u8]) { + unimplemented!(); + } + + fn write_u64(&mut self, i: u64) { + self.0.push(i as _); + } + + fn write_u32(&mut self, i: u32) { + self.0.push(i as _); + } + } + let mut hasher = ToTimeSpec(vec![]); + instant.hash(&mut hasher); + libc::timespec { + tv_sec: hasher.0[0] as _, + tv_nsec: hasher.0[1] as _, + } +} diff --git a/src/lib.rs b/src/lib.rs index 1133e09d90..dc7ed81ed9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -450,6 +450,8 @@ pub struct Config { hystart: bool, + pacing: bool, + dgram_recv_max_queue_len: usize, dgram_send_max_queue_len: usize, @@ -476,6 +478,7 @@ impl Config { grease: true, cc_algorithm: CongestionControlAlgorithm::CUBIC, hystart: true, + pacing: true, dgram_recv_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN, dgram_send_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN, @@ -804,6 +807,13 @@ impl Config { self.dgram_recv_max_queue_len = recv_queue_len; self.dgram_send_max_queue_len = send_queue_len; } + + /// Configures whether to enable Packet Pacing. + /// + /// The default is `true`. + pub fn enable_pacing(&mut self, v: bool) { + self.pacing = v; + } } /// A QUIC connection. @@ -2763,6 +2773,72 @@ impl Connection { Ok(written) } + /// Writes a single QUIC packet to be sent to the peer and fills in + /// send_time of the packet + /// + /// On success the number of bytes written to the output buffer is + /// returned, or [`Done`] if there was nothing to write. And send_time + /// is filled with time to send the packet out. + /// + /// The application should call `send()` multiple times until [`Done`] is + /// returned, indicating that there are no more packets to send. The time + /// field send_time should be discarded when error is returned. It is + /// recommended that `send()` be called in the following cases: + /// + /// * When the application receives QUIC packets from the peer (that is, + /// any time [`recv()`] is also called). + /// + /// * When the connection timer expires (that is, any time [`on_timeout()`] + /// is also called). + /// + /// * When the application sends data to the peer (for examples, any time + /// [`stream_send()`] or [`stream_shutdown()`] are called). + /// + /// [`Done`]: enum.Error.html#variant.Done + /// [`recv()`]: struct.Connection.html#method.recv + /// [`on_timeout()`]: struct.Connection.html#method.on_timeout + /// [`stream_send()`]: struct.Connection.html#method.stream_send + /// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown + /// + /// ## Examples: + /// + /// ```no_run + /// # let mut out = [0; 512]; + /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap(); + /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?; + /// # let scid = [0xba; 16]; + /// # let mut conn = quiche::accept(&scid, None, &mut config)?; + /// # let mut send_time = std::time::Instant::now(); + /// loop { + /// let write = match conn.send_at(&mut out, &mut send_time) { + /// Ok(v) => v, + /// + /// Err(quiche::Error::Done) => { + /// // Done writing. + /// break; + /// }, + /// + /// Err(e) => { + /// // An error occurred, handle it. + /// break; + /// }, + /// }; + /// + /// // pass send_time to kernel using SO_TXTIME + /// socket.send(&out[..write]).unwrap(); + /// } + /// # Ok::<(), quiche::Error>(()) + /// ``` + pub fn send_at( + &mut self, out: &mut [u8], send_time: &mut time::Instant, + ) -> Result { + let written = self.send(out); + if written.is_ok() { + *send_time = self.recovery.get_packet_send_time().unwrap(); + } + written + } + // Returns the maximum size of a packet to be sent. // // This is a minimum of the sender's and the receiver's maximum UDP payload diff --git a/src/recovery/mod.rs b/src/recovery/mod.rs index de068ce48a..9feffc968c 100644 --- a/src/recovery/mod.rs +++ b/src/recovery/mod.rs @@ -108,6 +108,8 @@ pub struct Recovery { // Congestion control. cc_ops: &'static CongestionControlOps, + cc_algo: CongestionControlAlgorithm, + congestion_window: usize, bytes_in_flight: usize, @@ -116,6 +118,8 @@ pub struct Recovery { bytes_acked: usize, + bytes_sent: usize, + congestion_recovery_start_time: Option, max_datagram_size: usize, @@ -124,6 +128,13 @@ pub struct Recovery { // HyStart++. hystart: hystart::Hystart, + + // Pacing + pacing_enabled: bool, + + pacing_rate: usize, + + last_packet_scheduled_time: Option, } impl Recovery { @@ -178,12 +189,16 @@ impl Recovery { bytes_acked: 0, + bytes_sent: 0, + congestion_recovery_start_time: None, max_datagram_size: config.max_send_udp_payload_size, cc_ops: config.cc_algorithm.into(), + cc_algo: config.cc_algorithm, + delivery_rate: delivery_rate::Rate::default(), cubic_state: cubic::State::default(), @@ -191,6 +206,12 @@ impl Recovery { app_limited: false, hystart: hystart::Hystart::new(config.hystart), + + pacing_enabled: config.pacing, + + pacing_rate: 0, + + last_packet_scheduled_time: None, } } @@ -233,6 +254,22 @@ impl Recovery { self.hystart.start_round(pkt_num); } + // Pacing: Set the pacing rate if BBR is not used + if self.pacing_enabled && + self.cc_algo == CongestionControlAlgorithm::CUBIC + { + let rate = match self.smoothed_rtt { + Some(srtt) => + ((self.congestion_window as u128 * 1000000) / + srtt.as_micros()) as usize, + None => 0, + }; + self.set_pacing_rate(rate); + } + + self.schedule_next_packet(epoch, now, sent_bytes); + + self.bytes_sent += sent_bytes; trace!("{} {:?}", trace_id, self); } @@ -240,6 +277,48 @@ impl Recovery { (self.cc_ops.on_packet_sent)(self, sent_bytes, now); } + pub fn set_pacing_rate(&mut self, rate: usize) { + if self.pacing_enabled && rate > 0 { + self.pacing_rate = rate; + } + } + + pub fn get_packet_send_time(&self) -> Option { + self.last_packet_scheduled_time + } + + fn schedule_next_packet( + &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize, + ) { + // Pacing is not done for following cases, + // 1. Packet epoch is not EPOCH_APPLICATION. + // 2. If packet has only ACK frames. + // 3. Start of the connection. + if !self.pacing_enabled || + epoch != packet::EPOCH_APPLICATION || + packet_size == 0 || + self.bytes_sent <= self.congestion_window || + self.pacing_rate == 0 + { + self.last_packet_scheduled_time = Some(now); + return; + } + + let interval = (packet_size * 1000000) / self.pacing_rate; + let next_send_interval = Duration::from_micros(interval as u64); + + let next_schedule_time = match self.last_packet_scheduled_time { + Some(last_scheduled_time) => last_scheduled_time + next_send_interval, + None => now, + }; + + self.last_packet_scheduled_time = if next_schedule_time <= now { + Some(now) + } else { + Some(next_schedule_time) + }; + } + pub fn on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, @@ -891,6 +970,12 @@ impl std::fmt::Debug for Recovery { self.congestion_recovery_start_time )?; write!(f, "{:?} ", self.delivery_rate)?; + write!(f, "pacing_rate={:?}", self.pacing_rate)?; + write!( + f, + "last_packet_scheduled_time={:?}", + self.last_packet_scheduled_time + )?; if self.hystart.enabled() { write!(f, "hystart={:?} ", self.hystart)?;