Ch. 17.02 (NoStarch edits): second section
chriskrycho committed Jan 14, 2025
1 parent d1ede20 commit cd78320
Showing 1 changed file with 79 additions and 67 deletions: src/ch17-02-concurrency-with-async.md
For an extra challenge, see if you can figure out what the output will be in
each case _before_ running the code!

<!-- Old headings. Do not remove or links may break. -->
<a id="message-passing"></a>

### Counting Up on Two Tasks Using Message Passing

Sharing data between futures will also be familiar: we’ll use message passing
again, but this time with async versions of the types and functions. We’ll take
a slightly different path than we did in [Using Message Passing to Transfer Data
Between Threads][message-passing-threads]<!-- ignore --> to illustrate some of
the key differences between thread-based and futures-based concurrency. In
Listing 17-9, we’ll begin with just a single async block—_not_ spawning a
separate task as we spawned a separate thread.

<Listing number="17-9" caption="Creating an async channel and assigning the two halves to `tx` and `rx`" file-name="src/main.rs">

> code, and thus where to transition between sync and async code. In most async
> runtimes, `run` is actually named `block_on` for exactly this reason.

Notice two things about this example. First, the message will arrive right away.
Second, although we use a future here, there’s no concurrency yet. Everything
in the listing happens in sequence, just as it would if there were no futures
involved.
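
Because the listing’s body is pulled in from the book’s listings directory, here
is a minimal sketch of the shape it takes, assuming `trpl::channel` hands back a
sender and receiver whose async `recv` method resolves to an `Option`; the names
and details are illustrative rather than the listing’s exact code:

```rust
fn main() {
    trpl::run(async {
        // An async multiple-producer, single-consumer channel.
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        // Awaiting `recv` pauses this future until a message is available.
        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}
```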

Let’s address the first part by sending a series of messages and sleeping in
between them, as shown in Listing 17-10.

<!-- We cannot test this one because it never stops! -->


</Listing>

In addition to sending the messages, we need to receive them. In this case,
because we know how many messages are coming in, we could do that manually by
calling `rx.recv().await` four times. In the real world, though, we’ll generally
be waiting on some _unknown_ number of messages, so we need to keep waiting
until we determine that there are no more messages.

In Listing 16-10, we used a `for` loop to process all the items received from a
synchronous channel. Rust doesn’t yet have a way to write a `for` loop over an
_asynchronous_ series of items, however, so we need to use a loop we haven’t
seen before: the `while let` conditional loop. This is the loop version of the
`if let` construct we saw back in the section [Concise Control Flow with `if
let` and `let else`][if-let]<!-- ignore -->. The loop will continue executing as
long as the pattern it specifies continues to match the value.
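
Putting the sending loop and this `while let` receiving loop together, Listing
17-10 comes down to something like the following sketch, again assuming the
`trpl` APIs used above, with a synchronous `send` and an async `sleep`:

```rust
use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        // Send each message, sleeping half a second between sends.
        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        // Keep receiving until the channel closes and `recv` resolves to `None`.
        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}
```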

The `rx.recv` call produces a `Future`, which we await. The runtime will pause
the `Future` until it is ready. Once a message arrives, the `Future` will
resolve to `Some(message)` as many times as a message arrives. When the channel
closes, regardless of whether _any_ messages have arrived, the `Future` will
instead resolve to `None` to indicate that there are no more values and thus we
should stop polling—that is, stop awaiting.

The `while let` loop pulls all of this together. If the result of calling
`rx.recv().await` is `Some(message)`, we get access to the message and we can
use it in the loop body, just as we could with `if let`. If the result is
`None`, the loop ends. Every time the loop completes, it hits the await point
again, so the runtime pauses it again until another message arrives.
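
In other words, `while let` behaves like an `if let` wrapped in a `loop`. A
rough desugaring of the receiving loop, shown as a fragment that would live
inside the same async block as the sketch above, looks like this:

```rust
// Equivalent to `while let Some(message) = rx.recv().await { ... }`:
loop {
    match rx.recv().await {
        // A message arrived; use it in the loop body, just as with `if let`.
        Some(message) => println!("received '{message}'"),
        // The channel is closed: no more values, so stop awaiting.
        None => break,
    }
}
```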

The code now successfully sends and receives all of the messages. Unfortunately,
there are still a couple of problems. For one thing, the messages do not arrive
at half-second intervals. They arrive all at once, 2 seconds (2,000 milliseconds)
after we start the program. For another, this program also never exits! Instead,
it waits forever for new messages. You will need to shut it down using <span
class="keystroke">ctrl-c</span>.

Let’s start by examining why the messages come in all at once after the full
delay, rather than coming in with delays between each one. Within a given async
block, the order in which `await` keywords appear in the code is also the order
in which they’re executed when the program runs.

There’s only one async block in Listing 17-10, so everything in it runs
linearly. There’s still no concurrency. All the `tx.send` calls happen,
interspersed with all of the `trpl::sleep` calls and their associated await
points. Only then does the `while let` loop get to go through any of the `await`
points on the `recv` calls.

To get the behavior we want, where the sleep delay happens between each message,
we need to put the `tx` and `rx` operations in their own async blocks, as shown
in Listing 17-11. Then the runtime can execute each of them separately using
`trpl::join`, just as in the counting example. Once again, we await the result
of calling `trpl::join`, not the individual futures. If we awaited the
individual futures in sequence, we would just end up back in a sequential
flow—exactly what we’re trying _not_ to do.

<!-- We cannot test this one because it never stops! -->

</Listing>
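
Because Listing 17-11’s body is likewise included from the listings directory,
here is a sketch of the structure the text describes: the sending loop and the
receiving loop each get their own async block, and the two resulting futures are
handed to `trpl::join`. The exact code in the listing may differ; this assumes
the same `trpl` APIs as the earlier sketches.

```rust
use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        // One future for sending...
        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        // ...and another for receiving.
        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        // Await the joined future, not each future in sequence. Note that `tx`
        // is only borrowed here, so the channel never closes and the program
        // still never exits.
        trpl::join(tx_fut, rx_fut).await;
    });
}
```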

With the updated code in Listing 17-11, the messages get printed at
500-millisecond intervals, rather than all in a rush after 2 seconds.

The program still never exits, though, because of the way the `while let` loop
interacts with `trpl::join`:

- The future returned from `trpl::join` completes only once _both_ futures
passed to it have completed.
- The `tx` future completes once it finishes sleeping after sending the last
message in `vals`.
- The `rx` future won’t complete until the `while let` loop ends.
- The `while let` loop won’t end until awaiting `rx.recv` produces `None`.
- Awaiting `rx.recv` will return `None` only once the other end of the channel
is closed.
- The channel will close only if we call `rx.close` or when the sender side,
`tx`, is dropped.
- We don’t call `rx.close` anywhere, and `tx` won’t be dropped until the
outermost async block passed to `trpl::run` ends.
- The block can’t end because it is blocked on `trpl::join` completing, which
takes us back to the top of this list.

We could manually close `rx` by calling `rx.close` somewhere, but that doesn’t
make much sense. Stopping after handling some arbitrary number of messages would
make the program shut down, but we could miss messages. We need some other way
to make sure that `tx` gets dropped _before_ the end of the function.

Right now, the async block where we send the messages only borrows `tx` because
sending a message doesn’t require ownership, but if we could move `tx` into that
async block, it would be dropped once that block ends. In the Chapter 13 section
[Capturing References or Moving Ownership][capture-or-move]<!-- ignore -->, you
learned how to use the `move` keyword with closures, and, as discussed in the
Chapter 16 section [Using `move` Closures with Threads][move-threads]<!-- ignore
-->, we often need to move data into closures when working with threads. The
same basic dynamics apply to async blocks, so the `move` keyword works with
async blocks just as it does with closures.
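
As a rough fragment, assuming the same surrounding code as in the earlier
sketches, the change amounts to making only the sending block an `async move`
block:

```rust
// The only change from the previous sketch: `async move` instead of `async`.
let tx_fut = async move {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("future"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        trpl::sleep(Duration::from_millis(500)).await;
    }
    // `tx` was moved into this block, so it is dropped when the block ends,
    // which closes the channel and lets the `while let` loop finish.
};
```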

In Listing 17-12, we change the block used to send messages from `async` to
`async move`. When we run _this_ version of the code, it shuts down gracefully
after the last message is sent and received.

<Listing number="17-12" caption="A working example of sending and receiving messages between futures which correctly shuts down when complete" file-name="src/main.rs">
<Listing number="17-12" caption="A revision of the code from Listing 17-11 that correctly shuts down when complete" file-name="src/main.rs">

```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-12/src/main.rs:with-move}}
```

</Listing>

This async channel is also a multiple-producer channel, so we can call `clone`
on `tx` if we want to send messages from multiple futures, as shown in Listing
17-13.

<Listing number="17-13" caption="Using multiple producers with async blocks" file-name="src/main.rs">


</Listing>

First, we clone `tx`, creating `tx1` outside the first async block. We move
`tx1` into that block just as we did before with `tx`. Then, later, we move the
original `tx` into a _new_ async block, where we send more messages on a
slightly slower delay. We happen to put this new async block after the async
block for receiving messages, but it could go before it just as well. The key is
the order in which the futures are awaited, not in which they’re created.

Both of the async blocks for sending messages need to be `async move` blocks, so
that both `tx` and `tx1` get dropped when those blocks finish. Otherwise, we’ll
end up back in the same infinite loop we started out in. Finally, we switch from
`trpl::join` to `trpl::join3` to handle the additional future.
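
Here is a sketch of how those pieces might fit together; the values, delays, and
argument order are illustrative rather than the listing’s exact code:

```rust
use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        // Clone the sender so a second future can also send messages.
        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![String::from("hi"), String::from("from")];
            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        // The original `tx` moves into a second sending block with a slower delay.
        let tx_fut = async move {
            let vals = vec![String::from("more"), String::from("messages")];
            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        // Wait for all three futures; both senders are dropped when their
        // blocks finish, so the receiver's loop ends and the program exits.
        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}
```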

Now we see all the messages from both sending futures, and because the sending
futures use slightly different delays after sending, the messages are also
received at those different intervals:

<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the threads running differently rather than
changes in the compiler. -->

[thread-spawn]: ch16-01-threads.html#creating-a-new-thread-with-spawn
[join-handles]: ch16-01-threads.html#waiting-for-all-threads-to-finish-using-join-handles
[message-passing-threads]: ch16-02-message-passing.html
[if-let]: ch06-03-if-let.html
[capture-or-move]: ch13-01-closures.html#capturing-references-or-moving-ownership
[move-threads]: ch16-01-threads.html#using-move-closures-with-threads
