Skip to content

Commit 52e6510

Browse files
authored
runtime: fix flaky test wake_while_rt_is_dropping (#5905)
1 parent e5e8855 commit 52e6510

File tree

1 file changed

+12
-33
lines changed

1 file changed

+12
-33
lines changed

tokio/tests/rt_common.rs

+12-33
Original file line numberDiff line numberDiff line change
@@ -950,10 +950,6 @@ rt_test! {
950950
#[test]
951951
fn wake_while_rt_is_dropping() {
952952
use tokio::sync::Barrier;
953-
use core::sync::atomic::{AtomicBool, Ordering};
954-
955-
let drop_triggered = Arc::new(AtomicBool::new(false));
956-
let set_drop_triggered = drop_triggered.clone();
957953

958954
struct OnDrop<F: FnMut()>(F);
959955

@@ -965,56 +961,39 @@ rt_test! {
965961

966962
let (tx1, rx1) = oneshot::channel();
967963
let (tx2, rx2) = oneshot::channel();
968-
let (tx3, rx3) = oneshot::channel();
969964

970-
let barrier = Arc::new(Barrier::new(4));
965+
let barrier = Arc::new(Barrier::new(3));
971966
let barrier1 = barrier.clone();
972967
let barrier2 = barrier.clone();
973-
let barrier3 = barrier.clone();
974968

975969
let rt = rt();
976970

977971
rt.spawn(async move {
972+
let mut tx2 = Some(tx2);
973+
let _d = OnDrop(move || {
974+
let _ = tx2.take().unwrap().send(());
975+
});
976+
978977
// Ensure a waker gets stored in oneshot 1.
979978
let _ = tokio::join!(rx1, barrier1.wait());
980-
tx3.send(()).unwrap();
981979
});
982980

983981
rt.spawn(async move {
984-
let h1 = tokio::runtime::Handle::current();
985-
// When this task is dropped, we'll be "closing remotes".
986-
// We spawn a new task that owns the `tx1`, to move its Drop
987-
// out of here.
988-
//
989-
// Importantly, the oneshot 1 has a waker already stored, so
990-
// the eventual drop here will try to re-schedule again.
991-
let mut opt_tx1 = Some(tx1);
982+
let mut tx1 = Some(tx1);
992983
let _d = OnDrop(move || {
993-
let tx1 = opt_tx1.take().unwrap();
994-
h1.spawn(async move {
995-
tx1.send(()).unwrap();
996-
});
997-
// Just a sanity check that this entire thing actually happened
998-
set_drop_triggered.store(true, Ordering::Relaxed);
984+
let _ = tx1.take().unwrap().send(());
999985
});
1000-
let _ = tokio::join!(rx2, barrier2.wait());
1001-
});
1002986

1003-
rt.spawn(async move {
1004-
let _ = tokio::join!(rx3, barrier3.wait());
1005-
// We'll never get here, but once task 3 drops, this will
1006-
// force task 2 to re-schedule since it's waiting on oneshot 2.
1007-
tx2.send(()).unwrap();
987+
// Ensure a waker gets stored in oneshot 2.
988+
let _ = tokio::join!(rx2, barrier2.wait());
1008989
});
1009990

1010991
// Wait until every oneshot channel has been polled.
1011992
rt.block_on(barrier.wait());
1012993

1013-
// Drop the rt
994+
// Drop the rt. Regardless of which task is dropped first, its destructor will wake the
995+
// other task.
1014996
drop(rt);
1015-
1016-
// Make sure that the spawn actually happened
1017-
assert!(drop_triggered.load(Ordering::Relaxed));
1018997
}
1019998

1020999
#[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()

0 commit comments

Comments
 (0)