Skip to content

Commit

Permalink
time: stream throttle (#1949)
Browse files Browse the repository at this point in the history
  • Loading branch information
vorot93 authored and carllerche committed Dec 14, 2019
1 parent d593c5b commit 4b85565
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 44 deletions.
5 changes: 5 additions & 0 deletions tokio/src/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ mod timeout;
#[doc(inline)]
pub use timeout::{timeout, timeout_at, Timeout, Elapsed};

cfg_stream! {
mod throttle;
pub use throttle::{throttle, Throttle};
}

mod wheel;

#[cfg(test)]
Expand Down
114 changes: 70 additions & 44 deletions tokio/src/time/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,62 @@ use std::marker::Unpin;
use std::pin::Pin;
use std::task::{self, Poll};

/// Slow down a stream by enforcing a delay between items.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Throttle<T> {
/// `None` when duration is zero.
delay: Option<Delay>,

/// Set to true when `delay` has returned ready, but `stream` hasn't.
has_delayed: bool,
use futures_core::Stream;
use pin_project_lite::pin_project;

/// The stream to throttle
stream: T,
/// Slow down a stream by enforcing a delay between items.
/// They will be produced not more often than the specified interval.
///
/// # Example
///
/// Create a throttled stream.
/// ```rust,norun
/// use futures::stream::StreamExt;
/// use std::time::Duration;
/// use tokio::time::throttle;
///
/// # async fn dox() {
/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one"));
///
/// loop {
/// // The string will be produced at most every 2 seconds
/// println!("{:?}", item_stream.next().await);
/// }
/// # }
/// ```
pub fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
where
T: Stream,
{
let delay = if duration == Duration::from_millis(0) {
None
} else {
Some(Delay::new_timeout(Instant::now() + duration, duration))
};

Throttle {
delay,
duration,
has_delayed: true,
stream,
}
}

impl<T> Throttle<T> {
/// Slow down a stream by enforcing a delay between items.
pub fn new(stream: T, duration: Duration) -> Self {
let delay = if duration == Duration::from_millis(0) {
None
} else {
Some(Delay::new_timeout(Instant::now() + duration, duration))
};

Self {
delay,
has_delayed: true,
stream,
}
pin_project! {
/// Stream for the [`throttle`](throttle) function.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Throttle<T> {
// `None` when duration is zero.
delay: Option<Delay>,
duration: Duration,

// Set to true when `delay` has returned ready, but `stream` hasn't.
has_delayed: bool,

// The stream to throttle
#[pin]
stream: T,
}
}

Expand Down Expand Up @@ -68,29 +96,27 @@ impl<T: Stream> Stream for Throttle<T> {
type Item = T::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
unsafe {
if !self.has_delayed && self.delay.is_some() {
ready!(self
.as_mut()
.map_unchecked_mut(|me| me.delay.as_mut().unwrap())
.poll(cx));
self.as_mut().get_unchecked_mut().has_delayed = true;
}

let value = ready!(self
.as_mut()
.map_unchecked_mut(|me| &mut me.stream)
.poll_next(cx));
if !self.has_delayed && self.delay.is_some() {
ready!(Pin::new(self.as_mut()
.project().delay.as_mut().unwrap())
.poll(cx));
*self.as_mut().project().has_delayed = true;
}

if value.is_some() {
if let Some(ref mut delay) = self.as_mut().get_unchecked_mut().delay {
delay.reset_timeout();
}
let value = ready!(self
.as_mut()
.project().stream
.poll_next(cx));

self.as_mut().get_unchecked_mut().has_delayed = false;
if value.is_some() {
let dur = self.duration;
if let Some(ref mut delay) = self.as_mut().project().delay {
delay.reset(Instant::now() + dur);
}

Poll::Ready(value)
*self.as_mut().project().has_delayed = false;
}

Poll::Ready(value)
}
}
30 changes: 30 additions & 0 deletions tokio/tests/time_throttle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::time::{self, throttle};
use tokio_test::*;

use std::time::Duration;

#[tokio::test]
async fn usage() {
time::pause();

let mut stream = task::spawn(throttle(
Duration::from_millis(100),
futures::stream::repeat(()),
));

assert_ready!(stream.poll_next());
assert_pending!(stream.poll_next());

time::advance(Duration::from_millis(90)).await;

assert_pending!(stream.poll_next());

time::advance(Duration::from_millis(101)).await;

assert!(stream.is_woken());

assert_ready!(stream.poll_next());
}

0 comments on commit 4b85565

Please sign in to comment.