Skip to content

Commit

Permalink
Add recvmsg implmentation (#252)
Browse files Browse the repository at this point in the history
Adds the missing `recvmsg` function to the udp socket.

the level of duplication required vs `recv_from` is uncomfortable, but
required due to the lack of specialization.
  • Loading branch information
ollie-etl authored Feb 27, 2023
1 parent d8ae38b commit f562364
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod readv;

mod recv_from;

mod recvmsg;

mod rename_at;

mod send_to;
Expand Down
97 changes: 97 additions & 0 deletions src/io/recvmsg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use crate::{buf::BoundedBufMut, io::SharedFd, BufResult};
use socket2::SockAddr;
use std::{
io::IoSliceMut,
{boxed::Box, io, net::SocketAddr},
};

pub(crate) struct RecvMsg<T> {
#[allow(dead_code)]
fd: SharedFd,
pub(crate) buf: Vec<T>,
#[allow(dead_code)]
io_slices: Vec<IoSliceMut<'static>>,
pub(crate) socket_addr: Box<SockAddr>,
pub(crate) msghdr: Box<libc::msghdr>,
}

impl<T: BoundedBufMut> Op<RecvMsg<T>> {
pub(crate) fn recvmsg(fd: &SharedFd, mut bufs: Vec<T>) -> io::Result<Op<RecvMsg<T>>> {
use io_uring::{opcode, types};

let mut io_slices = Vec::with_capacity(bufs.len());
for buf in &mut bufs {
io_slices.push(IoSliceMut::new(unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
}));
}

let socket_addr = Box::new(unsafe { SockAddr::init(|_, _| Ok(()))?.1 });

let mut msghdr: Box<libc::msghdr> = Box::new(unsafe { std::mem::zeroed() });
msghdr.msg_iov = io_slices.as_mut_ptr().cast();
msghdr.msg_iovlen = io_slices.len() as _;
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
msghdr.msg_namelen = socket_addr.len();

CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
RecvMsg {
fd: fd.clone(),
buf: bufs,
io_slices,
socket_addr,
msghdr,
},
|recv_from| {
opcode::RecvMsg::new(
types::Fd(recv_from.fd.raw_fd()),
recv_from.msghdr.as_mut() as *mut _,
)
.build()
},
)
})
}
}

impl<T> Completable for RecvMsg<T>
where
T: BoundedBufMut,
{
type Output = BufResult<(usize, SocketAddr), Vec<T>>;

fn complete(self, cqe: CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| v as usize);
// Recover the buffers
let mut bufs = self.buf;

let socket_addr = (*self.socket_addr).as_socket();

let res = res.map(|n| {
let socket_addr: SocketAddr = socket_addr.unwrap();

let mut bytes = n;
for buf in &mut bufs {
// Safety: the kernel wrote `n` bytes to the buffer.
unsafe {
buf.set_init(bytes);
}
let total = buf.bytes_total();
if bytes > total {
bytes -= total;
} else {
// In the current API bytes_init is a watermark,
// so remaining don't need zeroing.
break;
}
}
(n, socket_addr)
});

(res, bufs)
}
}
8 changes: 8 additions & 0 deletions src/io/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ impl Socket {
op.await
}

pub(crate) async fn recvmsg<T: BoundedBufMut>(
&self,
buf: Vec<T>,
) -> crate::BufResult<(usize, SocketAddr), Vec<T>> {
let op = Op::recvmsg(&self.fd, buf).unwrap();
op.await
}

pub(crate) async fn accept(&self) -> io::Result<(Socket, Option<SocketAddr>)> {
let op = Op::accept(&self.fd)?;
op.await
Expand Down
10 changes: 10 additions & 0 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ impl UdpSocket {
self.inner.recv_from(buf).await
}

/// Receives a single datagram message on the socket, into multiple buffers
///
/// On success, returns the number of bytes read and the origin.
pub async fn recvmsg<T: BoundedBufMut>(
&self,
buf: Vec<T>,
) -> crate::BufResult<(usize, SocketAddr), Vec<T>> {
self.inner.recvmsg(buf).await
}

/// Reads a packet of data from the socket into the buffer.
///
/// Returns the original buffer and quantity of data read.
Expand Down

1 comment on commit f562364

@bohdantrotsenko
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: s/implmentation/implementation/

Please sign in to comment.