Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: refactor driver, runtime, and op #197

Merged
merged 5 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/buf/fixed/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,7 @@ impl FixedBufPool {
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
FixedBufPool {
inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))),
driver: CONTEXT.with(|x| {
x.handle()
.as_ref()
.expect("Not in a runtime context")
.into()
}),
driver: CONTEXT.with(|x| x.weak().expect("Not in a runtime context")),
}
}

Expand Down
7 changes: 1 addition & 6 deletions src/buf/fixed/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,7 @@ impl FixedBufRegistry {
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
FixedBufRegistry {
inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))),
driver: CONTEXT.with(|x| {
x.handle()
.as_ref()
.expect("Not in a runtime context")
.into()
}),
driver: CONTEXT.with(|x| x.weak().expect("Not in a runtime context")),
}
}

Expand Down
16 changes: 3 additions & 13 deletions src/runtime/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::driver;
use crate::runtime::driver::Handle;
use crate::runtime::driver::{Handle, WeakHandle};
use std::cell::RefCell;

/// Owns the driver and resides in thread-local storage.
Expand Down Expand Up @@ -44,17 +44,7 @@ impl RuntimeContext {
self.driver.borrow().clone()
}

/// Execute a function which requires mutable access to the driver.
pub(crate) fn with_handle_mut<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut driver::Handle) -> R,
{
let mut guard = self.driver.borrow_mut();

let driver = guard
.as_mut()
.expect("Attempted to access driver in invalid context");

f(driver)
pub(crate) fn weak(&self) -> Option<WeakHandle> {
self.driver.borrow().as_ref().map(Into::into)
}
}
192 changes: 17 additions & 175 deletions src/runtime/driver/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
//! The weak handle should be used by anything which is stored in the driver or does not need to
//! keep the driver alive for it's duration.

use io_uring::{cqueue, squeue};
use io_uring::squeue;
use std::cell::RefCell;
use std::io;
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};

use crate::buf::fixed::FixedBuffers;
use crate::runtime::driver::op::{Completable, Lifecycle, MultiCQEFuture, Op, Updateable};
use crate::runtime::driver::op::{Completable, MultiCQEFuture, Op, Updateable};
use crate::runtime::driver::Driver;

#[derive(Clone)]
Expand All @@ -40,8 +41,8 @@ impl Handle {
})
}

