Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed nontermination issue in par operators #2239

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,57 @@ trait GenSpawnInstances {
fiberA <- F.start(ParallelF.value(fa))
fiberB <- F.start(ParallelF.value(fb))

// start a pair of supervisors to ensure that the opposite is canceled on error
_ <- F start {
fiberB.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberA.cancel
}
}

_ <- F start {
fiberA.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberB.cancel
}
}

a <- F
.onCancel(poll(fiberA.join), F.both(fiberA.cancel, fiberB.cancel).void)
.flatMap[A] {
case Outcome.Succeeded(fa) => fa
case Outcome.Errored(e) => fiberB.cancel *> F.raiseError(e)
case Outcome.Canceled() => fiberB.cancel *> poll(F.canceled *> F.never)
case Outcome.Succeeded(fa) =>
fa

case Outcome.Errored(e) =>
fiberB.cancel *> F.raiseError(e)

case Outcome.Canceled() =>
fiberB.cancel *> poll {
fiberB.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}

z <- F.onCancel(poll(fiberB.join), fiberB.cancel).flatMap[Z] {
case Outcome.Succeeded(fb) => fb.map(b => f(a, b))
case Outcome.Errored(e) => F.raiseError(e)
case Outcome.Canceled() => poll(F.canceled *> F.never)
case Outcome.Succeeded(fb) =>
fb.map(b => f(a, b))

case Outcome.Errored(e) =>
F.raiseError(e)

case Outcome.Canceled() =>
poll {
fiberA.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}
} yield z
}
Expand All @@ -84,18 +123,57 @@ trait GenSpawnInstances {
fiberA <- F.start(ParallelF.value(fa))
fiberB <- F.start(ParallelF.value(fb.value))

// start a pair of supervisors to ensure that the opposite is canceled on error
_ <- F start {
fiberB.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberA.cancel
}
}

_ <- F start {
fiberA.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberB.cancel
}
}

a <- F
.onCancel(poll(fiberA.join), F.both(fiberA.cancel, fiberB.cancel).void)
.flatMap[A] {
case Outcome.Succeeded(fa) => fa
case Outcome.Errored(e) => fiberB.cancel *> F.raiseError(e)
case Outcome.Canceled() => fiberB.cancel *> poll(F.canceled *> F.never)
case Outcome.Succeeded(fa) =>
fa

case Outcome.Errored(e) =>
fiberB.cancel *> F.raiseError(e)

case Outcome.Canceled() =>
fiberB.cancel *> poll {
fiberB.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}

z <- F.onCancel(poll(fiberB.join), fiberB.cancel).flatMap[Z] {
case Outcome.Succeeded(fb) => fb.map(b => f(a, b))
case Outcome.Errored(e) => F.raiseError(e)
case Outcome.Canceled() => poll(F.canceled *> F.never)
case Outcome.Succeeded(fb) =>
fb.map(b => f(a, b))

case Outcome.Errored(e) =>
F.raiseError(e)

case Outcome.Canceled() =>
poll {
fiberA.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}
} yield z
}
Expand Down
14 changes: 14 additions & 0 deletions laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ class PureConcSpec extends Specification with Discipline with BaseSpec {
implicit def exec(fb: TimeT[PureConc[Int, *], Boolean]): Prop =
Prop(pure.run(TimeT.run(fb)).fold(false, _ => false, _.getOrElse(false)))

"parallel utilities" should {
import cats.effect.kernel.{GenConcurrent, Outcome}
import cats.effect.kernel.implicits._
import cats.syntax.all._

type F[A] = PureConc[Int, A]
val F = GenConcurrent[F]

"short-circuit on error" in {
pure.run((F.never[Unit], F.raiseError[Unit](42)).parTupled) mustEqual Outcome.Errored(42)
pure.run((F.raiseError[Unit](42), F.never[Unit]).parTupled) mustEqual Outcome.Errored(42)
}
}

checkAll(
"TimeT[PureConc]",
GenTemporalTests[TimeT[PureConc[Int, *], *], Int].temporal[Int, Int, Int](10.millis)
Expand Down
31 changes: 20 additions & 11 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,26 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {

}

"parallel" should {
"run parallel actually in parallel" in real {
val x = IO.sleep(2.seconds) >> IO.pure(1)
val y = IO.sleep(2.seconds) >> IO.pure(2)

List(x, y).parSequence.timeout(3.seconds).flatMap { res =>
IO {
res mustEqual List(1, 2)
}
}
}

"short-circuit on error" in ticked { implicit ticker =>
case object TestException extends RuntimeException

(IO.never[Unit], IO.raiseError[Unit](TestException)).parTupled.void must failAs(TestException)
(IO.raiseError[Unit](TestException), IO.never[Unit]).parTupled.void must failAs(TestException)
}
}

"miscellaneous" should {

"round trip non-canceled through s.c.Future" in ticked { implicit ticker =>
Expand All @@ -1081,17 +1101,6 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
test must completeAs(42)
}

"run parallel actually in parallel" in real {
val x = IO.sleep(2.seconds) >> IO.pure(1)
val y = IO.sleep(2.seconds) >> IO.pure(2)

List(x, y).parSequence.timeout(3.seconds).flatMap { res =>
IO {
res mustEqual List(1, 2)
}
}
}

"run a synchronous IO" in ticked { implicit ticker =>
val ioa = IO(1).map(_ + 2)
val test = IO.fromFuture(IO(ioa.unsafeToFuture()))
Expand Down