diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index b458d46074..f75394221c 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3426,6 +3426,7 @@ object Stream extends StreamLowPriority { private[fs2] final class PartiallyAppliedFromIterator[F[_]]( private val dummy: Boolean ) extends AnyVal { + def apply[A](iterator: Iterator[A])(implicit F: Sync[F]): Stream[F, A] = { def getNext(i: Iterator[A]): F[Option[(A, Iterator[A])]] = F.delay(i.hasNext).flatMap { b => @@ -3434,6 +3435,17 @@ object Stream extends StreamLowPriority { Stream.unfoldEval(iterator)(getNext) } + + def apply[A](iterator: Iterator[A], chunkSize: Int)(implicit F: Sync[F]): Stream[F, A] = { + def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] = + F.delay { + for (_ <- 1 to chunkSize if i.hasNext) yield i.next() + }.map { s => + if (s.isEmpty) None else Some((Chunk.seq(s), i)) + } + + Stream.unfoldChunkEval(iterator)(getNextChunk) + } } /** @@ -3445,6 +3457,7 @@ object Stream extends StreamLowPriority { private[fs2] final class PartiallyAppliedFromBlockingIterator[F[_]]( private val dummy: Boolean ) extends AnyVal { + def apply[A]( blocker: Blocker, iterator: Iterator[A] @@ -3456,6 +3469,23 @@ object Stream extends StreamLowPriority { Stream.unfoldEval(iterator)(getNext) } + + def apply[A]( + blocker: Blocker, + iterator: Iterator[A], + chunkSize: Int + )(implicit F: Sync[F], cs: ContextShift[F]): Stream[F, A] = { + def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] = + blocker + .delay { + for (_ <- 1 to chunkSize if i.hasNext) yield i.next() + } + .map { s => + if (s.isEmpty) None else Some((Chunk.seq(s), i)) + } + + Stream.unfoldChunkEval(iterator)(getNextChunk) + } } /** diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 21c643f7f5..b1fd9db02d 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -651,23 +651,53 @@ class StreamCombinatorsSuite extends Fs2Suite { } } - test("fromIterator") { - forAllF { (x: List[Int]) => - Stream - .fromIterator[IO](x.iterator) - .compile - .toList - .map(it => assert(it == x)) + group("fromIterator") { + test("single") { + forAllF { (x: List[Int]) => + Stream + .fromIterator[IO](x.iterator) + .compile + .toList + .map(it => assert(it == x)) + } + } + + test("chunked") { + forAllF { (x: List[Int], cs: Int) => + val chunkSize = (cs % 4096).abs + 1 + Stream + .fromIterator[IO](x.iterator, chunkSize) + .compile + .toList + .map(it => assert(it == x)) + } } } - test("fromBlockingIterator") { - forAllF { (x: List[Int]) => - Stream - .fromBlockingIterator[IO](Blocker.liftExecutionContext(munitExecutionContext), x.iterator) - .compile - .toList - .map(it => assert(it == x)) + group("fromBlockingIterator") { + test("single") { + forAllF { (x: List[Int]) => + Stream + .fromBlockingIterator[IO](Blocker.liftExecutionContext(munitExecutionContext), x.iterator) + .compile + .toList + .map(it => assert(it == x)) + } + } + + test("chunked") { + forAllF { (x: List[Int], cs: Int) => + val chunkSize = (cs % 4096).abs + 1 + Stream + .fromBlockingIterator[IO]( + Blocker.liftExecutionContext(munitExecutionContext), + x.iterator, + chunkSize + ) + .compile + .toList + .map(it => assert(it == x)) + } } }