pub(crate) fn tick(&self) {
self.inner.borrow_mut().tick()
pub(crate) fn dispatch_completions(&self) {
self.inner.borrow_mut().dispatch_completions()
}

pub(crate) fn flush(&self) -> io::Result<usize> {
Expand All @@ -52,99 +53,29 @@ impl Handle {
&self,
buffers: Rc<RefCell<dyn FixedBuffers>>,
) -> io::Result<()> {
let mut driver = self.inner.borrow_mut();

driver
.uring
.submitter()
.register_buffers(buffers.borrow().iovecs())?;

driver.fixed_buffers = Some(buffers);
Ok(())
self.inner.borrow_mut().register_buffers(buffers)
Noah-Kennedy marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) fn unregister_buffers(
&self,
buffers: Rc<RefCell<dyn FixedBuffers>>,
) -> io::Result<()> {
let mut driver = self.inner.borrow_mut();

if let Some(currently_registered) = &driver.fixed_buffers {
if Rc::ptr_eq(&buffers, currently_registered) {
driver.uring.submitter().unregister_buffers()?;
driver.fixed_buffers = None;
return Ok(());
}
}
Err(io::Error::new(
io::ErrorKind::Other,
"fixed buffers are not currently registered",
))
self.inner.borrow_mut().unregister_buffers(buffers)
}

/// Submit an operation to uring.
///
/// `state` is stored during the operation tracking any state submitted to
/// the kernel.
pub(crate) fn submit_op<T, S, F>(&self, mut data: T, f: F) -> io::Result<Op<T, S>>
pub(crate) fn submit_op<T, S, F>(&self, data: T, f: F) -> io::Result<Op<T, S>>
where
T: Completable,
F: FnOnce(&mut T) -> squeue::Entry,
{
let mut driver = self.inner.borrow_mut();
let index = driver.ops.insert();

// Configure the SQE
let sqe = f(&mut data).user_data(index as _);

// Create the operation
let op = Op::new(self.into(), data, index);

// Push the new operation
while unsafe { driver.uring.submission().push(&sqe).is_err() } {
// If the submission queue is full, flush it to the kernel
driver.submit()?;
}

Ok(op)
self.inner.borrow_mut().submit_op(data, f, self.into())
}

pub(crate) fn poll_op<T>(&self, op: &mut Op<T>, cx: &mut Context<'_>) -> Poll<T::Output>
where
T: Unpin + 'static + Completable,
{
use std::mem;

let mut driver = self.inner.borrow_mut();

let (lifecycle, _) = driver
.ops
.get_mut(op.index)
.expect("invalid internal state");

match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted => {
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => {
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Lifecycle::Waiting(waker) => {
*lifecycle = Lifecycle::Waiting(waker);
Poll::Pending
}
Lifecycle::Ignored(..) => unreachable!(),
Lifecycle::Completed(cqe) => {
driver.ops.remove(op.index);
op.index = usize::MAX;
Poll::Ready(op.data.take().unwrap().complete(cqe))
}
Lifecycle::CompletionList(..) => {
unreachable!("No `more` flag set for SingleCQE")
}
}
self.inner.borrow_mut().poll_op(op, cx)
}

pub(crate) fn poll_multishot_op<T>(
Expand All @@ -155,103 +86,11 @@ impl Handle {
where
T: Unpin + 'static + Completable + Updateable,
{
use std::mem;

let mut driver = self.inner.borrow_mut();

let (lifecycle, completions) = driver
.ops
.get_mut(op.index)
.expect("invalid internal state");

match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted => {
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => {
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Lifecycle::Waiting(waker) => {
*lifecycle = Lifecycle::Waiting(waker);
Poll::Pending
}
Lifecycle::Ignored(..) => unreachable!(),
Lifecycle::Completed(cqe) => {
// This is possible. We may have previously polled a CompletionList,
// and the final CQE registered as Completed
driver.ops.remove(op.index);
op.index = usize::MAX;
Poll::Ready(op.data.take().unwrap().complete(cqe))
}
Lifecycle::CompletionList(indices) => {
let mut data = op.data.take().unwrap();
let mut status = Poll::Pending;
// Consume the CqeResult list, calling update on the Op on all Cqe's flagged `more`
// If the final Cqe is present, clean up and return Poll::Ready
for cqe in indices.into_list(completions) {
if cqueue::more(cqe.flags) {
data.update(cqe);
} else {
status = Poll::Ready(cqe);
break;
}
}
match status {
Poll::Pending => {
// We need more CQE's. Restore the op state
let _ = op.data.insert(data);
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Poll::Ready(cqe) => {
driver.ops.remove(op.index);
op.index = usize::MAX;
Poll::Ready(data.complete(cqe))
}
}
}
}
self.inner.borrow_mut().poll_multishot_op(op, cx)
}

pub(crate) fn remove_op<T, CqeType>(&self, op: &mut Op<T, CqeType>) {
use std::mem;

let mut driver = self.inner.borrow_mut();

// Get the Op Lifecycle state from the driver
let (lifecycle, completions) = match driver.ops.get_mut(op.index) {
Some(val) => val,
None => {
// Op dropped after the driver
return;
}
};

match mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Submitted | Lifecycle::Waiting(_) => {
*lifecycle = Lifecycle::Ignored(Box::new(op.data.take()));
}
Lifecycle::Completed(..) => {
driver.ops.remove(op.index);
}
Lifecycle::CompletionList(indices) => {
// Deallocate list entries, recording if more CQE's are expected
let more = {
let mut list = indices.into_list(completions);
cqueue::more(list.peek_end().unwrap().flags)
// Dropping list deallocates the list entries
};
if more {
// If more are expected, we have to keep the op around
*lifecycle = Lifecycle::Ignored(Box::new(op.data.take()));
} else {
driver.ops.remove(op.index);
}
}
Lifecycle::Ignored(..) => unreachable!(),
}
self.inner.borrow_mut().remove_op(op)
}
}

Expand All @@ -277,8 +116,11 @@ impl From<Driver> for Handle {
}
}

impl From<&Handle> for WeakHandle {
fn from(handle: &Handle) -> Self {
impl<T> From<T> for WeakHandle
where
T: Deref<Target = Handle>,
{
fn from(handle: T) -> Self {
Self {
inner: Rc::downgrade(&handle.inner),
}
Expand Down
Loading