Skip to content

Commit

Permalink
Add short circuiting monad transformer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vasilmkd committed Aug 18, 2021
1 parent 4ef10df commit d84cd9a
Showing 1 changed file with 123 additions and 0 deletions.
123 changes: 123 additions & 0 deletions core/shared/src/test/scala/fs2/StreamParJoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ package fs2

import scala.concurrent.duration._

import cats.data.{EitherT, OptionT}
import cats.effect.IO
import cats.effect.kernel.{Deferred, Ref}
import cats.syntax.all._
import org.scalacheck.effect.PropF.forAllF

import scala.util.control.NoStackTrace

class StreamParJoinSuite extends Fs2Suite {
test("no concurrency") {
forAllF { (s: Stream[Pure, Int]) =>
Expand Down Expand Up @@ -204,4 +207,124 @@ class StreamParJoinSuite extends Fs2Suite {
.parJoinUnbounded ++ Stream.emit(1)).compile.drain
.intercept[Err]
}

group("short-circuiting transformers") {
test("do not block while evaluating a stream of streams in IO in parallel") {
def f(n: Int): Stream[IO, String] = Stream(n).map(_.toString)

Stream(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.map(_.toSet)
.flatMap { actual =>
IO(assertEquals(actual, Set("1", "2", "3")))
}
}

test(
"do not block while evaluating a stream of streams in EitherT[IO, Throwable, *] in parallel - right"
) {
def f(n: Int): Stream[EitherT[IO, Throwable, *], String] = Stream(n).map(_.toString)

Stream(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.map(_.toSet)
.value
.flatMap { actual =>
IO(assertEquals(actual, Right(Set("1", "2", "3"))))
}
}

test(
"do not block while evaluating a stream of streams in EitherT[IO, Throwable, *] in parallel - left"
) {
case object TestException extends Throwable with NoStackTrace

def f(n: Int): Stream[EitherT[IO, Throwable, *], String] =
if (n % 2 != 0) Stream(n).map(_.toString)
else Stream.eval[EitherT[IO, Throwable, *], String](EitherT.leftT(TestException))

Stream(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.value
.flatMap { actual =>
IO(assertEquals(actual, Left(TestException)))
}
}

test("do not block while evaluating an EitherT.left outer stream") {
case object TestException extends Throwable with NoStackTrace

def f(n: Int): Stream[EitherT[IO, Throwable, *], String] = Stream(n).map(_.toString)

Stream
.eval[EitherT[IO, Throwable, *], Int](EitherT.leftT[IO, Int](TestException))
.map(f)
.parJoinUnbounded
.compile
.toList
.value
.flatMap { actual =>
IO(assertEquals(actual, Left(TestException)))
}
}

test(
"do not block while evaluating a stream of streams in OptionT[IO, *] in parallel - some"
) {
def f(n: Int): Stream[OptionT[IO, *], String] = Stream(n).map(_.toString)

Stream(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.map(_.toSet)
.value
.flatMap { actual =>
IO(assertEquals(actual, Some(Set("1", "2", "3"))))
}
}

test(
"do not block while evaluating a stream of streams in OptionT[IO, *] in parallel - none"
) {
def f(n: Int): Stream[OptionT[IO, *], String] =
if (n % 2 != 0) Stream(n).map(_.toString)
else Stream.eval[OptionT[IO, *], String](OptionT.none)

Stream(1, 2, 3)
.map(f)
.parJoinUnbounded
.compile
.toList
.value
.flatMap { actual =>
IO(assertEquals(actual, None))
}
}

test("do not block while evaluating an OptionT.none outer stream") {
def f(n: Int): Stream[OptionT[IO, *], String] = Stream(n).map(_.toString)

Stream
.eval[OptionT[IO, *], Int](OptionT.none[IO, Int])
.map(f)
.parJoinUnbounded
.compile
.toList
.value
.flatMap { actual =>
IO(assertEquals(actual, None))
}
}
}
}

0 comments on commit d84cd9a

Please sign in to comment.