Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retries #49

Merged
merged 29 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1fc3c1d
Extract ElapsedTime so that it can be reused
rucek Nov 16, 2023
3cc1eba
Remove old retries
rucek Nov 16, 2023
cb5792e
Basic retry for function/Try/Either
rucek Nov 16, 2023
0469fda
Delays and simple backoff
rucek Nov 16, 2023
d61e292
Initial isWorthRetrying for Either
rucek Nov 16, 2023
4c267d5
Add upper bound for delay when using backoff
rucek Nov 21, 2023
5b430e7
Add jitter
rucek Nov 22, 2023
52bd207
Remove unnecessary toList
rucek Nov 23, 2023
51fe027
Fail on unexpected match
rucek Nov 23, 2023
c364df0
Use System.nanoTime instead of System.currentTimeMillis for measuring…
rucek Nov 23, 2023
57f0b43
Fix formatting
rucek Nov 23, 2023
e7c5d7d
Add retries with unlimited attempts, make the retry logic tail-recursive
rucek Nov 27, 2023
41fedb0
Use enum for jitter
rucek Nov 27, 2023
17cde01
Add isWorthRetrying for functions and Try
rucek Nov 27, 2023
98b9d17
Move RetryPolicy to a separate file
rucek Nov 27, 2023
2991dac
Reduce maximum backoff delay to 1 minute
rucek Nov 27, 2023
24925a5
Refactor RetryPolicy to include a schedule and a result policy
rucek Nov 30, 2023
57716f9
Rename Direct to Immediate
rucek Nov 30, 2023
67566e1
Update test name
rucek Nov 30, 2023
77091ad
Add custom conditions to fail-fast tests
rucek Nov 30, 2023
d11454f
Add docs for retries
rucek Nov 30, 2023
0caa72b
Add retries to main README
rucek Nov 30, 2023
01926c6
Update docs on ResultPolicy error type
rucek Nov 30, 2023
c148380
Remove DummyImplicit's as they are no longer needed
rucek Nov 30, 2023
ebc6cdf
Fix typo in retries readme
rucek Nov 30, 2023
d671f21
Scaladocs and cleanup
rucek Nov 30, 2023
b2cf1ee
Add ADR for retries
rucek Nov 30, 2023
62e40ef
Fix naming
rucek Dec 4, 2023
5510a98
Add syntax sugar for retries
rucek Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ There are some helper methods which might be useful when writing forked code:

* `forever { ... }` repeatedly evaluates the given code block forever
* `repeatWhile { ... }` repeatedly evaluates the given code block, as long as it returns `true`
* `retry(times, sleep) { ... }` retries the given block up to the given number of times
* `uninterruptible { ... }` evaluates the given code block making sure it can't be interrupted

## Syntax
Expand Down Expand Up @@ -634,6 +633,10 @@ Please see [the respective ADR](doc/adr/0001-error-propagation-in-channels.md) f
Channels are back-pressured, as the `.send` operation is blocking until there's a receiver thread available, or if
there's enough space in the buffer. The processing space is bound by the total size of channel buffers.

## Retries

The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms delay between attempts). See the [full docs](doc/retries.md) for details.

## Kafka sources & drains

Dependency:
Expand Down
21 changes: 0 additions & 21 deletions core/src/main/scala/ox/control.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package ox

import scala.concurrent.duration.FiniteDuration

def forever(f: => Unit): Nothing =
while true do f
throw new RuntimeException("can't get here")
Expand All @@ -16,25 +14,6 @@ def repeatUntil(f: => Boolean): Unit =
var loop = true
while loop do loop = !f

// TODO: retry schedules
def retry[T](times: Int, sleep: FiniteDuration)(f: => T): T =
try f
catch
case e: Throwable =>
if times > 0
then
Thread.sleep(sleep.toMillis)
retry(times - 1, sleep)(f)
else throw e

def retryEither[E, T](times: Int, sleep: FiniteDuration)(f: => Either[E, T]): Either[E, T] =
f match
case r: Right[E, T] => r
case Left(_) if times > 0 =>
Thread.sleep(sleep.toMillis)
retry(times - 1, sleep)(f)
case l: Left[E, T] => l

