Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed buffers to support ReadFixed and WriteFixed ops #54

Merged
merged 35 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
6512ef2
Groundwork for fixed buffers
mzabaluev Aug 26, 2021
01051c0
Add ReadFixed op
mzabaluev Sep 4, 2021
7e4279c
Add WriteFixed op
mzabaluev Sep 4, 2021
798e97a
Test for File::read_fixed_at
mzabaluev Sep 5, 2021
499b2da
Test for File::write_fixed_at
mzabaluev Sep 5, 2021
93c1e5d
Add method BufRegistry::unregister
mzabaluev Sep 5, 2021
e28f39a
Test graceful invalidation of fixed buffers
mzabaluev Sep 5, 2021
d09288d
Test turnaround of a checked-out FixedBuf
mzabaluev Sep 5, 2021
2c95fab
No need for mutable self in BufRegistry::check_out
mzabaluev Sep 6, 2021
b501841
API docs for fixed buffers
mzabaluev Sep 6, 2021
6ee9261
driver::register: small code cleanup
mzabaluev Sep 10, 2021
4a3a658
Hide fn main boilerplate in doc examples
mzabaluev Sep 12, 2021
6fb48bb
Remove a comment on behavior of unregister
mzabaluev Oct 28, 2022
e2ef5ca
Rename driver::register to driver::buffers
mzabaluev Oct 28, 2022
8c0138d
Fix clippy lints in driver::buffers
mzabaluev Oct 28, 2022
b3b309b
Merge branch 'master' into fixed-buffers
mzabaluev Nov 2, 2022
6547e59
Reorg and rename fixed buffers API
mzabaluev Nov 2, 2022
1c2828f
Merge branch 'master' into fixed-buffers
mzabaluev Nov 4, 2022
71f812d
Correct uses of FixedBufRegistry in doctests
mzabaluev Nov 4, 2022
b2c3cad
Assert the buffer is unchanged on FixedBuf drop
mzabaluev Nov 4, 2022
7aceefc
buf::fixed::buffers: Add comments, renaming
mzabaluev Nov 4, 2022
cf801ae
Fix a doc link
mzabaluev Nov 5, 2022
b9d041c
Improve doc on FixedBufRegistry::check_out
mzabaluev Nov 5, 2022
4a76ff7
Explain fixed_buffers ref in Driver
mzabaluev Nov 5, 2022
f9b5833
Improve the doc on FixedBufRegistry
mzabaluev Nov 5, 2022
3e8d4bf
Edit error on FixedBufRegistry::unregister_buffer
mzabaluev Nov 5, 2022
04f95bc
read_fixed and write_fixed for sockets
mzabaluev Nov 5, 2022
65e9e16
Merge branch 'master' into fixed-buffers
mzabaluev Nov 5, 2022
1391c50
Merge branch 'master' into fixed-buffers
mzabaluev Nov 21, 2022
365f478
buf: impl Deref/DerefMut for FixedBuf
mzabaluev Nov 8, 2022
a7c97d3
adapt fixed buffers to slice reform
mzabaluev Nov 8, 2022
f0443d7
tests: reading and writing with Slice<FixedBuf>
mzabaluev Nov 21, 2022
4747a30
Merge branch 'master' into fixed-buffers
mzabaluev Nov 22, 2022
86754bb
tests/fixed_buf: add comments in slicing test
mzabaluev Nov 22, 2022
e207e72
buf: implement Debug for FixedBuf
mzabaluev Nov 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions src/buf/fixed/buffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use libc::{iovec, UIO_MAXIOV};
use std::cmp;
use std::mem;
use std::ptr;
use std::slice;

// Internal state shared by FixedBufRegistry and FixedBuf handles.
pub(crate) struct FixedBuffers {
// Pointer to an allocated array of iovec records referencing
// the allocated buffers. The number of initialized records is the
// same as the length of the states array.
raw_bufs: ptr::NonNull<iovec>,
// State information on the buffers. Indices in this array correspond to
// the indices in the array at raw_bufs.
states: Vec<BufState>,
// Original capacity of raw_bufs as a Vec.
orig_cap: usize,
}

// State information of a buffer in the registry,
enum BufState {
// The buffer is not in use.
// The field records the length of the initialized part.
Free { init_len: usize },
// The buffer is checked out.
// Its data are logically owned by the FixedBuf handle,
// which also keeps track of the length of the initialized part.
CheckedOut,
}

impl FixedBuffers {
pub(crate) fn new(bufs: impl Iterator<Item = Vec<u8>>) -> Self {
let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, 65_536));
let (size_hint, _) = bufs.size_hint();
let mut iovecs = Vec::with_capacity(size_hint);
let mut states = Vec::with_capacity(size_hint);
for mut buf in bufs {
iovecs.push(iovec {
iov_base: buf.as_mut_ptr() as *mut _,
iov_len: buf.capacity(),
});
states.push(BufState::Free {
init_len: buf.len(),
});
mem::forget(buf);
}
debug_assert_eq!(iovecs.len(), states.len());
// Safety: Vec::as_mut_ptr never returns null
let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) };
let orig_cap = iovecs.capacity();
mem::forget(iovecs);
FixedBuffers {
raw_bufs,
states,
orig_cap,
}
}

