Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring stream combinators over to 0.3 #1022

Merged
merged 2 commits into from
May 25, 2018

Conversation

aturon
Copy link
Member

@aturon aturon commented May 22, 2018

As with the future combinators, this PR pushes through all the simple combinators, to explore patterns. I'll do follow-up PRs for the more complicated cases.

Here I've developed a new set of macros for easily adding accessor methods that work on PinMut<Self>, similar to @withoutboats's derive. For flat structs, this works very well, and safety is trivial to audit.

@aturon aturon requested a review from cramertj May 22, 2018 19:19
///
/// This macro bakes in propagation of `Pending` signals by returning early.
#[macro_export]
macro_rules! try_ready {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh-- I had added this as ready! in my Sink CL, with try_ready! being the one that included error handling. We definitely want both since we can't use ? anymore-- do you have a naming preference?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll switch to ready

fn as_pin_mut(self) -> Option<PinMut<'a, T>>;
}

impl<'a, T> OptionExt<'a, T> for PinMut<'a, Option<T>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: add to core (both this and assign)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll rename to set to match your PR

@@ -73,54 +71,41 @@ impl<S> Chunks<S> where S: Stream {
pub fn into_inner(self) -> S {
self.stream.into_inner()
}

unsafe_unpinned!(items -> Vec<S::Item>);
unsafe_pinned!(stream -> Fuse<S>);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an unsafe impl<S: Unpin> Unpin for Chunks<S> {} so that the Unpin-edness won't depend on the types of the items?

}

unsafe_pinned!(stream -> S);
unsafe_unpinned!(items -> C);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Unpin, C> Unpin for Collect<S, C> {}

// These methods are the only points of access going through `PinMut`
impl<S: Stream> Concat<S> {
unsafe_pinned!(stream -> S);
unsafe_unpinned!(accum -> Option<S::Item>);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Unpin> Unpin for Concat<S> {}

}
} else {
let accum = try_ready!(self.fut().as_pin_mut().unwrap().poll(cx));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this unwrap into expect("Fold polled after completion")?

unsafe_pinned!(stream -> S);
unsafe_unpinned!(f -> F);
unsafe_pinned!(fut -> Option<U>);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Unpin, U: Unpin, F> Unpin for ForEach<S, U, F> {}

@@ -14,6 +15,7 @@ pub struct Fuse<S> {
done: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not new to this CL, but can you change this to use an Option (or create an issue to follow up and assign me)? That should allow for a more optimized representation, in addition to allowing the stream to drop before the Fuse is ready to be dropped.

@@ -45,8 +46,12 @@ impl<S: Stream, F> Inspect<S, F> {
pub fn into_inner(self) -> S {
self.stream
}

unsafe_pinned!(stream -> S);
unsafe_unpinned!(inspect -> F);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Stream + Unpin, F> Unpin for Inspect<S, F> {}

@@ -46,8 +47,12 @@ impl<S, F> Map<S, F> {
pub fn into_inner(self) -> S {
self.stream
}

unsafe_pinned!(stream -> S);
unsafe_unpinned!(f -> F);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Unpin, F> Unpin for Map<S, F> {}

@cramertj
Copy link
Member

cramertj commented May 22, 2018

Can you also update StreamExt::next?

unsafe_pinned!(stream1 -> Fuse<S1>);
unsafe_pinned!(stream2 -> Fuse<S2>);
unsafe_unpinned!(queued1 -> Option<S1::Item>);
unsafe_unpinned!(queued2 -> Option<S2::Item>);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S1: Unpin, S2: Unpin> Unpin for Zip<S1, S2> {}

impl<T, F, Fut> Unfold<T, F, Fut> {
unsafe_unpinned!(f -> F);
unsafe_unpinned!(state -> Option<T>);
unsafe_pinned!(fut -> Option<Fut>);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<T, F, Fut: Unpin> Unpin for Unfold<T, F, Fut> {}

@@ -55,11 +57,12 @@ use futures_core::task;
/// ```
pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Copy link
Member

@cramertj cramertj May 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that borrowed futures are the way of the world, can we change this to use FnMut(&mut T) -> Fut where Fut yields an Option<It>? This matches the itertools::unfold method: https://docs.rs/itertools/0.7.8/itertools/fn.unfold.html

Edit: I'm not sure if that's actually possible to write-- definitely need to sort that out: for<'a> FnMut(&'a mut T) -> Fut::<'a> or something?

Edit2: we might be able to do it with a custom trait:

trait UnfoldFunc<'a, T, Res> {
    type Fut: Future<Output = Option<Res>> + 'a;
    fn next(&'a mut self, state: &'a mut T) -> Fut;
}

and then bound by F: for<'a> UnfoldFunc<'a, T, Res>. I'm not sure how to get <F as UnfoldFunc<'a, T, Res>>::Fut stored in the same place as a <F as UnfoldFunc<'b, T, Res>>::Fut used to be -- seems like we'd need manual erasure of some kind. Maybe it's enough to know that it works for any given lifetime, so we know it must work for a particular lifetime of our choosing that we keep the same across all the invocations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this nonsense work: cramertj@4bc297b

impl<S, U, F> Then<S, U, F> {
unsafe_pinned!(stream -> S);
unsafe_pinned!(future -> Option<U>);
unsafe_unpinned!(f -> F);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Unpin, U: Unpin, F> Unpin for Then<S, U, F> {}

unsafe_unpinned!(pred -> P);
unsafe_pinned!(pending_fut -> Option<R>);
unsafe_unpinned!(pending_item -> Option<S::Item>);
unsafe_unpinned!(done_taking -> bool);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Unpin, R: Unpin, P> Unpin for TakeWhile<S, R, P> {}

unsafe_unpinned!(pred -> P);
unsafe_pinned!(pending_fut -> Option<R>);
unsafe_unpinned!(pending_item -> Option<S::Item>);
unsafe_unpinned!(done_skipping -> bool);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Unpin, R: Unpin, P> Unpin for SkipWhile<S, R, P> {}

where T: Clone
{
item: T,
error: marker::PhantomData<E>,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<T> Unpin for Repeat<T> {}

}
}
}

unsafe_pinned!(stream -> Fuse<S>);
unsafe_unpinned!(peeked -> Option<S::Item>);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe impl<S: Unpin> Unpin for Peakable<S> {}

@MajorBreakfast
Copy link
Contributor

@cramertj @aturon

Out of interest: In the futures RFC thread, your most recent approach is the impl Future for PinMut<..>approach. However, here i see many fn poll(mut self: PinMut<Self>, ..). Is this PR's implementation a stopgap solution/experiment? I see that you now have two branches: "0.3" and "0.3-PinFuture" Are you pursuing both approaches at the same time for the time being or did you run into a problem with the other approach?

@aturon
Copy link
Member Author

aturon commented May 24, 2018

@cramertj most review comments addressed. I left Fuse as-is, because a bunch of stuff depends on being able to extract out the underlying stream.

@MajorBreakfast we've been pursuing both in parallel, and yes, there are some blockers on the other side. I plan to update the RFC thread soon...

@cramertj
Copy link
Member

:shipit:

@cramertj cramertj merged commit f57acc7 into rust-lang:0.3 May 25, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants