diff --git a/src/buf/fixed/pool.rs b/src/buf/fixed/pool.rs index ac131883..372cb85c 100644 --- a/src/buf/fixed/pool.rs +++ b/src/buf/fixed/pool.rs @@ -152,12 +152,7 @@ impl FixedBufPool { pub fn new(bufs: impl IntoIterator>) -> Self { FixedBufPool { inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))), - driver: CONTEXT.with(|x| { - x.handle() - .as_ref() - .expect("Not in a runtime context") - .into() - }), + driver: CONTEXT.with(|x| x.weak().expect("Not in a runtime context")), } } diff --git a/src/buf/fixed/registry.rs b/src/buf/fixed/registry.rs index 94be17f2..2e380506 100644 --- a/src/buf/fixed/registry.rs +++ b/src/buf/fixed/registry.rs @@ -106,12 +106,7 @@ impl FixedBufRegistry { pub fn new(bufs: impl IntoIterator>) -> Self { FixedBufRegistry { inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))), - driver: CONTEXT.with(|x| { - x.handle() - .as_ref() - .expect("Not in a runtime context") - .into() - }), + driver: CONTEXT.with(|x| x.weak().expect("Not in a runtime context")), } } diff --git a/src/runtime/context.rs b/src/runtime/context.rs index c06c5adc..3baca06b 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -1,5 +1,5 @@ use crate::runtime::driver; -use crate::runtime::driver::Handle; +use crate::runtime::driver::{Handle, WeakHandle}; use std::cell::RefCell; /// Owns the driver and resides in thread-local storage. @@ -44,17 +44,7 @@ impl RuntimeContext { self.driver.borrow().clone() } - /// Execute a function which requires mutable access to the driver. - pub(crate) fn with_handle_mut(&self, f: F) -> R - where - F: FnOnce(&mut driver::Handle) -> R, - { - let mut guard = self.driver.borrow_mut(); - - let driver = guard - .as_mut() - .expect("Attempted to access driver in invalid context"); - - f(driver) + pub(crate) fn weak(&self) -> Option { + self.driver.borrow().as_ref().map(Into::into) } } diff --git a/src/runtime/driver/handle.rs b/src/runtime/driver/handle.rs index d58118e3..ab3dcc51 100644 --- a/src/runtime/driver/handle.rs +++ b/src/runtime/driver/handle.rs @@ -12,15 +12,16 @@ //! The weak handle should be used by anything which is stored in the driver or does not need to //! keep the driver alive for it's duration. -use io_uring::{cqueue, squeue}; +use io_uring::squeue; use std::cell::RefCell; use std::io; +use std::ops::Deref; use std::os::unix::io::{AsRawFd, RawFd}; use std::rc::{Rc, Weak}; use std::task::{Context, Poll}; use crate::buf::fixed::FixedBuffers; -use crate::runtime::driver::op::{Completable, Lifecycle, MultiCQEFuture, Op, Updateable}; +use crate::runtime::driver::op::{Completable, MultiCQEFuture, Op, Updateable}; use crate::runtime::driver::Driver; #[derive(Clone)] @@ -40,8 +41,8 @@ impl Handle { }) } - pub(crate) fn tick(&self) { - self.inner.borrow_mut().tick() + pub(crate) fn dispatch_completions(&self) { + self.inner.borrow_mut().dispatch_completions() } pub(crate) fn flush(&self) -> io::Result { @@ -52,99 +53,29 @@ impl Handle { &self, buffers: Rc>, ) -> io::Result<()> { - let mut driver = self.inner.borrow_mut(); - - driver - .uring - .submitter() - .register_buffers(buffers.borrow().iovecs())?; - - driver.fixed_buffers = Some(buffers); - Ok(()) + self.inner.borrow_mut().register_buffers(buffers) } pub(crate) fn unregister_buffers( &self, buffers: Rc>, ) -> io::Result<()> { - let mut driver = self.inner.borrow_mut(); - - if let Some(currently_registered) = &driver.fixed_buffers { - if Rc::ptr_eq(&buffers, currently_registered) { - driver.uring.submitter().unregister_buffers()?; - driver.fixed_buffers = None; - return Ok(()); - } - } - Err(io::Error::new( - io::ErrorKind::Other, - "fixed buffers are not currently registered", - )) + self.inner.borrow_mut().unregister_buffers(buffers) } - /// Submit an operation to uring. - /// - /// `state` is stored during the operation tracking any state submitted to - /// the kernel. - pub(crate) fn submit_op(&self, mut data: T, f: F) -> io::Result> + pub(crate) fn submit_op(&self, data: T, f: F) -> io::Result> where T: Completable, F: FnOnce(&mut T) -> squeue::Entry, { - let mut driver = self.inner.borrow_mut(); - let index = driver.ops.insert(); - - // Configure the SQE - let sqe = f(&mut data).user_data(index as _); - - // Create the operation - let op = Op::new(self.into(), data, index); - - // Push the new operation - while unsafe { driver.uring.submission().push(&sqe).is_err() } { - // If the submission queue is full, flush it to the kernel - driver.submit()?; - } - - Ok(op) + self.inner.borrow_mut().submit_op(data, f, self.into()) } pub(crate) fn poll_op(&self, op: &mut Op, cx: &mut Context<'_>) -> Poll where T: Unpin + 'static + Completable, { - use std::mem; - - let mut driver = self.inner.borrow_mut(); - - let (lifecycle, _) = driver - .ops - .get_mut(op.index) - .expect("invalid internal state"); - - match mem::replace(lifecycle, Lifecycle::Submitted) { - Lifecycle::Submitted => { - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => { - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Lifecycle::Waiting(waker) => { - *lifecycle = Lifecycle::Waiting(waker); - Poll::Pending - } - Lifecycle::Ignored(..) => unreachable!(), - Lifecycle::Completed(cqe) => { - driver.ops.remove(op.index); - op.index = usize::MAX; - Poll::Ready(op.data.take().unwrap().complete(cqe)) - } - Lifecycle::CompletionList(..) => { - unreachable!("No `more` flag set for SingleCQE") - } - } + self.inner.borrow_mut().poll_op(op, cx) } pub(crate) fn poll_multishot_op( @@ -155,103 +86,11 @@ impl Handle { where T: Unpin + 'static + Completable + Updateable, { - use std::mem; - - let mut driver = self.inner.borrow_mut(); - - let (lifecycle, completions) = driver - .ops - .get_mut(op.index) - .expect("invalid internal state"); - - match mem::replace(lifecycle, Lifecycle::Submitted) { - Lifecycle::Submitted => { - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => { - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Lifecycle::Waiting(waker) => { - *lifecycle = Lifecycle::Waiting(waker); - Poll::Pending - } - Lifecycle::Ignored(..) => unreachable!(), - Lifecycle::Completed(cqe) => { - // This is possible. We may have previously polled a CompletionList, - // and the final CQE registered as Completed - driver.ops.remove(op.index); - op.index = usize::MAX; - Poll::Ready(op.data.take().unwrap().complete(cqe)) - } - Lifecycle::CompletionList(indices) => { - let mut data = op.data.take().unwrap(); - let mut status = Poll::Pending; - // Consume the CqeResult list, calling update on the Op on all Cqe's flagged `more` - // If the final Cqe is present, clean up and return Poll::Ready - for cqe in indices.into_list(completions) { - if cqueue::more(cqe.flags) { - data.update(cqe); - } else { - status = Poll::Ready(cqe); - break; - } - } - match status { - Poll::Pending => { - // We need more CQE's. Restore the op state - let _ = op.data.insert(data); - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Poll::Ready(cqe) => { - driver.ops.remove(op.index); - op.index = usize::MAX; - Poll::Ready(data.complete(cqe)) - } - } - } - } + self.inner.borrow_mut().poll_multishot_op(op, cx) } pub(crate) fn remove_op(&self, op: &mut Op) { - use std::mem; - - let mut driver = self.inner.borrow_mut(); - - // Get the Op Lifecycle state from the driver - let (lifecycle, completions) = match driver.ops.get_mut(op.index) { - Some(val) => val, - None => { - // Op dropped after the driver - return; - } - }; - - match mem::replace(lifecycle, Lifecycle::Submitted) { - Lifecycle::Submitted | Lifecycle::Waiting(_) => { - *lifecycle = Lifecycle::Ignored(Box::new(op.data.take())); - } - Lifecycle::Completed(..) => { - driver.ops.remove(op.index); - } - Lifecycle::CompletionList(indices) => { - // Deallocate list entries, recording if more CQE's are expected - let more = { - let mut list = indices.into_list(completions); - cqueue::more(list.peek_end().unwrap().flags) - // Dropping list deallocates the list entries - }; - if more { - // If more are expected, we have to keep the op around - *lifecycle = Lifecycle::Ignored(Box::new(op.data.take())); - } else { - driver.ops.remove(op.index); - } - } - Lifecycle::Ignored(..) => unreachable!(), - } + self.inner.borrow_mut().remove_op(op) } } @@ -277,8 +116,11 @@ impl From for Handle { } } -impl From<&Handle> for WeakHandle { - fn from(handle: &Handle) -> Self { +impl From for WeakHandle +where + T: Deref, +{ + fn from(handle: T) -> Self { Self { inner: Rc::downgrade(&handle.inner), } diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index 2e2bbc23..ab80624b 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -1,12 +1,13 @@ use crate::buf::fixed::FixedBuffers; -use crate::runtime::driver::op::Lifecycle; +use crate::runtime::driver::op::{Completable, Lifecycle, MultiCQEFuture, Op, Updateable}; use io_uring::opcode::AsyncCancel; -use io_uring::IoUring; +use io_uring::{cqueue, squeue, IoUring}; use slab::Slab; use std::cell::RefCell; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; use std::rc::Rc; +use std::task::{Context, Poll}; pub(crate) use handle::*; @@ -18,12 +19,12 @@ pub(crate) struct Driver { ops: Ops, /// IoUring bindings - pub(crate) uring: IoUring, + uring: IoUring, /// Reference to the currently registered buffers. /// Ensures that the buffers are not dropped until /// after the io-uring runtime has terminated. - pub(crate) fixed_buffers: Option>>, + fixed_buffers: Option>>, } struct Ops { @@ -56,7 +57,25 @@ impl Driver { self.ops.lifecycle.len() } - pub(crate) fn tick(&mut self) { + pub(crate) fn submit(&mut self) -> io::Result<()> { + loop { + match self.uring.submit() { + Ok(_) => { + self.uring.submission().sync(); + return Ok(()); + } + Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => { + self.dispatch_completions(); + } + Err(e) if e.raw_os_error() != Some(libc::EINTR) => { + return Err(e); + } + _ => continue, + } + } + } + + pub(crate) fn dispatch_completions(&mut self) { let mut cq = self.uring.completion(); cq.sync(); @@ -74,20 +93,194 @@ impl Driver { } } - pub(crate) fn submit(&mut self) -> io::Result<()> { - loop { - match self.uring.submit() { - Ok(_) => { - self.uring.submission().sync(); - return Ok(()); + pub(crate) fn register_buffers( + &mut self, + buffers: Rc>, + ) -> io::Result<()> { + self.uring + .submitter() + .register_buffers(buffers.borrow().iovecs())?; + + self.fixed_buffers = Some(buffers); + Ok(()) + } + + pub(crate) fn unregister_buffers( + &mut self, + buffers: Rc>, + ) -> io::Result<()> { + if let Some(currently_registered) = &self.fixed_buffers { + if Rc::ptr_eq(&buffers, currently_registered) { + self.uring.submitter().unregister_buffers()?; + self.fixed_buffers = None; + return Ok(()); + } + } + Err(io::Error::new( + io::ErrorKind::Other, + "fixed buffers are not currently registered", + )) + } + + pub(crate) fn submit_op( + &mut self, + mut data: T, + f: F, + handle: WeakHandle, + ) -> io::Result> + where + T: Completable, + F: FnOnce(&mut T) -> squeue::Entry, + { + let index = self.ops.insert(); + + // Configure the SQE + let sqe = f(&mut data).user_data(index as _); + + // Create the operation + let op = Op::new(handle, data, index); + + // Push the new operation + while unsafe { self.uring.submission().push(&sqe).is_err() } { + // If the submission queue is full, flush it to the kernel + self.submit()?; + } + + Ok(op) + } + + pub(crate) fn remove_op(&mut self, op: &mut Op) { + use std::mem; + + // Get the Op Lifecycle state from the driver + let (lifecycle, completions) = match self.ops.get_mut(op.index()) { + Some(val) => val, + None => { + // Op dropped after the driver + return; + } + }; + + match mem::replace(lifecycle, Lifecycle::Submitted) { + Lifecycle::Submitted | Lifecycle::Waiting(_) => { + *lifecycle = Lifecycle::Ignored(Box::new(op.take_data())); + } + Lifecycle::Completed(..) => { + self.ops.remove(op.index()); + } + Lifecycle::CompletionList(indices) => { + // Deallocate list entries, recording if more CQE's are expected + let more = { + let mut list = indices.into_list(completions); + cqueue::more(list.peek_end().unwrap().flags) + // Dropping list deallocates the list entries + }; + if more { + // If more are expected, we have to keep the op around + *lifecycle = Lifecycle::Ignored(Box::new(op.take_data())); + } else { + self.ops.remove(op.index()); } - Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => { - self.tick(); + } + Lifecycle::Ignored(..) => unreachable!(), + } + } + + pub(crate) fn poll_op(&mut self, op: &mut Op, cx: &mut Context<'_>) -> Poll + where + T: Unpin + 'static + Completable, + { + use std::mem; + + let (lifecycle, _) = self + .ops + .get_mut(op.index()) + .expect("invalid internal state"); + + match mem::replace(lifecycle, Lifecycle::Submitted) { + Lifecycle::Submitted => { + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + Poll::Pending + } + Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => { + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + Poll::Pending + } + Lifecycle::Waiting(waker) => { + *lifecycle = Lifecycle::Waiting(waker); + Poll::Pending + } + Lifecycle::Ignored(..) => unreachable!(), + Lifecycle::Completed(cqe) => { + self.ops.remove(op.index()); + Poll::Ready(op.take_data().unwrap().complete(cqe)) + } + Lifecycle::CompletionList(..) => { + unreachable!("No `more` flag set for SingleCQE") + } + } + } + + pub(crate) fn poll_multishot_op( + &mut self, + op: &mut Op, + cx: &mut Context<'_>, + ) -> Poll + where + T: Unpin + 'static + Completable + Updateable, + { + use std::mem; + + let (lifecycle, completions) = self + .ops + .get_mut(op.index()) + .expect("invalid internal state"); + + match mem::replace(lifecycle, Lifecycle::Submitted) { + Lifecycle::Submitted => { + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + Poll::Pending + } + Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => { + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + Poll::Pending + } + Lifecycle::Waiting(waker) => { + *lifecycle = Lifecycle::Waiting(waker); + Poll::Pending + } + Lifecycle::Ignored(..) => unreachable!(), + Lifecycle::Completed(cqe) => { + // This is possible. We may have previously polled a CompletionList, + // and the final CQE registered as Completed + self.ops.remove(op.index()); + Poll::Ready(op.take_data().unwrap().complete(cqe)) + } + Lifecycle::CompletionList(indices) => { + let mut data = op.take_data().unwrap(); + let mut status = Poll::Pending; + // Consume the CqeResult list, calling update on the Op on all Cqe's flagged `more` + // If the final Cqe is present, clean up and return Poll::Ready + for cqe in indices.into_list(completions) { + if cqueue::more(cqe.flags) { + data.update(cqe); + } else { + status = Poll::Ready(cqe); + break; + } } - Err(e) if e.raw_os_error() != Some(libc::EINTR) => { - return Err(e); + match status { + Poll::Pending => { + // We need more CQE's. Restore the op state + op.insert_data(data); + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + Poll::Pending + } + Poll::Ready(cqe) => { + self.ops.remove(op.index()); + Poll::Ready(data.complete(cqe)) + } } - _ => continue, } } } @@ -179,7 +372,7 @@ impl Drop for Driver { // If waiting fails, ignore the error. The wait will be attempted // again on the next loop. let _ = self.wait(); - self.tick(); + self.dispatch_completions(); } Some(_) => { @@ -370,7 +563,7 @@ mod test { #[test] fn complete_after_drop() { let (op, data) = init(); - let index = op.index; + let index = op.index(); drop(op); assert_eq!(2, Rc::strong_count(&data)); @@ -383,7 +576,12 @@ mod test { }; CONTEXT.with(|cx| { - cx.with_handle_mut(|driver| driver.inner.borrow_mut().ops.complete(index, cqe)) + cx.handle() + .unwrap() + .inner + .borrow_mut() + .ops + .complete(index, cqe) }); assert_eq!(1, Rc::strong_count(&data)); @@ -399,33 +597,36 @@ mod test { let op = CONTEXT.with(|cx| { cx.set_handle(driver.into()); - cx.with_handle_mut(|driver| { - let index = driver.inner.borrow_mut().ops.insert(); + let driver = cx.handle().unwrap(); - Op::new((&*driver).into(), data.clone(), index) - }) + let index = driver.inner.borrow_mut().ops.insert(); + + Op::new((&driver).into(), data.clone(), index) }); (op, data) } fn num_operations() -> usize { - CONTEXT.with(|cx| cx.with_handle_mut(|driver| driver.inner.borrow().num_operations())) + CONTEXT.with(|cx| cx.handle().unwrap().inner.borrow().num_operations()) } fn complete(op: &Op>, result: io::Result) { let cqe = CqeResult { result, flags: 0 }; + CONTEXT.with(|cx| { - cx.with_handle_mut(|driver| driver.inner.borrow_mut().ops.complete(op.index, cqe)) + let driver = cx.handle().unwrap(); + + driver.inner.borrow_mut().ops.complete(op.index(), cqe); }); } fn release() { CONTEXT.with(|cx| { - cx.with_handle_mut(|driver| { - driver.inner.borrow_mut().ops.lifecycle.clear(); - driver.inner.borrow_mut().ops.completions.clear(); - }); + let driver = cx.handle().unwrap(); + + driver.inner.borrow_mut().ops.lifecycle.clear(); + driver.inner.borrow_mut().ops.completions.clear(); cx.unset_driver(); }); diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 096c43e9..5758a29d 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -24,10 +24,10 @@ pub(crate) type Completion = SlabListEntry; pub(crate) struct Op { driver: driver::WeakHandle, // Operation index in the slab - pub(crate) index: usize, + index: usize, // Per-operation data - pub(crate) data: Option, + data: Option, // CqeType marker _cqe_type: PhantomData, @@ -90,12 +90,9 @@ impl From for CqeResult { } } -impl Op -where - T: Completable, -{ +impl Op { /// Create a new operation - pub(crate) fn new(driver: driver::WeakHandle, data: T, index: usize) -> Self { + pub(super) fn new(driver: driver::WeakHandle, data: T, index: usize) -> Self { Op { driver, index, @@ -103,6 +100,18 @@ where _cqe_type: PhantomData, } } + + pub(super) fn index(&self) -> usize { + self.index + } + + pub(super) fn take_data(&mut self) -> Option { + self.data.take() + } + + pub(super) fn insert_data(&mut self, data: T) { + self.data = Some(data); + } } impl Future for Op diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index e1c7ac23..a757225f 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,7 +1,6 @@ use std::future::Future; use std::io; use std::mem::ManuallyDrop; -use std::os::unix::io::AsRawFd; use tokio::io::unix::AsyncFd; use tokio::task::LocalSet; @@ -16,14 +15,14 @@ thread_local! { /// The Runtime executor pub struct Runtime { + /// Tokio runtime, always current-thread + tokio_rt: ManuallyDrop, + /// LocalSet for !Send tasks local: ManuallyDrop, /// Strong reference to the driver. driver: driver::Handle, - - /// Tokio runtime, always current-thread - rt: ManuallyDrop, } /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. @@ -71,31 +70,17 @@ impl Runtime { .enable_all() .build()?; - let rt = ManuallyDrop::new(rt); - + let tokio_rt = ManuallyDrop::new(rt); let local = ManuallyDrop::new(LocalSet::new()); - let driver = driver::Handle::new(b)?; - let driver_fd = driver.as_raw_fd(); - - let drive = { - let _guard = rt.enter(); - let driver = AsyncFd::new(driver_fd).unwrap(); - - async move { - loop { - // Wait for read-readiness - let mut guard = driver.readable().await.unwrap(); - CONTEXT.with(|cx| cx.with_handle_mut(|driver| driver.tick())); - guard.clear_ready(); - } - } - }; - - local.spawn_local(drive); + start_uring_wakes_task(&tokio_rt, &local, driver.clone()); - Ok(Runtime { local, rt, driver }) + Ok(Runtime { + local, + tokio_rt, + driver, + }) } /// Runs a future to completion on the current runtime @@ -118,7 +103,7 @@ impl Runtime { tokio::pin!(future); let res = self - .rt + .tokio_rt .block_on(self.local.run_until(std::future::poll_fn(|cx| { // assert!(drive.as_mut().poll(cx).is_pending()); future.as_mut().poll(cx) @@ -130,14 +115,36 @@ impl Runtime { impl Drop for Runtime { fn drop(&mut self) { - // drop tasks + // drop tasks in correct order unsafe { ManuallyDrop::drop(&mut self.local); - ManuallyDrop::drop(&mut self.rt); + ManuallyDrop::drop(&mut self.tokio_rt); } } } +fn start_uring_wakes_task( + tokio_rt: &tokio::runtime::Runtime, + local: &LocalSet, + driver: driver::Handle, +) { + let _guard = tokio_rt.enter(); + let async_driver_handle = AsyncFd::new(driver).unwrap(); + + local.spawn_local(drive_uring_wakes(async_driver_handle)); +} + +async fn drive_uring_wakes(driver: AsyncFd) { + loop { + // Wait for read-readiness + let mut guard = driver.readable().await.unwrap(); + + guard.get_inner().dispatch_completions(); + + guard.clear_ready(); + } +} + #[cfg(test)] mod test {