Skip to content

Commit

Permalink
Update with review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
aturon authored and cramertj committed May 25, 2018
1 parent 108fc90 commit f57acc7
Show file tree
Hide file tree
Showing 38 changed files with 135 additions and 76 deletions.
2 changes: 1 addition & 1 deletion futures-core/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
///
/// This macro bakes in propagation of `Pending` signals by returning early.
#[macro_export]
macro_rules! try_ready {
macro_rules! ready {
($e:expr) => (match $e {
$crate::Poll::Ready(t) => t,
$crate::Poll::Pending => return $crate::Poll::Pending,
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/future/flatten_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl<F> Sink for FlattenSink<F> where F: Future, <F as Future>::Item: Sink<SinkE

fn poll_close(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
if let State::Ready(ref mut s) = self.st {
try_ready!(s.poll_close(cx));
ready!(s.poll_close(cx));
}
self.st = State::Closed;
return Ok(Async::Ready(()));
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<A> Stream for Lines<A>
type Error = io::Error;

fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<String>, io::Error> {
let n = try_ready!(self.io.read_line(&mut self.line));
let n = ready!(self.io.read_line(&mut self.line));
if n == 0 && self.line.len() == 0 {
return Ok(None.into())
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/read_until.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<A> Future for ReadUntil<A>
// and just return it, as we are finished.
// If we hit "would block" then all the read data so far
// is in our buffer, and otherwise we propagate errors.
try_ready!(a.read_until(byte, buf));
ready!(a.read_until(byte, buf));
},
State::Empty => panic!("poll ReadUntil after it's done"),
}
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/sink/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ impl<S: Sink> Buffer<S> {
}

fn try_empty_buffer(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> {
try_ready!(self.sink.poll_ready(cx));
ready!(self.sink.poll_ready(cx));
while let Some(item) = self.buf.pop_front() {
self.sink.start_send(item)?;
if self.buf.len() != 0 {
try_ready!(self.sink.poll_ready(cx));
ready!(self.sink.poll_ready(cx));
}
}
Ok(Async::Ready(()))
Expand Down Expand Up @@ -93,13 +93,13 @@ impl<S: Sink> Sink for Buffer<S> {
}

fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
try_ready!(self.try_empty_buffer(cx));
ready!(self.try_empty_buffer(cx));
debug_assert!(self.buf.is_empty());
self.sink.poll_flush(cx)
}

fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
try_ready!(self.try_empty_buffer(cx));
ready!(self.try_empty_buffer(cx));
debug_assert!(self.buf.is_empty());
self.sink.poll_close(cx)
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/sink/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<S: Sink> Future for Send<S> {

// we're done sending the item, but want to block on flushing the
// sink
try_ready!(self.sink_mut().poll_flush(cx));
ready!(self.sink_mut().poll_flush(cx));

// now everything's emptied, so return the sink for further use
Ok(Async::Ready(self.take_sink()))
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/sink/send_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,18 @@ impl<T, U> Future for SendAll<T, U>
// If we've got an item buffered already, we need to write it to the
// sink before we can do anything else
if let Some(item) = self.buffered.take() {
try_ready!(self.try_start_send(cx, item))
ready!(self.try_start_send(cx, item))
}

loop {
match self.stream_mut().poll_next(cx)? {
Async::Ready(Some(item)) => try_ready!(self.try_start_send(cx, item)),
Async::Ready(Some(item)) => ready!(self.try_start_send(cx, item)),
Async::Ready(None) => {
try_ready!(self.sink_mut().poll_flush(cx));
ready!(self.sink_mut().poll_flush(cx));
return Ok(Async::Ready(self.take_result()))
}
Async::Pending => {
try_ready!(self.sink_mut().poll_flush(cx));
ready!(self.sink_mut().poll_flush(cx));
return Ok(Async::Pending)
}
}
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/sink/with.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ impl<S, U, Fut, F> Sink for With<S, U, Fut, F>
}

fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
try_ready!(self.poll(cx));
ready!(self.poll(cx));
self.sink.poll_flush(cx).map_err(Into::into)
}

fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
try_ready!(self.poll(cx));
ready!(self.poll(cx));
self.sink.poll_close(cx).map_err(Into::into)
}
}
2 changes: 1 addition & 1 deletion futures-util/src/sink/with_flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
}
}
if let Some(mut stream) = self.stream.take() {
while let Some(x) = try_ready!(stream.poll_next(cx)) {
while let Some(x) = ready!(stream.poll_next(cx)) {
match self.sink.poll_ready(cx)? {
Async::Ready(()) => self.sink.start_send(x)?,
Async::Pending => {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/buffer_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<S> Stream for BufferUnordered<S>
}

// Try polling a new future
if let Some(val) = try_ready!(self.queue.poll_next(cx)) {
if let Some(val) = ready!(self.queue.poll_next(cx)) {
return Ok(Async::Ready(Some(val)));
}

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl<S> Stream for Buffered<S>
}

// Try polling a new future
if let Some(val) = try_ready!(self.queue.poll_next(cx)) {
if let Some(val) = ready!(self.queue.poll_next(cx)) {
return Ok(Async::Ready(Some(val)));
}

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<S1, S2> Stream for Chain<S1, S2>

fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
if let Some(first) = self.first().as_pin_mut() {
if let Some(item) = try_ready!(first.poll_next(cx)) {
if let Some(item) = ready!(first.poll_next(cx)) {
return Poll::Ready(Some(item))
}
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/chunks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::mem::{self, PinMut};
use std::marker::Unpin;
use std::prelude::v1::*;

use futures_core::{Poll, Stream};
Expand Down Expand Up @@ -76,6 +77,8 @@ impl<S> Chunks<S> where S: Stream {
unsafe_pinned!(stream -> Fuse<S>);
}

unsafe impl<S: Unpin + Stream> Unpin for Chunks<S> {}

impl<S> Stream for Chunks<S>
where S: Stream
{
Expand All @@ -84,7 +87,7 @@ impl<S> Stream for Chunks<S>
fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let cap = self.items.capacity();
loop {
match try_ready!(self.stream().poll_next(cx)) {
match ready!(self.stream().poll_next(cx)) {
// Push the item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Expand Down
6 changes: 4 additions & 2 deletions futures-util/src/stream/collect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::prelude::v1::*;

use std::marker::Unpin;
use std::mem::{self, PinMut};

use futures_core::{Future, Poll, Stream};
Expand Down Expand Up @@ -33,14 +33,16 @@ impl<S: Stream, C: Default> Collect<S, C> {
unsafe_unpinned!(items -> C);
}

unsafe impl<S: Unpin + Stream, C> Unpin for Collect<S, C> {}

impl<S, C> Future for Collect<S, C>
where S: Stream, C: Default + Extend<S:: Item>
{
type Output = C;

fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<C> {
loop {
match try_ready!(self.stream().poll_next(cx)) {
match ready!(self.stream().poll_next(cx)) {
Some(e) => self.items().extend(Some(e)),
None => return Poll::Ready(self.finish()),
}
Expand Down
3 changes: 3 additions & 0 deletions futures-util/src/stream/concat.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::mem::PinMut;
use core::fmt::{Debug, Formatter, Result as FmtResult};
use core::default::Default;
use core::marker::Unpin;

use futures_core::{Future, Poll, Stream};
use futures_core::task;
Expand Down Expand Up @@ -41,6 +42,8 @@ impl<S: Stream> Concat<S> {
unsafe_unpinned!(accum -> Option<S::Item>);
}

unsafe impl<S: Stream + Unpin> Unpin for Concat<S> {}

impl<S> Future for Concat<S>
where S: Stream,
S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
Expand Down
4 changes: 3 additions & 1 deletion futures-util/src/stream/empty.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::mem::PinMut;
use core::marker::PhantomData;
use core::marker::{Unpin, PhantomData};

use futures_core::{Stream, Poll};
use futures_core::task;
Expand All @@ -22,6 +22,8 @@ pub fn empty<T>() -> Empty<T> {
}
}

unsafe impl<T> Unpin for Empty<T> {}

impl<T> Stream for Empty<T> {
type Item = T;

Expand Down
11 changes: 9 additions & 2 deletions futures-util/src/stream/filter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::mem::PinMut;
use core::marker::Unpin;

use {PinMutExt, OptionExt};

Expand Down Expand Up @@ -69,6 +70,12 @@ impl<S, R, P> Filter<S, R, P>
unsafe_unpinned!(pending_item -> Option<S::Item>);
}

unsafe impl<S, R, P> Unpin for Filter<S, R, P>
where S: Stream + Unpin,
P: FnMut(&S::Item) -> R,
R: Future<Output = bool> + Unpin,
{}

/* TODO
// Forwarding impl of Sink from the underlying stream
impl<S, R, P> Sink for Filter<S, R, P>
Expand All @@ -94,7 +101,7 @@ impl<S, R, P> Stream for Filter<S, R, P>
fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<S::Item>> {
loop {
if self.pending_fut().as_pin_mut().is_none() {
let item = match try_ready!(self.stream().poll_next(cx)) {
let item = match ready!(self.stream().poll_next(cx)) {
Some(e) => e,
None => return Poll::Ready(None),
};
Expand All @@ -103,7 +110,7 @@ impl<S, R, P> Stream for Filter<S, R, P>
*self.pending_item() = Some(item);
}

let yield_item = try_ready!(self.pending_fut().as_pin_mut().unwrap().poll(cx));
let yield_item = ready!(self.pending_fut().as_pin_mut().unwrap().poll(cx));
self.pending_fut().assign(None);
let item = self.pending_item().take().unwrap();

Expand Down
11 changes: 9 additions & 2 deletions futures-util/src/stream/filter_map.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::mem::PinMut;
use core::marker::Unpin;

use {PinMutExt, OptionExt};

Expand Down Expand Up @@ -66,6 +67,12 @@ impl<S, R, F> FilterMap<S, R, F>
unsafe_pinned!(pending -> Option<R>);
}

unsafe impl<S, R, F> Unpin for FilterMap<S, R, F>
where S: Stream + Unpin,
F: FnMut(S::Item) -> R,
R: Future + Unpin,
{}

/* TODO
// Forwarding impl of Sink from the underlying stream
impl<S, R, F> Sink for FilterMap<S, R, F>
Expand All @@ -90,15 +97,15 @@ impl<S, R, F, B> Stream for FilterMap<S, R, F>
fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<B>> {
loop {
if self.pending().as_pin_mut().is_none() {
let item = match try_ready!(self.stream().poll_next(cx)) {
let item = match ready!(self.stream().poll_next(cx)) {
Some(e) => e,
None => return Poll::Ready(None),
};
let fut = (self.f())(item);
self.pending().assign(Some(fut));
}

let item = try_ready!(self.pending().as_pin_mut().unwrap().poll(cx));
let item = ready!(self.pending().as_pin_mut().unwrap().poll(cx));
self.pending().assign(None);
if item.is_some() {
return Poll::Ready(item);
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/stream/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ impl<S> Stream for Flatten<S>
fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
loop {
if self.next().as_pin_mut().is_none() {
match try_ready!(self.stream().poll_next(cx)) {
match ready!(self.stream().poll_next(cx)) {
Some(e) => self.next().assign(Some(e)),
None => return Poll::Ready(None),
}
}
let item = try_ready!(self.next().as_pin_mut().unwrap().poll_next(cx));
let item = ready!(self.next().as_pin_mut().unwrap().poll_next(cx));
if item.is_some() {
return Poll::Ready(item);
} else {
Expand Down
29 changes: 17 additions & 12 deletions futures-util/src/stream/fold.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::mem::PinMut;
use core::marker::Unpin;

use {PinMutExt, OptionExt};

Expand Down Expand Up @@ -37,6 +38,8 @@ impl<S, Fut, T, F> Fold<S, Fut, T, F> {
unsafe_pinned!(fut -> Option<Fut>);
}

unsafe impl<S: Unpin, Fut: Unpin, T, F> Unpin for Fold<S, Fut, T, F> {}

impl<S, Fut, T, F> Future for Fold<S, Fut, T, F>
where S: Stream,
F: FnMut(T, S::Item) -> Fut,
Expand All @@ -46,21 +49,23 @@ impl<S, Fut, T, F> Future for Fold<S, Fut, T, F>

fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<T> {
loop {
if self.accum().is_some() {
let item = try_ready!(self.stream().poll_next(cx));
let accum = self.accum().take().unwrap();

if let Some(e) = item {
let fut = (self.f())(accum, e);
self.fut().assign(Some(fut));
} else {
return Poll::Ready(accum)
}
} else {
let accum = try_ready!(self.fut().as_pin_mut().unwrap().poll(cx));
// we're currently processing a future to produce a new accum value
if self.accum().is_none() {
let accum = ready!(self.fut().as_pin_mut().unwrap().poll(cx));
*self.accum() = Some(accum);
self.fut().assign(None);
}

let item = ready!(self.stream().poll_next(cx));
let accum = self.accum().take()
.expect("Fold polled after completion");

if let Some(e) = item {
let fut = (self.f())(accum, e);
self.fut().assign(Some(fut));
} else {
return Poll::Ready(accum)
}
}
}
}
7 changes: 5 additions & 2 deletions futures-util/src/stream/for_each.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::mem::PinMut;
use core::marker::Unpin;

use {PinMutExt, OptionExt};

Expand Down Expand Up @@ -35,6 +36,8 @@ impl<S, U, F> ForEach<S, U, F> {
unsafe_pinned!(fut -> Option<U>);
}

unsafe impl<S, U, F> Unpin for ForEach<S, U, F> {}

impl<S, U, F> Future for ForEach<S, U, F>
where S: Stream,
F: FnMut(S::Item) -> U,
Expand All @@ -45,11 +48,11 @@ impl<S, U, F> Future for ForEach<S, U, F>
fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<()> {
loop {
if let Some(fut) = self.fut().as_pin_mut() {
try_ready!(fut.poll(cx));
ready!(fut.poll(cx));
}
self.fut().assign(None);

match try_ready!(self.stream().poll_next(cx)) {
match ready!(self.stream().poll_next(cx)) {
Some(e) => {
let fut = (self.f())(e);
self.fut().assign(Some(fut));
Expand Down
Loading

0 comments on commit f57acc7

Please sign in to comment.