def uninterruptible[T](f: => T): T =
scoped {
val t = fork(f)
Expand Down
25 changes: 25 additions & 0 deletions core/src/main/scala/ox/retry/Jitter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package ox.retry

/** A random factor used for calculating the delay between subsequent retries when a backoff strategy is used for calculating the delay.
*
* The purpose of jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at
* the same time - which can result in subsequent failures, contrary to what you would expect from retrying. By introducing randomness to
* the delays, the retries become more evenly distributed over time.
*
* See the <a href="https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/">AWS Architecture Blog article on backoff and
* jitter</a> for a more in-depth explanation.
*
* Depending on the algorithm, the jitter can affect the delay in different ways - see the concrete variants for more details.
*/
enum Jitter:
/** No jitter, i.e. the delay just uses an exponential backoff with no adjustments. */
case None

/** Full jitter, i.e. the delay is a random value between 0 and the calculated backoff delay. */
case Full

/** Equal jitter, i.e. the delay is half of the calculated backoff delay plus a random value between 0 and the other half. */
case Equal

/** Decorrelated jitter, i.e. the delay is a random value between the initial delay and the last delay multiplied by 3. */
case Decorrelated
37 changes: 37 additions & 0 deletions core/src/main/scala/ox/retry/ResultPolicy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package ox.retry

/** A policy that allows to customize when a non-erroneous result is considered successful and when an error is worth retrying (which allows
* for failing fast on certain errors).
*
* @param isSuccess
* A function that determines whether a non-erroneous result is considered successful. By default, every non-erroneous result is
* considered successful.
* @param isWorthRetrying
* A function that determines whether an error is worth retrying. By default, all errors are retried.
* @tparam E
* The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning
* an `Either[E, T]`, this can be any `E`.
* @tparam T
* The successful result type for the operation.
*/
case class ResultPolicy[E, T](isSuccess: T => Boolean = (_: T) => true, isWorthRetrying: E => Boolean = (_: E) => true)

object ResultPolicy:
/** A policy that considers every non-erroneous result successful and retries on any error. */
def default[E, T]: ResultPolicy[E, T] = ResultPolicy()

/** A policy that customizes when a non-erroneous result is considered successful, and retries all errors
*
* @param isSuccess
* A predicate that indicates whether a non-erroneous result is considered successful.
*/
def successfulWhen[E, T](isSuccess: T => Boolean): ResultPolicy[E, T] = ResultPolicy(isSuccess = isSuccess)

/** A policy that customizes which errors are retried, and considers every non-erroneous result successful
* @param isWorthRetrying
* A predicate that indicates whether an erroneous result should be retried..
*/
def retryWhen[E, T](isWorthRetrying: E => Boolean): ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = isWorthRetrying)

/** A policy that considers every non-erroneous result successful and never retries any error, i.e. fails fast */
def neverRetry[E, T]: ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = _ => false)
106 changes: 106 additions & 0 deletions core/src/main/scala/ox/retry/RetryPolicy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package ox.retry

import scala.concurrent.duration.*

/** A policy that defines how to retry a failed operation.
*
* @param schedule
* The retry schedule which determines the maximum number of retries and the delay between subsequent attempts to execute the operation.
* See [[Schedule]] for more details.
* @param resultPolicy
* A policy that allows to customize when a non-erroneous result is considered successful and when an error is worth retrying (which
* allows for failing fast on certain errors). See [[ResultPolicy]] for more details.
* @tparam E
* The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning
* an `Either[E, T]`, this can be any `E`.
* @tparam T
* The successful result type for the operation.
*/
case class RetryPolicy[E, T](schedule: Schedule, resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T])

object RetryPolicy:
/** Creates a policy that retries up to a given number of times, with no delay between subsequent attempts, using a default
* [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Immediate(maxRetries))}}}
*
* @param maxRetries
* The maximum number of retries.
*/
def immediate[E, T](maxRetries: Int): RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate(maxRetries))

/** Creates a policy that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Immediate.forever)}}}
*/
def immediateForever[E, T]: RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate.forever)

/** Creates a policy that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default
* [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Delay(maxRetries, delay))}}}
*
* @param maxRetries
* The maximum number of retries.
* @param delay
* The delay between subsequent attempts.
*/
def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay(maxRetries, delay))

/** Creates a policy that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]].
*
* This is a shorthand for {{{RetryPolicy(Schedule.Delay.forever(delay))}}}
*
* @param delay
* The delay between subsequent attempts.
*/
def delayForever[E, T](delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay.forever(delay))

