Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: Add async mpsc::Sender::shared_send method #2041

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,88 @@ impl<T> Sender<T> {
Err(TrySendError::Closed(value)) => Err(SendError(value)),
}
}

/// Send a value through a shared reference, waiting until there is
/// capacity.
///
/// Behaves exactly like `send`, except that it performs some extra work to
/// synchronize through a shared reference. This is accomplished by creating
/// a new permit for every value sent over the channel. If possible, you
/// should prefer to use `send` since it has lower overhead.
///
/// See [`send`] for more documentation.
///
/// [`send`]: Sender::send
///
/// # Examples
///
/// In the following example, each call to `shared_send` will block until the
/// previously sent value was received.
///
/// ```rust
/// use tokio::sync::mpsc;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(1);
///
/// {
/// let tx = Arc::new(tx);
///
/// for i in 0..10i32 {
/// let tx = Arc::clone(&tx);
///
/// let _ = tokio::spawn(async move {
/// if let Err(_) = tx.shared_send(i).await {
/// println!("receiver dropped");
/// }
/// });
/// }
/// }
///
/// while let Some(i) = rx.recv().await {
/// println!("got = {}", i);
/// }
/// }
/// ```
pub async fn shared_send(&self, value: T) -> Result<(), SendError<T>> {
use self::chan::Semaphore as _;
use crate::future::poll_fn;

let mut permit = Semaphore::new_permit();
let permit = Guard(&self.chan.inner.semaphore, &mut permit);

if poll_fn(|cx| self.chan.poll_ready_with_permit(cx, permit.1))
.await
.is_err()
{
return Err(SendError(value));
}

return match self
.chan
.try_send_with_permit(value, permit.1)
.map_err(TrySendError::<T>::from)
{
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => unreachable!(),
Err(TrySendError::Closed(value)) => Err(SendError(value)),
};

// A permit guard, making sure that the drop implementation is run as
// appropriate.
struct Guard<'a, S>(&'a S, &'a mut S::Permit)
where
S: chan::Semaphore;

impl<S> Drop for Guard<'_, S>
where
S: chan::Semaphore,
{
fn drop(&mut self) {
self.0.drop_permit(self.1);
}
}
}
}
24 changes: 20 additions & 4 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::task::{Context, Poll};

/// Channel sender
pub(crate) struct Tx<T, S: Semaphore> {
inner: Arc<Chan<T, S>>,
pub(crate) inner: Arc<Chan<T, S>>,
permit: S::Permit,
}

Expand Down Expand Up @@ -91,19 +91,19 @@ pub(crate) trait Semaphore {

/// A value was sent into the channel and the permit held by `tx` is
/// dropped. In this case, the permit should not immeditely be returned to
/// the semaphore. Instead, the permit is returnred to the semaphore once
/// the semaphore. Instead, the permit is returned to the semaphore once
/// the sent value is read by the rx handle.
fn forget(&self, permit: &mut Self::Permit);

fn close(&self);
}

struct Chan<T, S> {
pub(crate) struct Chan<T, S> {
/// Handle to the push half of the lock-free list.
tx: list::Tx<T>,

/// Coordinates access to channel's capacity.
semaphore: S,
pub(crate) semaphore: S,

/// Receiver waker. Notified when a value is pushed into the channel.
rx_waker: AtomicWaker,
Expand Down Expand Up @@ -190,10 +190,26 @@ where
self.inner.semaphore.poll_acquire(cx, &mut self.permit)
}

pub(crate) fn poll_ready_with_permit(
&self,
cx: &mut Context<'_>,
permit: &mut S::Permit,
) -> Poll<Result<(), ClosedError>> {
self.inner.semaphore.poll_acquire(cx, permit)
}

/// Send a message and notify the receiver.
pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
self.inner.try_send(value, &mut self.permit)
}

pub(crate) fn try_send_with_permit(
&self,
value: T,
permit: &mut S::Permit,
) -> Result<(), (T, TrySendError)> {
self.inner.try_send(value, permit)
}
}

impl<T> Tx<T, AtomicUsize> {
Expand Down