Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add possiblity to add suppressed application errors to ErrorMode #101

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/src/main/scala/ox/ErrorMode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ trait ErrorMode[E, F[_]] {
/** Adds a suppressed exception to the value being represented by `error`. This is only called if `isError(error)` returns `true`. By
* default, the suppressed exception is discarded and the original value is returned.
*/
def addSuppressed[T](error: F[T], e: Throwable): F[T] = error
def addSuppressedException[T](error: F[T], e: Throwable): F[T] = error

/** Adds a suppressed application error to the value being represented by `error`. This is only called if `isError(error)` returns `true`.
* By default, the suppressed application error is discarded and the original value is returned.
*/
def addSuppressedError[T](error: F[T], e: E): F[T] = error
}

/** An error mode which doesn't allow reporting application errors.
Expand Down Expand Up @@ -67,3 +72,7 @@ class UnionMode[E: ClassTag] extends ErrorMode[E, [T] =>> E | T] {
override def pure[T](t: T): E | T = t
override def pureError[T](e: E): E | T = e
}

//

case class SecondaryApplicationError[E](e: E) extends Throwable("Secondary application error reported")
11 changes: 8 additions & 3 deletions core/src/main/scala/ox/race.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,17 @@ def race[E, F[_], T](em: ErrorMode[E, F])(fs: Seq[() => F[T]]): F[T] =
@tailrec
def takeUntilSuccess(failures: Vector[Either[E, Throwable]], left: Int): F[T] =
if left == 0 then
val otherExceptions = failures.tail.collect { case Right(e) => e }
failures.headOption.getOrElse(throw new NoSuchElementException) match
case Left(appError) =>
otherExceptions.foldLeft(em.pureError(appError))(em.addSuppressed)
failures.tail.foldLeft(em.pureError(appError)) {
case (acc, Left(e)) => em.addSuppressedError(acc, e)
case (acc, Right(e)) => em.addSuppressedException(acc, e)
}
case Right(e) =>
otherExceptions.foreach(ee => if e != ee then e.addSuppressed(ee))
failures.tail.foreach {
case Left(ee) => e.addSuppressed(SecondaryApplicationError(ee))
case Right(ee) => if e != ee then e.addSuppressed(ee)
}
throw e
else
result.take() match
Expand Down
17 changes: 9 additions & 8 deletions core/src/main/scala/ox/supervised.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ def supervisedError[E, F[_], T](em: ErrorMode[E, F])(f: OxError[E, F] ?=> F[T]):
// all forks are guaranteed to have finished; some might have ended up throwing exceptions (InterruptedException or
// others), but if the scope ended because of an application error, only that result will be returned, hence we have
// to add the other exceptions as suppressed
if em.isError(scopeResult) then s.addExceptionsAsSuppressed(scopeResult, em) else scopeResult
if em.isError(scopeResult) then s.addSuppressedErrors(scopeResult, em) else scopeResult
catch
case e: Throwable =>
// all forks are guaranteed to have finished: some might have ended up throwing exceptions (InterruptedException or
// others), but only the first one is propagated below. That's why we add all the other exceptions as suppressed.
s.addOtherExceptionsAsSuppressedTo(e)
s.addSuppressedErrors(e)
throw e

private[ox] sealed trait Supervisor[-E]:
Expand All @@ -79,6 +79,7 @@ private[ox] class DefaultSupervisor[E] extends Supervisor[E]:
// the result might be completed with: a success marker, an app error, or an exception
private val result: CompletableFuture[ErrorModeSupervisorResult | E] = new CompletableFuture()
private val otherExceptions: java.util.Set[Throwable] = ConcurrentHashMap.newKeySet()
private val otherErrors: java.util.Set[E] = ConcurrentHashMap.newKeySet()

override def forkStarts(): Unit = running.incrementAndGet()

Expand All @@ -88,7 +89,7 @@ private[ox] class DefaultSupervisor[E] extends Supervisor[E]:

override def forkException(e: Throwable): Unit = if !result.completeExceptionally(e) then otherExceptions.add(e)

override def forkAppError(e: E): Unit = if !result.complete(e) then otherExceptions.add(SecondaryApplicationError(e))
override def forkAppError(e: E): Unit = if !result.complete(e) then otherErrors.add(e)

/** Wait until the count of all supervised, user forks that are running reaches 0, or until any supervised fork fails with an exception.
*
Expand All @@ -98,16 +99,16 @@ private[ox] class DefaultSupervisor[E] extends Supervisor[E]:
*/
def join(): ErrorModeSupervisorResult | E = unwrapExecutionException(result.get())

def addOtherExceptionsAsSuppressedTo(e: Throwable): Throwable =
def addSuppressedErrors(e: Throwable): Throwable =
otherExceptions.forEach(e2 => if e != e2 then e.addSuppressed(e2))
otherErrors.forEach(e2 => e.addSuppressed(SecondaryApplicationError(e2)))
e

def addExceptionsAsSuppressed[F[_], T](r: F[T], errorMode: ErrorMode[E, F]): F[T] =
def addSuppressedErrors[F[_], T](r: F[T], errorMode: ErrorMode[E, F]): F[T] =
var result = r
otherExceptions.forEach(e => result = errorMode.addSuppressed(result, e))
otherExceptions.forEach(e => result = errorMode.addSuppressedException(result, e))
otherErrors.forEach(e => result = errorMode.addSuppressedError(result, e))
result

private[ox] enum ErrorModeSupervisorResult:
case Success

case class SecondaryApplicationError[E](e: E) extends Throwable("Secondary application error reported to the supervisor")
Loading