Skip to content

Commit

Permalink
Support zero-copy send (#123)
Browse files Browse the repository at this point in the history
Linux 6.0 introduced `IORING_OP_SEND_ZC`. This PR introduces the
function `send_zc` on udp (todo: tcp) sockets which takes advantage of
it.

As IORING_OP_SEND_ZC can return multiple CQE's per SQE, this also
extends the `Lifecycle` handling in `impl Fut for Op<T>` to check for
these occurences. If does this using a new method `update` in
`Completable`, which I added to minimise the scope of the changes - it
just falls back to the `complete` method for implementations which are
just single entry.

This approach could probably be used as a the basis of streaming
interfaces for some other submission types.
  • Loading branch information
ollie-etl authored Nov 7, 2022
1 parent 8ca72f3 commit 35c9495
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ mod rename_at;

mod send_to;

mod send_zc;

mod shared_fd;
pub(crate) use shared_fd::SharedFd;

Expand Down
83 changes: 83 additions & 0 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,22 @@ pub(crate) struct Op<T: 'static, CqeType = SingleCQE> {
/// A Marker for Ops which expect only a single completion event
pub(crate) struct SingleCQE;

/// A Marker for Operations will process multiple completion events,
/// which combined resolve to a single Future value
pub(crate) struct MultiCQEFuture;

pub(crate) trait Completable {
type Output;
/// `complete` will be called for cqe's do not have the `more` flag set
fn complete(self, cqe: CqeResult) -> Self::Output;
}

pub(crate) trait Updateable: Completable {
/// Update will be called for cqe's which have the `more` flag set.
/// The Op should update any internal state as required.
fn update(&mut self, cqe: CqeResult);
}

pub(crate) enum Lifecycle {
/// The operation has been submitted to uring and is currently in-flight
Submitted,
Expand Down Expand Up @@ -171,6 +182,78 @@ where
}
}

impl<T> Future for Op<T, MultiCQEFuture>
where
T: Unpin + 'static + Completable + Updateable,
{
type Output = T::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::mem;

let me = &mut *self;

CONTEXT.with(|runtime_context| {
runtime_context.with_driver_mut(|driver| {
let (lifecycle, completions) = driver
.ops
.get_mut(me.index)
.expect("invalid internal state");

match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted => {
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => {
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Lifecycle::Waiting(waker) => {
*lifecycle = Lifecycle::Waiting(waker);
Poll::Pending
}
Lifecycle::Ignored(..) => unreachable!(),
Lifecycle::Completed(cqe) => {
// This is possible. We may have previously polled a CompletionList,
// and the final CQE registered as Completed
driver.ops.remove(me.index);
me.index = usize::MAX;
Poll::Ready(me.data.take().unwrap().complete(cqe))
}
Lifecycle::CompletionList(indices) => {
let mut data = me.data.take().unwrap();
let mut status = Poll::Pending;
// Consume the CqeResult list, calling update on the Op on all Cqe's flagged `more`
// If the final Cqe is present, clean up and return Poll::Ready
for cqe in indices.into_list(completions) {
if io_uring::cqueue::more(cqe.flags) {
data.update(cqe);
} else {
status = Poll::Ready(cqe);
break;
}
}
match status {
Poll::Pending => {
// We need more CQE's. Restore the op state
let _ = me.data.insert(data);
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Poll::Ready(cqe) => {
driver.ops.remove(me.index);
me.index = usize::MAX;
Poll::Ready(data.complete(cqe))
}
}
}
}
})
})
}
}

/// The operation may have pending cqe's not yet processed.
/// To manage this, the lifecycle associated with the Op may if required
/// be placed in LifeCycle::Ignored state to handle cqe's which arrive after
Expand Down
9 changes: 8 additions & 1 deletion src/driver/op/slab_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl<'a, T> SlabList<'a, T> {
}

/// Pop from front of list
#[allow(dead_code)]
pub(crate) fn pop(&mut self) -> Option<T> {
self.slab
.try_remove(self.index.start)
Expand Down Expand Up @@ -121,6 +120,14 @@ impl<'a, T> Drop for SlabList<'a, T> {
}
}

impl<'a, T> Iterator for SlabList<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.pop()
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
65 changes: 65 additions & 0 deletions src/driver/send_zc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::driver::op::{self, Completable, Updateable};
use crate::{
buf::IoBuf,
driver::{Op, SharedFd},
BufResult,
};
use std::io;

pub(crate) struct SendZc<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
#[allow(dead_code)]
fd: SharedFd,

pub(crate) buf: T,

/// Hold the number of transmitted bytes
bytes: usize,
}

impl<T: IoBuf> Op<SendZc<T>> {
pub(crate) fn send_zc(fd: &SharedFd, buf: T) -> io::Result<Op<SendZc<T>>> {
use io_uring::{opcode, types};

Op::submit_with(
SendZc {
fd: fd.clone(),
buf,
bytes: 0,
},
|send| {
// Get raw buffer info
let ptr = send.buf.stable_ptr();
let len = send.buf.bytes_init();

opcode::SendZc::new(types::Fd(fd.raw_fd()), ptr, len as _).build()
},
)
}
}

impl<T> Completable for SendZc<T>
where
T: IoBuf,
{
type Output = BufResult<usize, T>;

fn complete(self, cqe: op::CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| self.bytes + v as usize);
// Recover the buffer
let buf = self.buf;
(res, buf)
}
}

impl<T> Updateable for SendZc<T>
where
T: IoBuf,
{
fn update(&mut self, cqe: op::CqeResult) {
// uring send_zc promises there will be no error on CQE's marked more
self.bytes += *cqe.result.as_ref().unwrap() as usize;
}
}
5 changes: 5 additions & 0 deletions src/driver/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ impl Socket {
op.await
}

pub(crate) async fn send_zc<T: IoBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::send_zc(&self.fd, buf).unwrap();
op.await
}

pub(crate) async fn read<T: IoBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::read_at(&self.fd, buf, 0).unwrap();
op.await
Expand Down
16 changes: 16 additions & 0 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,22 @@ impl UdpSocket {
self.inner.send_to(buf, socket_addr).await
}

/// Sends data on the socket. Will attempt to do so without intermediate copies.
/// On success, returns the number of bytes written.
///
/// See the linux [kernel docs](https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html)
/// for a discussion on when this might be appropriate. In particular:
///
/// > Copy avoidance is not a free lunch. As implemented, with page pinning,
/// > it replaces per byte copy cost with page accounting and completion
/// > notification overhead. As a result, zero copy is generally only effective
/// > at writes over around 10 KB.
///
/// Note: Using fixed buffers [#54](https://github.com/tokio-rs/tokio-uring/pull/54), avoids the page-pinning overhead
pub async fn send_zc<T: IoBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
self.inner.send_zc(buf).await
}

/// Receives a single datagram message on the socket. On success, returns
/// the number of bytes read and the origin.
pub async fn recv_from<T: IoBufMut>(&self, buf: T) -> crate::BufResult<(usize, SocketAddr), T> {
Expand Down

0 comments on commit 35c9495

Please sign in to comment.