-
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implementation of notify_last method #6520
Changes from 4 commits
c3bbef6
0fd084f
f170624
96d3823
942539c
ff52e32
fbcb9a7
a945474
c387ef1
fd474cb
1dff2b9
f498f7d
428b8ec
1ad00d2
531ef54
c0ffca2
ab3143a
22e279f
fc029cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -223,7 +223,9 @@ struct Waiter { | |
/// `Notify`, or it is exclusively owned by the enclosing `Waiter`. | ||
waker: UnsafeCell<Option<Waker>>, | ||
|
||
/// Notification for this waiter. | ||
/// Notification for this waiter. Uses 2 bits to store if and how was | ||
/// notified, 2 bits for storing if it was woken up using FIFO or LIFO, and | ||
/// the rest of it is unused. | ||
/// * if it's `None`, then `waker` is protected by the `waiters` lock. | ||
/// * if it's `Some`, then `waker` is exclusively owned by the | ||
/// enclosing `Waiter` and can be accessed without locking. | ||
|
@@ -261,6 +263,19 @@ const NOTIFICATION_ONE: usize = 1; | |
// Notification type used by `notify_waiters`. | ||
const NOTIFICATION_ALL: usize = 2; | ||
|
||
const NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT: usize = 2; | ||
const NOTIFICATION_TYPE_MASK: usize = (1 << NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT) - 1; | ||
const NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK: usize = !NOTIFICATION_TYPE_MASK; | ||
|
||
// Unspecified wakeup order | ||
const NOTIFICATION_NOTIFY_ONE_STRATEGY_NONE: usize = 0; | ||
|
||
// Fifo (default) wakeup order | ||
const NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO: usize = 1; | ||
|
||
// Lifo wakeup order | ||
const NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO: usize = 2; | ||
|
||
/// Notification for a `Waiter`. | ||
/// This struct is equivalent to `Option<Notification>`, but uses | ||
/// `AtomicUsize` inside for atomic operations. | ||
|
@@ -269,30 +284,48 @@ struct AtomicNotification(AtomicUsize); | |
|
||
impl AtomicNotification { | ||
fn none() -> Self { | ||
AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE)) | ||
AtomicNotification(AtomicUsize::new(0)) | ||
} | ||
|
||
/// Store-release a notification. | ||
/// This method should be called exactly once. | ||
fn store_release(&self, notification: Notification) { | ||
self.0.store(notification as usize, Release); | ||
fn store_release(&self, notification: Notification, strategy: Option<NotifyOneStrategy>) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should no longer apply after the latest refactor. |
||
let data: usize = match strategy { | ||
None => notification as usize & NOTIFICATION_TYPE_MASK, | ||
Some(strategy) => { | ||
(((strategy as usize) << NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT) | ||
& NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK) | ||
| (notification as usize & NOTIFICATION_TYPE_MASK) | ||
} | ||
}; | ||
self.0.store(data, Release); | ||
} | ||
|
||
fn load(&self, ordering: Ordering) -> Option<Notification> { | ||
match self.0.load(ordering) { | ||
fn load(&self, ordering: Ordering) -> (Option<Notification>, Option<NotifyOneStrategy>) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there ever a time when this will return enum Notification {
One(Strategy),
All,
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, let me change this towards your suggestion. |
||
let data = self.0.load(ordering); | ||
let notification = match data & NOTIFICATION_TYPE_MASK { | ||
NOTIFICATION_NONE => None, | ||
NOTIFICATION_ONE => Some(Notification::One), | ||
NOTIFICATION_ALL => Some(Notification::All), | ||
_ => unreachable!(), | ||
} | ||
}; | ||
let strategy = match (data & NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK) | ||
>> NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT | ||
{ | ||
NOTIFICATION_NOTIFY_ONE_STRATEGY_NONE => None, | ||
NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO => Some(NotifyOneStrategy::Fifo), | ||
NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO => Some(NotifyOneStrategy::Lifo), | ||
_ => unreachable!(), | ||
}; | ||
(notification, strategy) | ||
} | ||
|
||
/// Clears the notification. | ||
/// This method is used by a `Notified` future to consume the | ||
/// notification. It uses relaxed ordering and should be only | ||
/// used once the atomic notification is no longer shared. | ||
fn clear(&self) { | ||
self.0.store(NOTIFICATION_NONE, Relaxed); | ||
self.0.store(0, Relaxed); | ||
} | ||
} | ||
|
||
|
@@ -303,6 +336,13 @@ enum Notification { | |
All = NOTIFICATION_ALL, | ||
} | ||
|
||
#[derive(Debug, PartialEq, Eq)] | ||
#[repr(usize)] | ||
enum NotifyOneStrategy { | ||
Fifo = NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO, | ||
Lifo = NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO, | ||
} | ||
|
||
/// List used in `Notify::notify_waiters`. It wraps a guarded linked list | ||
/// and gates the access to it on `notify.waiters` mutex. It also empties | ||
/// the list on drop. | ||
|
@@ -349,7 +389,7 @@ impl Drop for NotifyWaitersList<'_> { | |
while let Some(waiter) = self.list.pop_back() { | ||
// Safety: we never make mutable references to waiters. | ||
let waiter = unsafe { waiter.as_ref() }; | ||
waiter.notification.store_release(Notification::All); | ||
waiter.notification.store_release(Notification::All, None); | ||
} | ||
} | ||
} | ||
|
@@ -521,7 +561,7 @@ impl Notify { | |
} | ||
} | ||
|
||
/// Notifies a waiting task. | ||
/// Notifies the first waiting task. | ||
/// | ||
/// If a task is currently waiting, that task is notified. Otherwise, a | ||
/// permit is stored in this `Notify` value and the **next** call to | ||
|
@@ -558,6 +598,21 @@ impl Notify { | |
// Alias for old name in 0.x | ||
#[cfg_attr(docsrs, doc(alias = "notify"))] | ||
pub fn notify_one(&self) { | ||
self.notify_with_strategy(NotifyOneStrategy::Fifo); | ||
} | ||
|
||
/// Notifies the last waiting task. | ||
/// | ||
/// This function behaves identically to `notify_one` but using a | ||
/// LIFO algorithm to notify waiters from the queue, if there are any. | ||
pfreixes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// | ||
/// Check the `notify_one` documentation for more info and | ||
pfreixes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// examples. | ||
pub fn notify_one_lifo(&self) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not against calling this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
self.notify_with_strategy(NotifyOneStrategy::Lifo); | ||
} | ||
|
||
fn notify_with_strategy(&self, strategy: NotifyOneStrategy) { | ||
// Load the current state | ||
let mut curr = self.state.load(SeqCst); | ||
|
||
|
@@ -585,7 +640,7 @@ impl Notify { | |
// transition out of WAITING while the lock is held. | ||
curr = self.state.load(SeqCst); | ||
|
||
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) { | ||
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) { | ||
drop(waiters); | ||
waker.wake(); | ||
} | ||
|
@@ -673,7 +728,7 @@ impl Notify { | |
} | ||
|
||
// This waiter is unlinked and will not be shared ever again, release it. | ||
waiter.notification.store_release(Notification::All); | ||
waiter.notification.store_release(Notification::All, None); | ||
} | ||
None => { | ||
break 'outer; | ||
|
@@ -708,7 +763,12 @@ impl Default for Notify { | |
impl UnwindSafe for Notify {} | ||
impl RefUnwindSafe for Notify {} | ||
|
||
fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> { | ||
fn notify_locked( | ||
waiters: &mut WaitList, | ||
state: &AtomicUsize, | ||
curr: usize, | ||
strategy: NotifyOneStrategy, | ||
) -> Option<Waker> { | ||
match get_state(curr) { | ||
EMPTY | NOTIFIED => { | ||
let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst); | ||
|
@@ -728,8 +788,11 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op | |
// concurrently change as holding the lock is required to | ||
// transition **out** of `WAITING`. | ||
// | ||
// Get a pending waiter | ||
let waiter = waiters.pop_back().unwrap(); | ||
// Get a pending waiter using one of the available dequeue strategies. | ||
let waiter = match strategy { | ||
NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(), | ||
NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(), | ||
}; | ||
|
||
// Safety: we never make mutable references to waiters. | ||
let waiter = unsafe { waiter.as_ref() }; | ||
|
@@ -738,7 +801,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op | |
let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; | ||
|
||
// This waiter is unlinked and will not be shared ever again, release it. | ||
waiter.notification.store_release(Notification::One); | ||
waiter | ||
.notification | ||
.store_release(Notification::One, Some(strategy)); | ||
|
||
if waiters.is_empty() { | ||
// As this the **final** waiter in the list, the state | ||
|
@@ -998,7 +1063,8 @@ impl Notified<'_> { | |
ready!(crate::trace::trace_leaf(&mut ctx)); | ||
} | ||
|
||
if waiter.notification.load(Acquire).is_some() { | ||
let (notification, _) = waiter.notification.load(Acquire); | ||
if notification.is_some() { | ||
// Safety: waiter is already unlinked and will not be shared again, | ||
// so we have an exclusive access to `waker`. | ||
drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }); | ||
|
@@ -1018,7 +1084,8 @@ impl Notified<'_> { | |
// We hold the lock and notifications are set only with the lock held, | ||
// so this can be relaxed, because the happens-before relationship is | ||
// established through the mutex. | ||
if waiter.notification.load(Relaxed).is_some() { | ||
let (notification, _) = waiter.notification.load(Acquire); | ||
if notification.is_some() { | ||
// Safety: waiter is already unlinked and will not be shared again, | ||
// so we have an exclusive access to `waker`. | ||
old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; | ||
|
@@ -1120,7 +1187,7 @@ impl Drop for Notified<'_> { | |
|
||
// We hold the lock, so this field is not concurrently accessed by | ||
// `notify_*` functions and we can use the relaxed ordering. | ||
let notification = waiter.notification.load(Relaxed); | ||
let (notification, strategy) = waiter.notification.load(Relaxed); | ||
|
||
// remove the entry from the list (if not already removed) | ||
// | ||
|
@@ -1138,7 +1205,9 @@ impl Drop for Notified<'_> { | |
// the notification was triggered via `notify_one`, it must be sent | ||
// to the next waiter. | ||
if notification == Some(Notification::One) { | ||
if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { | ||
if let Some(waker) = | ||
notify_locked(&mut waiters, ¬ify.state, notify_state, strategy.unwrap()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can drop the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would you prefer to just pass or return the
And use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I commented above, but it looks like the strategy is an argument to enum Notification {
One(Strategy),
All,
} You can implement the conversion of the enum to size using a match statement, the compiler will optimize it anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeps, changing a bit this one. |
||
{ | ||
drop(waiters); | ||
waker.wake(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,6 +21,38 @@ fn notify_notified_one() { | |||||
assert_ready!(notified.poll()); | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn notify_multi_notified_one_fifo() { | ||||||
let notify = Notify::new(); | ||||||
let mut notified1 = spawn(async { notify.notified().await }); | ||||||
let mut notified2 = spawn(async { notify.notified().await }); | ||||||
|
||||||
// add two waiters into the queue | ||||||
assert_pending!(notified1.poll()); | ||||||
assert_pending!(notified2.poll()); | ||||||
|
||||||
// should wakeup the first one | ||||||
notify.notify_one(); | ||||||
assert_ready!(notified1.poll()); | ||||||
assert_pending!(notified2.poll()); | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn notify_multi_notified_one_lifo() { | ||||||
let notify = Notify::new(); | ||||||
let mut notified1 = spawn(async { notify.notified().await }); | ||||||
let mut notified2 = spawn(async { notify.notified().await }); | ||||||
|
||||||
// add two waiters into the queue | ||||||
assert_pending!(notified1.poll()); | ||||||
assert_pending!(notified2.poll()); | ||||||
|
||||||
// should wakeup the last one | ||||||
notify.notify_one_lifo(); | ||||||
assert_pending!(notified1.poll()); | ||||||
assert_ready!(notified2.poll()); | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn notified_one_notify() { | ||||||
let notify = Notify::new(); | ||||||
|
@@ -105,6 +137,49 @@ fn notified_multi_notify_drop_one() { | |||||
assert_ready!(notified2.poll()); | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn notified_multi_notify_drop_one_fifo() { | ||||||
let notify = Notify::new(); | ||||||
let mut notified1 = spawn(async { notify.notified().await }); | ||||||
let mut notified2 = spawn(async { notify.notified().await }); | ||||||
let mut notified3 = spawn(async { notify.notified().await }); | ||||||
|
||||||
// add waiters by order of poll execution | ||||||
assert_pending!(notified1.poll()); | ||||||
assert_pending!(notified2.poll()); | ||||||
assert_pending!(notified3.poll()); | ||||||
|
||||||
// by default fifo | ||||||
notify.notify_one(); | ||||||
|
||||||
drop(notified1); | ||||||
|
||||||
// next waiter should be the one to be to woken up | ||||||
assert_ready!(notified2.poll()); | ||||||
assert_pending!(notified3.poll()); | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn notified_multi_notify_drop_one_lifo() { | ||||||
let notify = Notify::new(); | ||||||
let mut notified1 = spawn(async { notify.notified().await }); | ||||||
let mut notified2 = spawn(async { notify.notified().await }); | ||||||
let mut notified3 = spawn(async { notify.notified().await }); | ||||||
|
||||||
// add waiters by order of poll execution | ||||||
assert_pending!(notified1.poll()); | ||||||
assert_pending!(notified2.poll()); | ||||||
assert_pending!(notified3.poll()); | ||||||
|
||||||
notify.notify_one_lifo(); | ||||||
|
||||||
drop(notified1); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't you drop
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My bad, fixed! |
||||||
|
||||||
// latest waiter added should be the one to woken up | ||||||
assert_ready!(notified3.poll()); | ||||||
assert_pending!(notified2.poll()); | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn notify_in_drop_after_wake() { | ||||||
use futures::task::ArcWake; | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason this isn't a const anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe there was since bot attributes, if there were a notification and the strategy, but once Ive changed the interfaces we can go back to what was originally, so this should be "fixed" now.