/** Creates a policy that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a
* default [[ResultPolicy]].
*
* The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
* and capped at the given maximum delay.
*
* This is a shorthand for {{{RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}}
*
* @param maxRetries
* The maximum number of retries.
* @param initialDelay
* The delay before the first retry.
* @param maxDelay
* The maximum delay between subsequent retries. Defaults to 1 minute.
* @param jitter
* A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
* i.e. an exponential backoff with no adjustments.
*/
def backoff[E, T](
maxRetries: Int,
initialDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RetryPolicy[E, T] =
RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))

/** Creates a policy that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default
* [[ResultPolicy]].
*
* The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
* and capped at the given maximum delay.
*
* This is a shorthand for {{{RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}}
*
* @param initialDelay
* The delay before the first retry.
* @param maxDelay
* The maximum delay between subsequent retries. Defaults to 1 minute.
* @param jitter
* A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
* i.e. an exponential backoff with no adjustments.
*/
def backoffForever[E, T](
initialDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
): RetryPolicy[E, T] =
RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))
119 changes: 119 additions & 0 deletions core/src/main/scala/ox/retry/Schedule.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package ox.retry

import scala.concurrent.duration.*
import scala.util.Random

private[retry] sealed trait Schedule:
def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration

object Schedule:

private[retry] sealed trait Finite extends Schedule:
def maxRetries: Int

private[retry] sealed trait Infinite extends Schedule

/** A schedule that retries up to a given number of times, with no delay between subsequent attempts.
*
* @param maxRetries
* The maximum number of retries.
*/
case class Immediate(maxRetries: Int) extends Finite:
override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero

object Immediate:
/** A schedule that retries indefinitely, with no delay between subsequent attempts. */
def forever: Infinite = ImmediateForever

private case object ImmediateForever extends Infinite:
override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero

/** A schedule that retries up to a given number of times, with a fixed delay between subsequent attempts.
*
* @param maxRetries
* The maximum number of retries.
* @param delay
* The delay between subsequent attempts.
*/
case class Delay(maxRetries: Int, delay: FiniteDuration) extends Finite:
override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay

object Delay:
/** A schedule that retries indefinitely, with a fixed delay between subsequent attempts.
*
* @param delay
* The delay between subsequent attempts.
*/
def forever(delay: FiniteDuration): Infinite = DelayForever(delay)

case class DelayForever private[retry] (delay: FiniteDuration) extends Infinite:
override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay

/** A schedule that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts.
*
* The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
* and capped at the given maximum delay.
*
* @param maxRetries
* The maximum number of retries.
* @param initialDelay
* The delay before the first retry.
* @param maxDelay
* The maximum delay between subsequent retries.
* @param jitter
* A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
* i.e. an exponential backoff with no adjustments.
*/
case class Backoff(
maxRetries: Int,
initialDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.minute,
jitter: Jitter = Jitter.None
) extends Finite:
override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration =
Backoff.nextDelay(attempt, initialDelay, maxDelay, jitter, lastDelay)

object Backoff:
private[retry] def delay(attempt: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration): FiniteDuration =
// converting Duration <-> Long back and forth to avoid exceeding maximum duration
(initialDelay.toMillis * Math.pow(2, attempt)).toLong.min(maxDelay.toMillis).millis

private[retry] def nextDelay(
attempt: Int,
initialDelay: FiniteDuration,
maxDelay: FiniteDuration,
jitter: Jitter,
lastDelay: Option[FiniteDuration]
): FiniteDuration =
def backoffDelay = Backoff.delay(attempt, initialDelay, maxDelay)

jitter match
case Jitter.None => backoffDelay
case Jitter.Full => Random.between(0, backoffDelay.toMillis).millis
case Jitter.Equal =>
val backoff = backoffDelay.toMillis
(backoff / 2 + Random.between(0, backoff / 2)).millis
case Jitter.Decorrelated =>
val last = lastDelay.getOrElse(initialDelay).toMillis
Random.between(initialDelay.toMillis, last * 3).millis

/** A schedule that retries indefinitely, with an increasing delay (backoff) between subsequent attempts.
*
* The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial
* delay and capped at the given maximum delay.
*
* @param initialDelay
* The delay before the first retry.
* @param maxDelay
* The maximum delay between subsequent retries.
* @param jitter
* A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
* i.e. an exponential backoff with no adjustments.
*/
def forever(initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None): Infinite =
BackoffForever(initialDelay, maxDelay, jitter)

case class BackoffForever private[retry] (initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None)
extends Infinite:
override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration =
Backoff.nextDelay(attempt, initialDelay, maxDelay, jitter, lastDelay)
Loading