Skip to content

Commit

Permalink
net: add UdpStream sendmsg. (#263)
Browse files Browse the repository at this point in the history
Fixes #261
  • Loading branch information
Icelk authored Mar 28, 2023
1 parent 1a089de commit 47fbddf
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ mod send_to;

mod send_zc;

mod sendmsg;

mod sendmsg_zc;

mod shared_fd;
Expand Down
103 changes: 103 additions & 0 deletions src/io/sendmsg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use crate::buf::BoundedBuf;
use crate::io::SharedFd;
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use socket2::SockAddr;
use std::io;
use std::io::IoSlice;
use std::net::SocketAddr;

pub(crate) struct SendMsg<T, U> {
_fd: SharedFd,
_io_bufs: Vec<T>,
_io_slices: Vec<IoSlice<'static>>,
_socket_addr: Option<Box<SockAddr>>,
msg_control: Option<U>,
msghdr: libc::msghdr,
}

impl<T: BoundedBuf, U: BoundedBuf> Op<SendMsg<T, U>> {
pub(crate) fn sendmsg(
fd: &SharedFd,
io_bufs: Vec<T>,
socket_addr: Option<SocketAddr>,
msg_control: Option<U>,
) -> io::Result<Self> {
use io_uring::{opcode, types};

let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() };

let mut io_slices: Vec<IoSlice<'static>> = Vec::with_capacity(io_bufs.len());

for io_buf in &io_bufs {
io_slices.push(IoSlice::new(unsafe {
std::slice::from_raw_parts(io_buf.stable_ptr(), io_buf.bytes_init())
}))
}

msghdr.msg_iov = io_slices.as_ptr() as *mut _;
msghdr.msg_iovlen = io_slices.len() as _;

let socket_addr = match socket_addr {
Some(_socket_addr) => {
let socket_addr = Box::new(SockAddr::from(_socket_addr));
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
msghdr.msg_namelen = socket_addr.len();
Some(socket_addr)
}
None => {
msghdr.msg_name = std::ptr::null_mut();
msghdr.msg_namelen = 0;
None
}
};

match msg_control {
Some(ref _msg_control) => {
msghdr.msg_control = _msg_control.stable_ptr() as *mut _;
msghdr.msg_controllen = _msg_control.bytes_init();
}
None => {
msghdr.msg_control = std::ptr::null_mut();
msghdr.msg_controllen = 0_usize;
}
}

CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
SendMsg {
_fd: fd.clone(),
_io_bufs: io_bufs,
_socket_addr: socket_addr,
_io_slices: io_slices,
msg_control,
msghdr,
},
|sendmsg| {
opcode::SendMsg::new(
types::Fd(sendmsg._fd.raw_fd()),
&sendmsg.msghdr as *const _,
)
.build()
},
)
})
}
}

impl<T, U> Completable for SendMsg<T, U> {
type Output = (io::Result<usize>, Vec<T>, Option<U>);

fn complete(self, cqe: CqeResult) -> (io::Result<usize>, Vec<T>, Option<U>) {
// Convert the operation result to `usize`
let res = cqe.result.map(|n| n as usize);

// Recover the data buffers.
let io_bufs = self._io_bufs;

// Recover the ancillary data buffer.
let msg_control = self.msg_control;

(res, io_bufs, msg_control)
}
}
10 changes: 10 additions & 0 deletions src/io/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ impl Socket {
op.await
}

pub(crate) async fn sendmsg<T: BoundedBuf, U: BoundedBuf>(
&self,
io_slices: Vec<T>,
socket_addr: Option<SocketAddr>,
msg_control: Option<U>,
) -> (io::Result<usize>, Vec<T>, Option<U>) {
let op = Op::sendmsg(&self.fd, io_slices, socket_addr, msg_control).unwrap();
op.await
}

pub(crate) async fn sendmsg_zc<T: BoundedBuf, U: BoundedBuf>(
&self,
io_slices: Vec<T>,
Expand Down
20 changes: 20 additions & 0 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,26 @@ impl UdpSocket {
self.inner.send_zc(buf).await
}

/// Sends a message on the socket using a msghdr.
///
/// Returns a tuple of:
///
/// * Result containing bytes written on success
/// * The original `io_slices` `Vec<T>`
/// * The original `msg_contol` `Option<U>`
///
/// Consider using [`Self::sendmsg_zc`] for a zero-copy alternative.
pub async fn sendmsg<T: BoundedBuf, U: BoundedBuf>(
&self,
io_slices: Vec<T>,
socket_addr: Option<SocketAddr>,
msg_control: Option<U>,
) -> (io::Result<usize>, Vec<T>, Option<U>) {
self.inner
.sendmsg(io_slices, socket_addr, msg_control)
.await
}

/// Sends a message on the socket using a msghdr.
///
/// Returns a tuple of:
Expand Down

0 comments on commit 47fbddf

Please sign in to comment.