From 12a4eeb0c2144bcc1e08e7783bb283b6fd9aa3ac Mon Sep 17 00:00:00 2001 From: Robert Marek Date: Fri, 28 Aug 2020 19:04:21 +0200 Subject: [PATCH 1/2] Create chunks in Stream.fromIterator (#2010) --- core/shared/src/main/scala/fs2/Stream.scala | 29 ++++++++++++------- .../scala/fs2/StreamCombinatorsSuite.scala | 14 ++++++--- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index b458d46074..40805cf558 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3426,13 +3426,15 @@ object Stream extends StreamLowPriority { private[fs2] final class PartiallyAppliedFromIterator[F[_]]( private val dummy: Boolean ) extends AnyVal { - def apply[A](iterator: Iterator[A])(implicit F: Sync[F]): Stream[F, A] = { - def getNext(i: Iterator[A]): F[Option[(A, Iterator[A])]] = - F.delay(i.hasNext).flatMap { b => - if (b) F.delay(i.next()).map(a => (a, i).some) else F.pure(None) + def apply[A](iterator: Iterator[A], chunkSize: Int = 1)(implicit F: Sync[F]): Stream[F, A] = { + def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] = + F.delay { + for (_ <- 1 to chunkSize if i.hasNext) yield i.next() + }.map { s => + if (s.isEmpty) None else Some((Chunk.seq(s), i)) } - Stream.unfoldEval(iterator)(getNext) + Stream.unfoldChunkEval(iterator)(getNextChunk) } } @@ -3447,14 +3449,19 @@ object Stream extends StreamLowPriority { ) extends AnyVal { def apply[A]( blocker: Blocker, - iterator: Iterator[A] + iterator: Iterator[A], + chunkSize: Int = 1 )(implicit F: Sync[F], cs: ContextShift[F]): Stream[F, A] = { - def getNext(i: Iterator[A]): F[Option[(A, Iterator[A])]] = - blocker.delay(i.hasNext).flatMap { b => - if (b) blocker.delay(i.next()).map(a => (a, i).some) else F.pure(None) - } + def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] = + blocker + .delay { + for (_ <- 1 to chunkSize if i.hasNext) yield i.next() + } + .map { s => + if (s.isEmpty) None else Some((Chunk.seq(s), i)) + } - Stream.unfoldEval(iterator)(getNext) + Stream.unfoldChunkEval(iterator)(getNextChunk) } } diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 21c643f7f5..7f4b47f4aa 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -652,9 +652,10 @@ class StreamCombinatorsSuite extends Fs2Suite { } test("fromIterator") { - forAllF { (x: List[Int]) => + forAllF { (x: List[Int], cs: Int) => + val chunkSize = (cs % 4096).abs + 1 Stream - .fromIterator[IO](x.iterator) + .fromIterator[IO](x.iterator, chunkSize) .compile .toList .map(it => assert(it == x)) @@ -662,9 +663,14 @@ class StreamCombinatorsSuite extends Fs2Suite { } test("fromBlockingIterator") { - forAllF { (x: List[Int]) => + forAllF { (x: List[Int], cs: Int) => + val chunkSize = (cs % 4096).abs + 1 Stream - .fromBlockingIterator[IO](Blocker.liftExecutionContext(munitExecutionContext), x.iterator) + .fromBlockingIterator[IO]( + Blocker.liftExecutionContext(munitExecutionContext), + x.iterator, + chunkSize + ) .compile .toList .map(it => assert(it == x)) From 702cca59c1c5d7de843ac24b9e87d5aee9212db6 Mon Sep 17 00:00:00 2001 From: Robert Marek Date: Fri, 28 Aug 2020 20:26:36 +0200 Subject: [PATCH 2/2] Make chunked version of fromIterator as overload instead of default value to ensure binary compatibility --- core/shared/src/main/scala/fs2/Stream.scala | 27 +++++++- .../scala/fs2/StreamCombinatorsSuite.scala | 64 +++++++++++++------ 2 files changed, 69 insertions(+), 22 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 40805cf558..f75394221c 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3426,7 +3426,17 @@ object Stream extends StreamLowPriority { private[fs2] final class PartiallyAppliedFromIterator[F[_]]( private val dummy: Boolean ) extends AnyVal { - def apply[A](iterator: Iterator[A], chunkSize: Int = 1)(implicit F: Sync[F]): Stream[F, A] = { + + def apply[A](iterator: Iterator[A])(implicit F: Sync[F]): Stream[F, A] = { + def getNext(i: Iterator[A]): F[Option[(A, Iterator[A])]] = + F.delay(i.hasNext).flatMap { b => + if (b) F.delay(i.next()).map(a => (a, i).some) else F.pure(None) + } + + Stream.unfoldEval(iterator)(getNext) + } + + def apply[A](iterator: Iterator[A], chunkSize: Int)(implicit F: Sync[F]): Stream[F, A] = { def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] = F.delay { for (_ <- 1 to chunkSize if i.hasNext) yield i.next() @@ -3447,10 +3457,23 @@ object Stream extends StreamLowPriority { private[fs2] final class PartiallyAppliedFromBlockingIterator[F[_]]( private val dummy: Boolean ) extends AnyVal { + + def apply[A]( + blocker: Blocker, + iterator: Iterator[A] + )(implicit F: Sync[F], cs: ContextShift[F]): Stream[F, A] = { + def getNext(i: Iterator[A]): F[Option[(A, Iterator[A])]] = + blocker.delay(i.hasNext).flatMap { b => + if (b) blocker.delay(i.next()).map(a => (a, i).some) else F.pure(None) + } + + Stream.unfoldEval(iterator)(getNext) + } + def apply[A]( blocker: Blocker, iterator: Iterator[A], - chunkSize: Int = 1 + chunkSize: Int )(implicit F: Sync[F], cs: ContextShift[F]): Stream[F, A] = { def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] = blocker diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 7f4b47f4aa..b1fd9db02d 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -651,29 +651,53 @@ class StreamCombinatorsSuite extends Fs2Suite { } } - test("fromIterator") { - forAllF { (x: List[Int], cs: Int) => - val chunkSize = (cs % 4096).abs + 1 - Stream - .fromIterator[IO](x.iterator, chunkSize) - .compile - .toList - .map(it => assert(it == x)) + group("fromIterator") { + test("single") { + forAllF { (x: List[Int]) => + Stream + .fromIterator[IO](x.iterator) + .compile + .toList + .map(it => assert(it == x)) + } + } + + test("chunked") { + forAllF { (x: List[Int], cs: Int) => + val chunkSize = (cs % 4096).abs + 1 + Stream + .fromIterator[IO](x.iterator, chunkSize) + .compile + .toList + .map(it => assert(it == x)) + } } } - test("fromBlockingIterator") { - forAllF { (x: List[Int], cs: Int) => - val chunkSize = (cs % 4096).abs + 1 - Stream - .fromBlockingIterator[IO]( - Blocker.liftExecutionContext(munitExecutionContext), - x.iterator, - chunkSize - ) - .compile - .toList - .map(it => assert(it == x)) + group("fromBlockingIterator") { + test("single") { + forAllF { (x: List[Int]) => + Stream + .fromBlockingIterator[IO](Blocker.liftExecutionContext(munitExecutionContext), x.iterator) + .compile + .toList + .map(it => assert(it == x)) + } + } + + test("chunked") { + forAllF { (x: List[Int], cs: Int) => + val chunkSize = (cs % 4096).abs + 1 + Stream + .fromBlockingIterator[IO]( + Blocker.liftExecutionContext(munitExecutionContext), + x.iterator, + chunkSize + ) + .compile + .toList + .map(it => assert(it == x)) + } } }