Skip to content

Commit

Permalink
Add a Stream::zip method
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcrichton committed Aug 16, 2016
1 parent 992b7f7 commit b7b728c
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod skip;
mod skip_while;
mod take;
mod then;
mod zip;
pub use self::and_then::AndThen;
pub use self::buffered::Buffered;
pub use self::collect::Collect;
Expand All @@ -54,6 +55,7 @@ pub use self::skip::Skip;
pub use self::skip_while::SkipWhile;
pub use self::take::Take;
pub use self::then::Then;
pub use self::zip::Zip;

mod impls;

Expand Down Expand Up @@ -652,6 +654,18 @@ pub trait Stream: 'static {
{
merge::new(self, other)
}

/// An adapter for zipping two streams together.
///
/// The zipped stream waits for both streams to produce an item, and then
/// returns that pair. If an error happens that than error will be returned
/// immediately. If either stream ends then the zipped stream will also end.
fn zip<S>(self, other: S) -> Zip<Self, S>
where S: Stream<Error = Self::Error>,
Self: Sized,
{
zip::new(self, other)
}
}

/// A type alias for `Box<Future + Send>`
Expand Down
70 changes: 70 additions & 0 deletions src/stream/zip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use {Task, Poll};
use stream::{Stream, Fuse};

/// An adapter for merging the output of two streams.
///
/// The merged stream produces items from one or both of the underlying
/// streams as they become available. Errors, however, are not merged: you
/// get at most one error at a time.
pub struct Zip<S1: Stream, S2: Stream> {
stream1: Fuse<S1>,
stream2: Fuse<S2>,
queued1: Option<S1::Item>,
queued2: Option<S2::Item>,
}

pub fn new<S1, S2>(stream1: S1, stream2: S2) -> Zip<S1, S2>
where S1: Stream, S2: Stream<Error = S1::Error>
{
Zip {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
queued1: None,
queued2: None,
}
}

impl<S1, S2> Stream for Zip<S1, S2>
where S1: Stream, S2: Stream<Error = S1::Error>
{
type Item = (S1::Item, S2::Item);
type Error = S1::Error;

fn poll(&mut self, task: &mut Task) -> Poll<Option<Self::Item>, Self::Error> {
if self.queued1.is_none() {
match self.stream1.poll(task) {
Poll::Err(e) => return Poll::Err(e),
Poll::NotReady => {}
Poll::Ok(Some(item1)) => self.queued1 = Some(item1),
Poll::Ok(None) => {}
}
}
if self.queued2.is_none() {
match self.stream2.poll(task) {
Poll::Err(e) => return Poll::Err(e),
Poll::NotReady => {}
Poll::Ok(Some(item2)) => self.queued2 = Some(item2),
Poll::Ok(None) => {}
}
}

if self.queued1.is_some() && self.queued2.is_some() {
let pair = (self.queued1.take().unwrap(),
self.queued2.take().unwrap());
Poll::Ok(Some(pair))
} else if self.stream1.is_done() || self.stream2.is_done() {
Poll::Ok(None)
} else {
Poll::NotReady
}
}

fn schedule(&mut self, task: &mut Task) {
if self.queued1.is_none() {
self.stream1.schedule(task);
}
if self.queued2.is_none() {
self.stream2.schedule(task);
}
}
}
13 changes: 13 additions & 0 deletions tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,16 @@ fn buffered() {
sassert_next(&mut rx, 3);
sassert_done(&mut rx);
}

#[test]
fn zip() {
assert_done(|| list().zip(list()).collect(),
Ok(vec![(1, 1), (2, 2), (3, 3)]));
assert_done(|| list().zip(list().take(2)).collect(),
Ok(vec![(1, 1), (2, 2)]));
assert_done(|| list().take(2).zip(list()).collect(),
Ok(vec![(1, 1), (2, 2)]));
assert_done(|| err_list().zip(list()).collect(), Err(3));
assert_done(|| list().zip(list().map(|x| x + 1)).collect(),
Ok(vec![(1, 2), (2, 3), (3, 4)]));
}

0 comments on commit b7b728c

Please sign in to comment.