// If the indexed buffer is free, changes its state to checked out and
// returns its data. If the buffer is already checked out, returns None.
pub(crate) fn check_out(&mut self, index: usize) -> Option<(iovec, usize)> {
let iovecs_ptr = self.raw_bufs;
self.states.get_mut(index).and_then(|state| match *state {
BufState::Free { init_len } => {
*state = BufState::CheckedOut;
// Safety: the allocated array under the pointer is valid
// for the lifetime of self, the index is inside the array
// as checked by Vec::get_mut above, called on the array of
// states that has the same length.
let iovec = unsafe { iovecs_ptr.as_ptr().add(index).read() };
Some((iovec, init_len))
}
BufState::CheckedOut => None,
})
}

// Sets the indexed buffer's state to free and records the updated length
// of its initialized part. The buffer addressed must be in the checked out
// state, otherwise this function may panic.
pub(crate) fn check_in(&mut self, index: usize, init_len: usize) {
let state = self.states.get_mut(index).expect("invalid buffer index");
debug_assert!(
matches!(state, BufState::CheckedOut),
"the buffer must be checked out"
);
*state = BufState::Free { init_len };
}

pub(crate) fn iovecs(&self) -> &[iovec] {
// Safety: the raw_bufs pointer is valid for the lifetime of self,
// the slice length is valid by construction.
unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) }
}
}

impl Drop for FixedBuffers {
fn drop(&mut self) {
let iovecs = unsafe {
Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap)
};
for (i, iovec) in iovecs.iter().enumerate() {
match self.states[i] {
BufState::Free { init_len } => {
let ptr = iovec.iov_base as *mut u8;
let cap = iovec.iov_len;
let v = unsafe { Vec::from_raw_parts(ptr, init_len, cap) };
mem::drop(v);
}
BufState::CheckedOut => unreachable!("all buffers must be checked in"),
}
}
}
}
100 changes: 100 additions & 0 deletions src/buf/fixed/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use super::FixedBuffers;
use crate::buf::{IoBuf, IoBufMut};

use libc::iovec;
use std::cell::RefCell;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;

/// A unique handle to a memory buffer that can be pre-registered with
/// the kernel for `io-uring` operations.
///
/// `FixedBuf` handles can be obtained from a [`FixedBufRegistry`] collection.
/// For each buffer, only a single `FixedBuf` handle can be either used by the
/// application code or owned by an I/O operation at any given time,
/// thus avoiding data races between `io-uring` operations in flight and
/// the application accessing buffer data.
///
/// [`FixedBufRegistry`]: super::FixedBufRegistry
///
pub struct FixedBuf {
registry: Rc<RefCell<FixedBuffers>>,
buf: ManuallyDrop<Vec<u8>>,
index: u16,
}

impl Drop for FixedBuf {
fn drop(&mut self) {
let mut registry = self.registry.borrow_mut();
debug_assert_eq!(
registry.iovecs()[self.index as usize].iov_base as *const u8,
self.buf.as_ptr()
);
debug_assert_eq!(
registry.iovecs()[self.index as usize].iov_len,
self.buf.capacity()
);
registry.check_in(self.index as usize, self.buf.len());
}
}

impl FixedBuf {
pub(super) unsafe fn new(
registry: Rc<RefCell<FixedBuffers>>,
iovec: iovec,
init_len: usize,
index: u16,
) -> Self {
let buf = Vec::from_raw_parts(iovec.iov_base as _, init_len, iovec.iov_len);
FixedBuf {
registry,
buf: ManuallyDrop::new(buf),
index,
}
}

pub(crate) fn buf_index(&self) -> u16 {
self.index
}
}

unsafe impl IoBuf for FixedBuf {
fn stable_ptr(&self) -> *const u8 {
self.buf.as_ptr()
}

fn bytes_init(&self) -> usize {
self.buf.len()
}

fn bytes_total(&self) -> usize {
self.buf.capacity()
}
}

unsafe impl IoBufMut for FixedBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.buf.as_mut_ptr()
}

unsafe fn set_init(&mut self, pos: usize) {
if self.buf.len() < pos {
self.buf.set_len(pos)
}
}
}

impl Deref for FixedBuf {
type Target = [u8];

fn deref(&self) -> &[u8] {
&self.buf
}
}

impl DerefMut for FixedBuf {
fn deref_mut(&mut self) -> &mut [u8] {
&mut self.buf
}
}
21 changes: 21 additions & 0 deletions src/buf/fixed/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! Buffers pre-registered with the kernel.
//!
//! This module provides facilities for registering in-memory buffers with
//! the `tokio-uring` runtime. Operations like [`File::read_fixed_at`][rfa] and
//! [`File::write_fixed_at`][wfa] make use of buffers pre-mapped by
//! the kernel to reduce per-I/O overhead.
//! The [`FixedBufRegistry::register`] method is used to register a collection of
//! buffers with the kernel; it must be called before any of the [`FixedBuf`]
//! handles to the collection's buffers can be used with I/O operations.
//!
//! [rfa]: crate::fs::File::read_fixed_at
//! [wfa]: crate::fs::File::write_fixed_at

mod buffers;
pub(crate) use self::buffers::FixedBuffers;

mod handle;
pub use handle::FixedBuf;

mod registry;
pub use registry::FixedBufRegistry;
115 changes: 115 additions & 0 deletions src/buf/fixed/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use super::{buffers::FixedBuffers, FixedBuf};
use crate::driver;

use std::cell::RefCell;
use std::io;
use std::rc::Rc;

/// An indexed collection of I/O buffers pre-registered with the kernel.
///
/// `FixedBufRegistry` allows the application to manage a collection of buffers
/// allocated in memory, that can be registered in the current `tokio-uring`
/// context using the [`register`] method.
///
/// A `FixedBufRegistry` value is a lightweight handle for a collection of
/// allocated buffers. Cloning of a `FixedBufRegistry` creates a new reference to
/// the same collection of buffers.
///
/// The buffers of the collection are not deallocated until:
/// - all `FixedBufRegistry` references to the collection have been dropped;
/// - all [`FixedBuf`] handles to individual buffers in the collection have
/// been dropped, including the buffer handles owned by any I/O operations
/// in flight;
/// - The `tokio-uring` [`Runtime`] the buffers are registered with
/// has been dropped.
///
/// [`register`]: Self::register
/// [`Runtime`]: crate::Runtime
#[derive(Clone)]
pub struct FixedBufRegistry {
inner: Rc<RefCell<FixedBuffers>>,
}

impl FixedBufRegistry {
/// Creates a new collection of buffers from the provided allocated vectors.
///
/// The buffers are assigned 0-based indices in the order of the iterable
/// input parameter. The returned collection takes up to [`UIO_MAXIOV`]
/// buffers from the input. Any items in excess of that amount are silently
/// dropped, unless the input iterator produces the vectors lazily.
///
/// [`UIO_MAXIOV`]: libc::UIO_MAXIOV
///
/// # Examples
///
/// ```
/// use tokio_uring::buf::fixed::FixedBufRegistry;
/// use std::iter;
///
/// let registry = FixedBufRegistry::new(iter::repeat(vec![0; 4096]).take(10));
/// ```
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
FixedBufRegistry {
inner: Rc::new(RefCell::new(FixedBuffers::new(bufs.into_iter()))),
}
}

/// Registers the buffers with the kernel.
///
/// This method must be called in the context of a `tokio-uring` runtime.
/// The registration persists for the lifetime of the runtime, unless
/// revoked by the [`unregister`] method. Dropping the
/// `FixedBufRegistry` instance this method has been called on does not revoke
/// the registration or deallocate the buffers.
///
/// [`unregister`]: Self::unregister
///
/// This call can be blocked in the kernel to complete any operations
/// in-flight on the same `io-uring` instance. The application is
/// recommended to register buffers before starting any I/O operations.
///
/// # Errors
///
/// If a collection of buffers is currently registered in the context
/// of the `tokio-uring` runtime this call is made in, the function returns
/// an error.
pub fn register(&self) -> io::Result<()> {
driver::register_buffers(&self.inner)
}

/// Unregisters this collection of buffers.
///
/// This method must be called in the context of a `tokio-uring` runtime,
/// where the buffers should have been previously registered.
///
/// This operation invalidates any `FixedBuf` handles checked out from
/// this registry instance. Continued use of such handles in I/O
/// operations may result in an error.
///
/// # Errors
///
/// If another collection of buffers is currently registered in the context
/// of the `tokio-uring` runtime this call is made in, the function returns
/// an error. Calling `unregister` when no `FixedBufRegistry` is currently
/// registered on this runtime also returns an error.
pub fn unregister(&self) -> io::Result<()> {
driver::unregister_buffers(&self.inner)
}

/// Returns a buffer identified by the specified index for use by the
/// application, unless the buffer is already in use.
///
/// The buffer is released to be available again once the
/// returned `FixedBuf` handle has been dropped. An I/O operation
/// using the buffer takes ownership of it and returns it once completed,
/// preventing shared use of the buffer while the operation is in flight.
pub fn check_out(&self, index: usize) -> Option<FixedBuf> {
FrankReh marked this conversation as resolved.
Show resolved Hide resolved
let mut inner = self.inner.borrow_mut();
inner.check_out(index).map(|(iovec, init_len)| {
debug_assert!(index <= u16::MAX as usize);
// Safety: the validity of iovec and init_len is ensured by
// FixedBuffers::check_out
unsafe { FixedBuf::new(Rc::clone(&self.inner), iovec, init_len, index as u16) }
})
}
}
2 changes: 2 additions & 0 deletions src/buf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//! crate defines [`IoBuf`] and [`IoBufMut`] traits which are implemented by buffer
//! types that respect the `io-uring` contract.

pub mod fixed;

mod io_buf;
pub use io_buf::IoBuf;

Expand Down
Loading