Add unit test showing some unexpected behaviour of mpsc::channel #2020

Conversation

@bikeshedder (Contributor) commented Dec 22, 2019

This is the fixed version of PR #1997

@carllerche (Member)

Here is a loom test that catches the problem:

#[test]
fn try_recv() {
    // `assert_ok!` comes from tokio's test support macros.
    loom::model(|| {
        use crate::sync::{mpsc, Semaphore};
        use loom::future::block_on;
        use loom::sync::{Arc, Mutex};
        use loom::thread;

        const PERMITS: usize = 2;
        const TASKS: usize = 2;

        struct Context {
            sem: Arc<Semaphore>,
            tx: mpsc::Sender<()>,
            rx: Mutex<mpsc::Receiver<()>>,
        }

        fn run(ctx: &Context) {
            block_on(async {
                // Holding a permit implies at least one message is in
                // the channel, so `try_recv` must succeed.
                let permit = ctx.sem.acquire().await;
                println!(" + before try_recv");
                assert_ok!(ctx.rx.lock().unwrap().try_recv());

                crate::task::yield_now().await;

                // Refill the channel before releasing the permit.
                assert_ok!(ctx.tx.clone().try_send(()));
                println!(" + after try_send");
                drop(permit);
            });
        }

        let (tx, rx) = mpsc::channel(PERMITS);
        let sem = Arc::new(Semaphore::new(PERMITS));
        let ctx = Arc::new(Context { sem, tx, rx: Mutex::new(rx) });

        // Pre-fill the channel: one message per semaphore permit.
        for _ in 0..PERMITS {
            assert_ok!(ctx.tx.clone().try_send(()));
        }

        let mut ths = Vec::new();

        for _ in 0..TASKS {
            let ctx = ctx.clone();

            ths.push(thread::spawn(move || {
                run(&ctx);
            }));
        }

        run(&ctx);

        for th in ths {
            th.join().unwrap();
        }
    });
}

@Darksonn (Contributor)

What is the status of this PR?

@Darksonn added the A-tokio (Area: The main tokio crate) and M-sync (Module: tokio/sync) labels on Jul 25, 2020
@tmiasko (Contributor) commented Jul 25, 2020

The operations on the semaphore are not linearizable.

For example, if in the call to Semaphore::add_permits the rx_lock is already locked, the call will return immediately, and the responsibility for adding the permits is delegated to the thread holding the lock. At that point add_permits has returned but has yet to actually take effect. It looks like this is in part responsible for the surprising behaviour.
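
For illustration, a minimal sketch of the delegation pattern described above. This is not tokio's actual code; the names (pending, rx_lock, apply) are made up, and a real implementation re-checks for newly delegated permits in a loop after unlocking:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct Sem {
    pending: AtomicUsize,
    rx_lock: Mutex<()>,
}

impl Sem {
    fn add_permits(&self, n: usize) {
        // Publish the permits where the current lock holder can see them.
        self.pending.fetch_add(n, Ordering::Release);

        // If another thread holds rx_lock, return immediately: that
        // thread applies `pending` before releasing the lock. Until it
        // does, the permits added here are not yet observable -- this
        // is the non-linearizable window.
        if let Ok(_guard) = self.rx_lock.try_lock() {
            let n = self.pending.swap(0, Ordering::Acquire);
            self.apply(n);
        }
    }

    fn apply(&self, _n: usize) {
        // Wake waiters, update counters, etc. (elided)
    }
}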

@bikeshedder (Contributor, Author)

As far as I understand it, it's not the semaphore causing the problems; e.g. when replacing the tokio channel with crossbeam, the problem no longer exists.

I think the problem exists because try_send(...) has returned but try_recv(...) cannot read from the channel yet. Releasing the permit therefore causes another task to call try_recv(...) prematurely.
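
For illustration, that interleaving spelled out as a sketch (hedged: it assumes an mpsc API that exposes try_send/try_recv, and uses tokio's Semaphore; with the guarantee one would expect, the expect() below could never fire):

use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::{mpsc, Semaphore};

fn main() {
    let (tx, rx) = mpsc::channel::<()>(1);
    let rx = Arc::new(Mutex::new(rx));
    let sem = Arc::new(Semaphore::new(0));

    let consumer = {
        let (rx, sem) = (Arc::clone(&rx), Arc::clone(&sem));
        thread::spawn(move || {
            // Spin until the producer signals that a message was sent.
            let permit = loop {
                if let Ok(p) = sem.try_acquire() {
                    break p;
                }
                std::hint::spin_loop();
            };
            permit.forget();
            // The reported bug: this can still observe an empty channel,
            // even though the permit was only released after try_send
            // had already returned Ok.
            rx.lock().unwrap().try_recv().expect("permit implies a message");
        })
    };

    tx.try_send(()).unwrap(); // the message is sent...
    sem.add_permits(1);       // ...and only then is the permit released

    consumer.join().unwrap();
}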

@tmiasko (Contributor) commented Jul 26, 2020

I am referring to the semaphore that is used to enforce the channel capacity.

When a thread returns a permit after a successful try_recv, and soon after attempts to obtain a new one in try_send, the act of returning the permit might not have taken effect yet, and the try_send might fail to obtain one.

EDIT: I completely missed the fact that this is a single-consumer queue, so the non-linearizability of add_permits is a non-issue. Sorry for the noise.

@Darksonn (Contributor)

> Sorry for the noise.

Does this mean the issue should be closed?

@tmiasko (Contributor) commented Jul 28, 2020

@Darksonn The example demonstrates that try_recv might return an empty error even though the number of successfully completed try_send operations exceeds the number of successfully completed try_recv operations (i.e., the queue is not empty).

I would generally say that the guarantees provided by the queue merit mention in the documentation (whatever they are), and so does the fact that try_recv might fail even when the queue is not empty (I suspect that happens when delivery of the next message in order is still pending).

Note that I didn't open this PR.

@bikeshedder (Contributor, Author)

That issue still exists: one worker calls try_send, and the other calls try_recv afterwards and receives None even though there should be something in the queue. This makes it impossible to put a channel receiver behind a semaphore to build a queue. I ran into this problem when writing deadpool and ended up switching from tokio channels to crossbeam: deadpool-rs/deadpool@e44f9a9

@cynecx (Contributor) commented Jul 28, 2020

@bikeshedder https://github.com/stjepang/async-channel might be a more appropriate replacement, because crossbeam's channels aren't compatible with the async ecosystem.

@bikeshedder (Contributor, Author)

@cynecx crossbeam works great and is very compatible with the async ecosystem, since it provides lock-free channel implementations. You just can't await a value; you need to add your own synchronization primitives around it. Have you looked at the current implementation of the managed pool in deadpool? Since all reads from the channel add a permit to the semaphore, and all writes to the channel remove a permit first, it works like a charm. For the fun of it I also created deadqueue, which brings crossbeam to the async ecosystem with very little extra code. If you want to know more about why I did all that, feel free to contact me; I'm known as @bikeshedder on both the Rust Discord servers and the Tokio one.
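
For illustration, a minimal sketch of pairing a lock-free crossbeam queue with a tokio Semaphore so consumers can await items, in the spirit of deadqueue (hedged: this is illustrative, not the actual deadpool/deadqueue code). Here the semaphore counts available items, so a permit guarantees the pop succeeds:

use crossbeam_queue::ArrayQueue;
use tokio::sync::Semaphore;

struct AsyncQueue<T> {
    items: ArrayQueue<T>,
    available: Semaphore,
}

impl<T> AsyncQueue<T> {
    fn new(cap: usize) -> Self {
        Self {
            items: ArrayQueue::new(cap),
            available: Semaphore::new(0),
        }
    }

    fn push(&self, value: T) -> Result<(), T> {
        // Make the item visible first, then add a permit, so a waiting
        // consumer is only woken once the item is actually poppable.
        self.items.push(value)?;
        self.available.add_permits(1);
        Ok(())
    }

    async fn pop(&self) -> T {
        // A permit guarantees at least one item is in the queue.
        self.available.acquire().await.unwrap().forget();
        self.items.pop().expect("permit implies an item")
    }
}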

This issue is about tokio, and the current channel implementation is broken when using try_send + try_recv from different tasks synchronized by some synchronization primitive. I still use tokio channels in my own projects, but only via Receiver::recv(). Receiver::try_recv() is unreliable in its current form and should be avoided until this bug is fixed.

@cynecx (Contributor) commented Jul 28, 2020

@bikeshedder Oh sorry, I thought we were talking about crossbeam's channels :). That sounds reasonable.

@carllerche added this to the v1.0 milestone on Dec 3, 2020
@carllerche (Member)

We probably should remove these APIs in 1.0 until we can fix them.

@LucioFranco (Member)

Sounds like we should just add this as a note to the docs? What do others think?

@Darksonn (Contributor)

I mean, how hard is this to fix?

@carllerche (Member)

@Darksonn Medium hard. More than a quick patch.

carllerche added a commit that referenced this pull request Dec 12, 2020
The mpsc `try_recv()` functions have an issue where a sent message
happens-before a call to `try_recv()` but `try_recv()` returns `None`.
Fixing this is non-trivial, so the function is removed for 1.0. When the
bug is fixed, the function can be added back.

Closes #2020
carllerche added a commit that referenced this pull request Dec 14, 2020
@sunjay (Contributor) commented Dec 26, 2020

Is there an issue for adding try_recv back into mpsc::UnboundedReceiver? I am blocked on this while updating from tokio v0.3 to v1.0.

@sunjay mentioned this pull request on Dec 26, 2020
@Darksonn (Contributor)

@sunjay For now you can do .recv().now_or_never() using FutureExt::now_or_never.
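
For illustration, a hedged sketch of that workaround (the helper name try_recv_now is made up). Note it conflates an empty channel with a closed one; match on Some(None) separately if you need to distinguish them:

use futures::FutureExt; // provides `now_or_never`
use tokio::sync::mpsc;

fn try_recv_now<T>(rx: &mut mpsc::UnboundedReceiver<T>) -> Option<T> {
    match rx.recv().now_or_never() {
        Some(Some(msg)) => Some(msg), // a message was ready
        Some(None) => None,           // channel closed and fully drained
        None => None,                 // nothing ready right now
    }
}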

I have opened #3350 to track adding it back.

marcbowes added a commit to awslabs/amazon-qldb-driver-rust that referenced this pull request Jan 14, 2021
This commit runs `cargo upgrade` which introduces 3 breaking changes:

First, the `gen_range` function from the `rand` crate now takes a range
as a single argument rather than as two params. Good change!

Next, tokio is now at 1.0! We make changes to 1/ the runtime
builder (for the blocking driver), 2/ where we don't need mutable refs
anymore, 3/ move away from try_recv as described in
tokio-rs/tokio#2020 and 4/ replace `delay_for`
with `sleep`.

The main reason for this upgrade, beyond hygiene, is to pick up a new
version of rusoto which we will use shortly.
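
For reference, a hedged sketch of the last two migration items mentioned in that commit message (the function name and message type are made up):

use futures::FutureExt; // `now_or_never`, for the try_recv replacement
use std::time::Duration;
use tokio::sync::mpsc;

async fn migrated(rx: &mut mpsc::UnboundedReceiver<u32>) {
    // tokio 0.2: `tokio::time::delay_for(Duration::from_secs(1)).await`
    tokio::time::sleep(Duration::from_secs(1)).await;

    // tokio 0.2: `rx.try_recv()`; replaced with a single poll of `recv`
    if let Some(Some(msg)) = rx.recv().now_or_never() {
        println!("got {}", msg);
    }
}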
alpian pushed a commit to awslabs/amazon-qldb-driver-rust that referenced this pull request Jan 15, 2021
marcbowes added a commit to awslabs/amazon-qldb-driver-rust that referenced this pull request Jan 15, 2021