Skip to content

Commit

Permalink
[2743] Migrate internal use of CircuitBreaker double to duration (#2748)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjmoore authored Sep 29, 2022
1 parent e5f42ec commit 0c2de60
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 61 deletions.
4 changes: 4 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,20 @@ public final class arrow/fx/coroutines/CircuitBreaker$State$Closed : arrow/fx/co

public final class arrow/fx/coroutines/CircuitBreaker$State$HalfOpen : arrow/fx/coroutines/CircuitBreaker$State {
public fun <init> (D)V
public synthetic fun <init> (JLkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun equals (Ljava/lang/Object;)Z
public final fun getResetTimeout-UwyO8pc ()J
public final fun getResetTimeoutNanos ()D
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class arrow/fx/coroutines/CircuitBreaker$State$Open : arrow/fx/coroutines/CircuitBreaker$State {
public fun <init> (JD)V
public synthetic fun <init> (JJLkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun equals (Ljava/lang/Object;)Z
public final fun getExpiresAt ()J
public final fun getResetTimeout-UwyO8pc ()J
public final fun getResetTimeoutNanos ()D
public final fun getStartedAt ()J
public fun hashCode ()I
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import arrow.fx.coroutines.CircuitBreaker.State.HalfOpen
import arrow.fx.coroutines.CircuitBreaker.State.Open
import kotlinx.coroutines.CompletableDeferred
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.DurationUnit
import kotlin.time.ExperimentalTime

/**
* A [CircuitBreaker] is used to `protect` resources or services from being overloaded
Expand Down Expand Up @@ -129,15 +130,18 @@ public class CircuitBreaker
private constructor(
private val state: AtomicRef<State>,
private val maxFailures: Int,
private val resetTimeout: Double,
private val resetTimeoutNanos: Double,
private val exponentialBackoffFactor: Double,
private val maxResetTimeout: Double,
private val maxResetTimeoutNanos: Double,
private val onRejected: suspend () -> Unit,
private val onClosed: suspend () -> Unit,
private val onHalfOpen: suspend () -> Unit,
private val onOpen: suspend () -> Unit
) {


private val resetTimeout: Duration = resetTimeoutNanos.nanoseconds
private val maxResetTimeout: Duration = maxResetTimeoutNanos.nanoseconds

/** Returns the current [CircuitBreaker.State], meant for debugging purposes.
*/
public suspend fun state(): State = state.get()
Expand Down Expand Up @@ -191,10 +195,10 @@ private constructor(
// task to execute, while transitioning into HalfOpen
if (!state.compareAndSet(
curr,
State.HalfOpen(curr.resetTimeoutNanos, curr.awaitClose)
HalfOpen(curr.resetTimeout, curr.awaitClose)
)
) protectOrThrow(fa) // retry!
else attemptReset(fa, curr.resetTimeoutNanos, curr.awaitClose, curr.startedAt)
else attemptReset(fa, curr.resetTimeout, curr.awaitClose, curr.startedAt)
} else {
// Open isn't expired, so we need to fail
val expiresInMillis = curr.expiresAt - now
Expand Down Expand Up @@ -234,15 +238,15 @@ private constructor(
if (curr.failures + 1 < maxFailures) {
// It's fine, just increment the failures count
val update = Closed(curr.failures + 1)
if (!state.compareAndSet(curr, update)) markOrResetFailures(result) // retry?
if (!state.compareAndSet(curr, update)) markOrResetFailures<A>(result) // retry?
else throw result.value
} else {
// N.B. this could be canceled, however we don't care
val now = timeInMillis()
// We've gone over the permitted failures threshold,
// so we need to open the circuit breaker
val update = Open(now, resetTimeout, CompletableDeferred())
if (!state.compareAndSet(curr, update)) markOrResetFailures(result) // retry
if (!state.compareAndSet(curr, update)) markOrResetFailures<A>(result) // retry
else {
onOpen.invoke()
throw result.value
Expand All @@ -268,7 +272,7 @@ private constructor(
*/
private suspend fun <A> attemptReset(
task: suspend () -> A,
resetTimeout: Double,
resetTimeout: Duration,
awaitClose: CompletableDeferred<Unit>,
lastStartedAt: Long
): A =
Expand All @@ -292,8 +296,8 @@ private constructor(
}
is ExitCase.Failure -> {
// Failed reset, which means we go back in the Open state with new expiry val nextTimeout
val value: Double = (resetTimeout * exponentialBackoffFactor)
val nextTimeout: Double =
val value: Duration = (resetTimeout * exponentialBackoffFactor)
val nextTimeout: Duration =
if (maxResetTimeout.isFinite() && value > maxResetTimeout) maxResetTimeout
else value
val ts = timeInMillis()
Expand Down Expand Up @@ -321,9 +325,9 @@ private constructor(
CircuitBreaker(
state = state,
maxFailures = maxFailures,
resetTimeout = resetTimeout,
resetTimeoutNanos = resetTimeout.toDouble(DurationUnit.NANOSECONDS),
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
maxResetTimeoutNanos = maxResetTimeout.toDouble(DurationUnit.NANOSECONDS),
onRejected = suspend { onRejected.invoke(); callback.invoke() },
onClosed = onClosed,
onHalfOpen = onHalfOpen,
Expand All @@ -347,9 +351,9 @@ private constructor(
CircuitBreaker(
state = state,
maxFailures = maxFailures,
resetTimeout = resetTimeout,
resetTimeoutNanos = resetTimeout.toDouble(DurationUnit.NANOSECONDS),
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
maxResetTimeoutNanos = maxResetTimeout.toDouble(DurationUnit.NANOSECONDS),
onRejected = onRejected,
onClosed = suspend { onClosed.invoke(); callback.invoke(); },
onHalfOpen = onHalfOpen,
Expand All @@ -373,9 +377,9 @@ private constructor(
CircuitBreaker(
state = state,
maxFailures = maxFailures,
resetTimeout = resetTimeout,
resetTimeoutNanos = resetTimeout.toDouble(DurationUnit.NANOSECONDS),
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
maxResetTimeoutNanos = maxResetTimeout.toDouble(DurationUnit.NANOSECONDS),
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = suspend { onHalfOpen.invoke(); callback.invoke() },
Expand All @@ -399,9 +403,9 @@ private constructor(
CircuitBreaker(
state = state,
maxFailures = maxFailures,
resetTimeout = resetTimeout,
resetTimeoutNanos = resetTimeout.toDouble(DurationUnit.NANOSECONDS),
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
maxResetTimeoutNanos = maxResetTimeout.toDouble(DurationUnit.NANOSECONDS),
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
Expand Down Expand Up @@ -447,20 +451,50 @@ private constructor(
* @param startedAt is the timestamp in milliseconds since the
* epoch when the transition to [Open] happened.
*
* @param resetTimeoutNanos is the current `resetTimeout` that is
* @param resetTimeout is the current `resetTimeout` that is
* applied to this `Open` state, to be multiplied by the
* exponential backoff factor for the next transition from
* `HalfOpen` to `Open`.
*/
public class Open internal constructor(
public val startedAt: Long,
public val resetTimeoutNanos: Double,
internal val awaitClose: CompletableDeferred<Unit>
public val resetTimeout: Duration,
internal val awaitClose: CompletableDeferred<Unit>,
) : State() {


@Deprecated(
"Prefer to use resetTimeout with kotlin.time.Duration",
ReplaceWith(
"resetTimeout.toDouble(DurationUnit.NANOSECONDS)",
"kotlin.time.DurationUnit"
)
)
public val resetTimeoutNanos: Double
get() = resetTimeout.toDouble(DurationUnit.NANOSECONDS)

public constructor(startedAt: Long, resetTimeout: Duration) : this(
startedAt,
resetTimeout,
CompletableDeferred()
)

@Deprecated(
"This constructor will be removed in Arrow 2.0",
level = DeprecationLevel.WARNING
)
internal constructor(
startedAt: Long,
resetTimeoutNanos: Double,
awaitClose: CompletableDeferred<Unit>,
) : this(startedAt, resetTimeoutNanos.nanoseconds, awaitClose)

@Deprecated(
"This constructor will be removed in Arrow 2.0",
level = DeprecationLevel.WARNING
)
public constructor(startedAt: Long, resetTimeoutNanos: Double) : this(
startedAt,
resetTimeoutNanos,
resetTimeoutNanos.nanoseconds,
CompletableDeferred()
)

Expand All @@ -470,20 +504,20 @@ private constructor(
* It is calculated as:
* `startedAt + resetTimeout`
*/
public val expiresAt: Long = startedAt + (resetTimeoutNanos.toLong() / 1_000_000)
public val expiresAt: Long = resetTimeout.plus(startedAt.milliseconds).toLong(DurationUnit.MILLISECONDS)

override fun equals(other: Any?): Boolean =
if (other is Open) this.startedAt == startedAt &&
this.resetTimeoutNanos == resetTimeoutNanos &&
this.resetTimeout == resetTimeout &&
this.expiresAt == expiresAt
else false

override fun toString(): String =
"CircuitBreaker.State.Open(startedAt=$startedAt, resetTimeoutNanos=$resetTimeoutNanos, expiresAt=$expiresAt)"
"CircuitBreaker.State.Open(startedAt=$startedAt, resetTimeoutNanos=$resetTimeout, expiresAt=$expiresAt)"

override fun hashCode(): Int {
var result = startedAt.hashCode()
result = 31 * result + resetTimeoutNanos.hashCode()
result = 31 * result + resetTimeout.hashCode()
result = 31 * result + expiresAt.hashCode()
return result
}
Expand All @@ -495,26 +529,47 @@ private constructor(
* - If the `test request` succeeds, then the [CircuitBreaker] is tripped back into [Closed], with the reset timeout, and the failures count also reset to their initial values.
* - If the `test request` fails, then the [CircuitBreaker] is tripped back into [Open], the [resetTimeout] is multiplied by the [exponentialBackoffFactor], up to the configured [maxResetTimeout].
*
* @param resetTimeoutNanos is the current `reset timeout` that the [CircuitBreaker] has to stay in [Open] state.
* @param resetTimeout is the current `reset timeout` that the [CircuitBreaker] has to stay in [Open] state.
* When the `reset timeout` lapsed, than the [CircuitBreaker] will allow a test request to go through in [HalfOpen].
* If the test request failed, the [CircuitBreaker] will go back into [Open] and it'll multiply the [resetTimeoutNanos] with the the exponential backoff factor.
* If the test request failed, the [CircuitBreaker] will go back into [Open] and it'll multiply the [resetTimeout] with the the exponential backoff factor.
*/
public class HalfOpen internal constructor(
public val resetTimeoutNanos: Double,
public val resetTimeout: Duration,
internal val awaitClose: CompletableDeferred<Unit>
) : State() {

@Deprecated(
"Prefer to use resetTimeout with kotlin.time.Duration",
ReplaceWith(
"resetTimeout.toDouble(DurationUnit.NANOSECONDS)",
"kotlin.time.DurationUnit"
)
)
public val resetTimeoutNanos: Double
get() = resetTimeout.toDouble(DurationUnit.NANOSECONDS)

public constructor(resetTimeout: Duration) : this(resetTimeout, CompletableDeferred())

@Deprecated(
"This constructor will be removed in Arrow 2.0",
level = DeprecationLevel.WARNING
)
internal constructor(
resetTimeoutNanos: Double,
awaitClose: CompletableDeferred<Unit>,
) : this(resetTimeoutNanos.nanoseconds, awaitClose)

public constructor(resetTimeoutNanos: Double) : this(resetTimeoutNanos, CompletableDeferred())
public constructor(resetTimeoutNanos: Double) : this(resetTimeoutNanos.nanoseconds, CompletableDeferred())

override fun hashCode(): Int =
resetTimeoutNanos.hashCode()
resetTimeout.hashCode()

override fun equals(other: Any?): Boolean =
if (other is HalfOpen) resetTimeoutNanos == other.resetTimeoutNanos
if (other is HalfOpen) resetTimeout == other.resetTimeout
else false

override fun toString(): String =
"HalfOpen(resetTimeoutNanos=$resetTimeoutNanos)"
"HalfOpen(resetTimeoutNanos=$resetTimeout)"
}
}

Expand All @@ -535,7 +590,7 @@ private constructor(
* the `resetTimeout` when in the `HalfOpen` state, in case
* the attempt to `Close` fails.
*
* @param maxResetTimeout is the maximum timeout the circuit breaker
* @param maxResetTimeoutNanos is the maximum timeout the circuit breaker
* is allowed to use when applying the `exponentialBackoffFactor`.
*
* @param onRejected is a callback for signaling rejected tasks, so
Expand All @@ -550,22 +605,37 @@ private constructor(
* @param onOpen is a callback for signaling transitions to [CircuitBreaker.State.Open].
*
*/
@Deprecated(
"Prefer the kotlin.time.Duration constructor instead",
ReplaceWith(
"of(maxFailures, resetTimeoutNanos.nanoseconds, exponentialBackoffFactor, maxResetTimeout, onRejected, onClosed, onHalfOpen, onOpen)",
"import kotlin.time.Duration.Companion.nanoseconds"
)
)
public suspend fun of(
maxFailures: Int,
resetTimeoutNanos: Double,
exponentialBackoffFactor: Double = 1.0,
maxResetTimeout: Double = Double.POSITIVE_INFINITY,
maxResetTimeoutNanos: Double = Double.POSITIVE_INFINITY,
onRejected: suspend () -> Unit = { },
onClosed: suspend () -> Unit = { },
onHalfOpen: suspend () -> Unit = { },
onOpen: suspend () -> Unit = { }
onOpen: suspend () -> Unit = { },
): CircuitBreaker =
CircuitBreaker(
state = AtomicRef(Closed(0)),
maxFailures = requireNotNull(maxFailures.takeIf { it >= 0 }) { "maxFailures expected to be higher than 0" },
resetTimeout = requireNotNull(resetTimeoutNanos.takeIf { it > 0 }) { "resetTimeoutNanos expected to be higher than 0" },
exponentialBackoffFactor = requireNotNull(exponentialBackoffFactor.takeIf { it > 0 }) { "exponentialBackoffFactor expected to be higher than 0" },
maxResetTimeout = requireNotNull(maxResetTimeout.takeIf { it > 0 }) { "maxResetTimeout expected to be higher than 0" },
maxFailures = maxFailures
.takeIf { it >= 0 }
.let { requireNotNull(it) { "maxFailures expected to be greater than or equal to 0, but was $maxFailures" } },
resetTimeoutNanos = resetTimeoutNanos
.takeIf { it > 0 }
.let { requireNotNull(it) { "resetTimeout expected to be greater than 0, but was $resetTimeoutNanos" } },
exponentialBackoffFactor = exponentialBackoffFactor
.takeIf { it > 0 }
.let { requireNotNull(it) { "exponentialBackoffFactor expected to be greater than 0, but was $exponentialBackoffFactor" } },
maxResetTimeoutNanos = maxResetTimeoutNanos
.takeIf { it > 0 }
.let { requireNotNull(it) { "maxResetTimeout expected to be greater than 0, but was $maxResetTimeoutNanos" } },
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
Expand Down Expand Up @@ -601,7 +671,6 @@ private constructor(
* @param onOpen is a callback for signaling transitions to [CircuitBreaker.State.Open].
*
*/
@ExperimentalTime
public suspend fun of(
maxFailures: Int,
resetTimeout: Duration,
Expand All @@ -610,17 +679,27 @@ private constructor(
onRejected: suspend () -> Unit = suspend { },
onClosed: suspend () -> Unit = suspend { },
onHalfOpen: suspend () -> Unit = suspend { },
onOpen: suspend () -> Unit = suspend { }
onOpen: suspend () -> Unit = suspend { },
): CircuitBreaker =
of(
maxFailures,
resetTimeout.toDouble(DurationUnit.NANOSECONDS),
exponentialBackoffFactor,
maxResetTimeout.toDouble(DurationUnit.NANOSECONDS),
onRejected,
onClosed,
onHalfOpen,
onOpen
maxFailures = maxFailures
.takeIf { it >= 0 }
.let { requireNotNull(it) { "maxFailures expected to be greater than or equal to 0, but was $maxFailures" } },
resetTimeoutNanos = resetTimeout
.takeIf { it.isPositive() && it != Duration.ZERO }
.let { requireNotNull(it) { "resetTimeout expected to be greater than ${Duration.ZERO}, but was $resetTimeout" } }
.toDouble(DurationUnit.NANOSECONDS),
exponentialBackoffFactor = exponentialBackoffFactor
.takeIf { it > 0 }
.let { requireNotNull(it) { "exponentialBackoffFactor expected to be greater than 0, but was $exponentialBackoffFactor" } },
maxResetTimeoutNanos = maxResetTimeout
.takeIf { it.isPositive() && it != Duration.ZERO }
.let { requireNotNull(it) { "maxResetTimeout expected to be greater than ${Duration.ZERO}, but was $maxResetTimeout" } }
.toDouble(DurationUnit.NANOSECONDS),
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
onOpen = onOpen
)
}
}
Loading

0 comments on commit 0c2de60

Please sign in to comment.