diff --git a/src/fs/file.rs b/src/fs/file.rs index c25ce30b..d1e55d9a 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -824,14 +824,17 @@ impl File { Op::statx(&self.fd)?.await } - /// Closes the file. + /// Closes the file using the uring asynchronous close operation and returns the possible error + /// as described in the close(2) man page. /// - /// The method completes once the close operation has completed, - /// guaranteeing that resources associated with the file have been released. + /// The programmer has the choice of calling this asynchronous close and waiting for the result + /// or letting the library close the file automatically and simply letting the file go out of + /// scope and having the library close the file descriptor automatically and synchronously. /// - /// If `close` is not called before dropping the file, the file is closed in - /// the background, but there is no guarantee as to **when** the close - /// operation will complete. + /// Calling this asynchronous close is to be preferred because it returns the close result + /// which as the man page points out, should not be ignored. This asynchronous close also + /// avoids the synchronous close system call and may result in better throughput as the thread + /// is not blocked during the close. /// /// # Examples /// @@ -849,9 +852,8 @@ impl File { /// }) /// } /// ``` - pub async fn close(self) -> io::Result<()> { - self.fd.close().await; - Ok(()) + pub async fn close(mut self) -> io::Result<()> { + self.fd.close().await } } diff --git a/src/future.rs b/src/future.rs index 1f48623b..54aa94f1 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,3 +1,5 @@ +// TODO see about removing or just commenting out. +#[allow(unused_macros)] macro_rules! ready { ($e:expr $(,)?) => { match $e { diff --git a/src/io/mod.rs b/src/io/mod.rs index fe30e99f..7fe2233d 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,7 +1,6 @@ mod accept; mod close; -pub(crate) use close::Close; mod connect; diff --git a/src/io/shared_fd.rs b/src/io/shared_fd.rs index d573265f..e3423238 100644 --- a/src/io/shared_fd.rs +++ b/src/io/shared_fd.rs @@ -1,19 +1,23 @@ -use crate::io::Close; use std::future::poll_fn; -use std::cell::RefCell; -use std::os::unix::io::{FromRawFd, RawFd}; -use std::rc::Rc; -use std::task::Waker; +use std::{ + cell::RefCell, + io, + os::unix::io::{FromRawFd, RawFd}, + rc::Rc, + task::Waker, +}; use crate::runtime::driver::op::Op; -use crate::runtime::CONTEXT; // Tracks in-flight operations on a file descriptor. Ensures all in-flight // operations complete before submitting the close. // -// If the runtime is unavailable, will fall back to synchronous Close to ensure -// File resources are not leaked. +// When closing the file descriptor because it is going out of scope, a synchronous close is +// employed. +// +// The closed state is tracked so close calls after the first are ignored. +// Only the first close call returns the true result of closing the file descriptor. #[derive(Clone)] pub(crate) struct SharedFd { inner: Rc, @@ -23,7 +27,8 @@ struct Inner { // Open file descriptor fd: RawFd, - // Waker to notify when the close operation completes. + // Track the sharing state of the file descriptor: + // normal, being waited on to allow a close by the parent's owner, or already closed. state: RefCell, } @@ -31,13 +36,10 @@ enum State { /// Initial state Init, - /// Waiting for all in-flight operation to complete. - Waiting(Option), - - /// The FD is closing - Closing(Op), + /// Waiting for the number of strong Rc pointers to drop to 1. + WaitingForUniqueness(Waker), - /// The FD is fully closed + /// The close has been triggered by the parent owner. Closed, } @@ -60,84 +62,43 @@ impl SharedFd { /// This prevents bugs where in-flight reads could operate on the incorrect /// file descriptor. /// - /// TO model this, if there are no in-flight operations, then - pub(crate) async fn close(mut self) { - // Get a mutable reference to Inner, indicating there are no - // in-flight operations on the FD. - if let Some(inner) = Rc::get_mut(&mut self.inner) { - // Submit the close operation - inner.submit_close_op(); - } - - self.inner.closed().await; - } -} - -impl Inner { - /// If there are no in-flight operations, submit the operation. - fn submit_close_op(&mut self) { - // Close the FD - let state = RefCell::get_mut(&mut self.state); - - // Submit a close operation - // If either: - // - runtime has already closed, or - // - submitting the Close operation fails - // we fall back on a synchronous `close`. This is safe as, at this point, - // we guarantee all in-flight operations have completed. The most - // common cause for an error is attempting to close the FD while - // off runtime. - // - // This is done by initializing a `File` with the FD and - // dropping it. - // - // TODO: Should we warn? - *state = match CONTEXT.try_with(|cx| cx.is_set()) { - Ok(true) => match Op::close(self.fd) { - Ok(op) => State::Closing(op), - Err(_) => { - let _ = unsafe { std::fs::File::from_raw_fd(self.fd) }; - State::Closed - } - }, - _ => { - let _ = unsafe { std::fs::File::from_raw_fd(self.fd) }; - State::Closed + pub(crate) async fn close(&mut self) -> io::Result<()> { + loop { + // Get a mutable reference to Inner, indicating there are no + // in-flight operations on the FD. + if let Some(inner) = Rc::get_mut(&mut self.inner) { + // Wait for the close operation. + return inner.async_close_op().await; } - }; + + self.sharedfd_is_unique().await; + } } - /// Completes when the FD has been closed. - async fn closed(&self) { - use std::future::Future; - use std::pin::Pin; + /// Completes when the SharedFd's Inner Rc strong count is 1. + /// Gets polled any time a SharedFd is dropped. + async fn sharedfd_is_unique(&self) { use std::task::Poll; poll_fn(|cx| { - let mut state = self.state.borrow_mut(); + if Rc::::strong_count(&self.inner) == 1 { + return Poll::Ready(()); + } + + let mut state = self.inner.state.borrow_mut(); match &mut *state { State::Init => { - *state = State::Waiting(Some(cx.waker().clone())); + *state = State::WaitingForUniqueness(cx.waker().clone()); Poll::Pending } - State::Waiting(Some(waker)) => { + State::WaitingForUniqueness(waker) => { if !waker.will_wake(cx.waker()) { *waker = cx.waker().clone(); } Poll::Pending } - State::Waiting(None) => { - *state = State::Waiting(Some(cx.waker().clone())); - Poll::Pending - } - State::Closing(op) => { - // Nothing to do if the close opeation failed. - let _ = ready!(Pin::new(op).poll(cx)); - *state = State::Closed; - Poll::Ready(()) - } State::Closed => Poll::Ready(()), } }) @@ -145,14 +106,55 @@ impl Inner { } } -impl Drop for Inner { +impl Inner { + async fn async_close_op(&mut self) -> io::Result<()> { + // &mut self implies there are no outstanding operations. + // If state already closed, the user closed multiple times; simply return Ok. + // Otherwise, set state to closed and then submit and await the uring close operation. + { + // Release state guard before await. + let state = RefCell::get_mut(&mut self.state); + + if let State::Closed = *state { + return Ok(()); + } + + *state = State::Closed; + } + Op::close(self.fd)?.await + } +} + +impl Drop for SharedFd { fn drop(&mut self) { - // Submit the close operation, if needed - match RefCell::get_mut(&mut self.state) { - State::Init | State::Waiting(..) => { - self.submit_close_op(); + // If the SharedFd state is Waiting + // The job of the SharedFd's drop is to possibly wake a task that is waiting for the + // reference count to go down. + use std::mem; + + let mut state = self.inner.state.borrow_mut(); + if let State::WaitingForUniqueness(_) = *state { + let state = &mut *state; + if let State::WaitingForUniqueness(waker) = mem::replace(state, State::Init) { + // Wake the task wanting to close this SharedFd and let it try again. If it finds + // there are no more outstanding clones, it will succeed. Otherwise it will start a new + // Future, waiting for another SharedFd to be dropped. + waker.wake() } - _ => {} } } } + +impl Drop for Inner { + fn drop(&mut self) { + // If the inner state isn't `Closed`, the user hasn't called close().await + // so do it synchronously. + + let state = self.state.borrow_mut(); + + if let State::Closed = *state { + return; + } + let _ = unsafe { std::fs::File::from_raw_fd(self.fd) }; + } +} diff --git a/src/runtime/context.rs b/src/runtime/context.rs index 4014d38d..c194c510 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -33,6 +33,7 @@ impl RuntimeContext { } /// Check if driver is initialized + #[allow(dead_code)] pub(crate) fn is_set(&self) -> bool { self.driver .try_borrow() diff --git a/tests/fs_file.rs b/tests/fs_file.rs index d9cb7f7e..739fea56 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -342,6 +342,6 @@ fn assert_invalid_fd(fd: RawFd) { match f.read_to_end(&mut buf) { Err(ref e) if e.raw_os_error() == Some(libc::EBADF) => {} - res => panic!("{:?}", res), + res => panic!("assert_invalid_fd finds for fd {:?}, res = {:?}", fd, res), } }