Skip to content

Commit

Permalink
Add Saga type to Arrow Fx Resilience (#2902)
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev authored Feb 2, 2023
1 parent 118d5dc commit 33828a4
Show file tree
Hide file tree
Showing 4 changed files with 375 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public suspend inline fun <A> guaranteeCase(
} catch (e: CancellationException) {
runReleaseAndRethrow(e) { finalizer(ExitCase.Cancelled(e)) }
} catch (t: Throwable) {
runReleaseAndRethrow(t.nonFatalOrThrow()) { finalizer(ExitCase.Failure(t.nonFatalOrThrow())) }
runReleaseAndRethrow(t.nonFatalOrThrow()) { finalizer(ExitCase.Failure(t)) }
}
withContext(NonCancellable) { finalizer(ExitCase.Completed) }
return res
Expand Down
34 changes: 34 additions & 0 deletions arrow-libs/fx/arrow-fx-resilience/api/arrow-fx-resilience.api
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,40 @@ public final class arrow/fx/resilience/FlowKt {
public static final fun retry (Lkotlinx/coroutines/flow/Flow;Larrow/fx/resilience/Schedule;)Lkotlinx/coroutines/flow/Flow;
}

public final class arrow/fx/resilience/SagaActionStep {
public static final field INSTANCE Larrow/fx/resilience/SagaActionStep;
}

public final class arrow/fx/resilience/SagaBuilder : arrow/fx/resilience/SagaScope {
public fun <init> ()V
public fun <init> (Ljava/util/concurrent/atomic/AtomicReference;)V
public synthetic fun <init> (Ljava/util/concurrent/atomic/AtomicReference;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun bind (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun invoke (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun saga (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun totalCompensation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface annotation class arrow/fx/resilience/SagaDSLMarker : java/lang/annotation/Annotation {
}

public final class arrow/fx/resilience/SagaKt {
public static final fun saga (Lkotlin/jvm/functions/Function2;)Lkotlin/jvm/functions/Function2;
public static final fun saga (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Lkotlin/jvm/functions/Function2;
public static final fun transact (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class arrow/fx/resilience/SagaScope {
public abstract fun bind (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun invoke (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun saga (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class arrow/fx/resilience/SagaScope$DefaultImpls {
public static fun bind (Larrow/fx/resilience/SagaScope;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun invoke (Larrow/fx/resilience/SagaScope;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract class arrow/fx/resilience/Schedule {
public static final field Companion Larrow/fx/resilience/Schedule$Companion;
public final fun and (Larrow/fx/resilience/Schedule;)Larrow/fx/resilience/Schedule;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package arrow.fx.resilience

import arrow.core.continuations.AtomicRef
import arrow.core.continuations.updateAndGet
import arrow.core.nonFatalOrThrow
import kotlin.coroutines.cancellation.CancellationException
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext


/**
* The saga design pattern is a way to manage data consistency across microservices in distributed
* transaction scenarios. A [Saga] is useful when you need to manage data in a consistent manner
* across services in distributed transaction scenarios. Or when you need to compose multiple
* `actions` with a `compensation` that needs to run in a transaction like style.
*
* For example, let's say that we have the following domain types `Order`, `Payment`.
*
* ```kotlin
* data class Order(val id: UUID, val amount: Long)
* data class Payment(val id: UUID, val orderId: UUID)
* ```
*
* The creation of an `Order` can only remain when a payment has been made. In SQL, you might run
* this inside a transaction, which can automatically roll back the creation of the `Order` when the
* creation of the Payment fails.
*
* When you need to do this across distributed services, or a multiple atomic references, etc. You
* need to manually facilitate the rolling back of the performed actions, or compensating actions.
*
* The [Saga] type, and [saga] DSL remove all the boilerplate of manually having to facilitate this
* with a convenient suspending DSL.
*
* ```kotlin
* data class Order(val id: UUID, val amount: Long)
* suspend fun createOrder(): Order = Order(UUID.randomUUID(), 100L)
* suspend fun deleteOrder(order: Order): Unit = println("Deleting $order")
*
* data class Payment(val id: UUID, val orderId: UUID)
* suspend fun createPayment(order: Order): Payment = Payment(UUID.randomUUID(), order.id)
* suspend fun deletePayment(payment: Payment): Unit = println("Deleting $payment")
*
* suspend fun Payment.awaitSuccess(): Unit = throw RuntimeException("Payment Failed")
*
* suspend fun main() {
* saga {
* val order = saga({ createOrder() }) { deleteOrder(it) }
* val payment = saga { createPayment(order) }, ::deletePayment)
* payment.awaitSuccess()
* }.transact()
* }
* ```
*/
public typealias Saga<A> = suspend SagaScope.() -> A

/** DSL that enables the [Saga] pattern in a `suspend` DSL. */
@SagaDSLMarker
public interface SagaScope {

/**
* Run an [action] to produce a value of type [A] and _install_ a [compensation] to undo the
* action.
*/
@SagaDSLMarker
public suspend fun <A> saga(
action: suspend SagaActionStep.() -> A,
compensation: suspend (A) -> Unit,
): A

/** Executes a [Saga] and returns its value [A] */
public suspend fun <A> Saga<A>.bind(): A = invoke(this@SagaScope)

/** Invoke a [Saga] and returns its value [A] */
public suspend operator fun <A> Saga<A>.invoke(): A = invoke(this@SagaScope)
}

/**
* The Saga builder which exposes the [SagaScope.bind]. The `saga` builder uses the suspension
* system to run actions, and automatically register their compensating actions.
*
* When the resulting [Saga] fails it will run all the required compensating actions, also when the
* [Saga] gets cancelled it will respect its compensating actions before returning.
*
* By doing so we can guarantee that any transactional like operations made by the [Saga] will
* guarantee that it results in the correct state.
*/
public inline fun <A> saga(noinline block: suspend SagaScope.() -> A): Saga<A> = block

/** Create a lazy [Saga] that will only run when the [Saga] is invoked. */
public fun <A> saga(
action: suspend SagaActionStep.() -> A,
compensation: suspend (A) -> Unit
): Saga<A> = saga { saga(action, compensation) }

/**
* Transact runs the [Saga] turning it into a [suspend] effect that results in [A]. If the saga
* fails then all compensating actions are guaranteed to run. When a compensating action failed it
* will be ignored, and the other compensating actions will continue to be run.
*/
public suspend fun <A> Saga<A>.transact(): A {
val builder = SagaBuilder()
return guaranteeCase({ invoke(builder) }) { res ->
when (res) {
null -> builder.totalCompensation()
else -> Unit
}
}
}

/** DSL Marker for the SagaEffect DSL */
@DslMarker public annotation class SagaDSLMarker

/**
* Marker object to protect [SagaScope.saga] from calling [SagaScope.bind] in its `action` step.
*/
@SagaDSLMarker public object SagaActionStep

// Internal implementation of the `saga { }` builder.
@PublishedApi
internal class SagaBuilder(
private val stack: AtomicRef<List<suspend () -> Unit>> = AtomicRef(emptyList())
) : SagaScope {

@SagaDSLMarker
override suspend fun <A> saga(
action: suspend SagaActionStep.() -> A,
compensation: suspend (A) -> Unit
): A =
guaranteeCase({ action(SagaActionStep) }) { res ->
// This action failed, so we have no compensate to push on the stack
// the compensation stack will run in the `transact` stage, this is just the builder
when (res) {
null -> Unit
else -> stack.updateAndGet { listOf(suspend { compensation(res) }) + it }
}
}

@PublishedApi
internal suspend fun totalCompensation() {
stack
.get()
.fold<suspend () -> Unit, Throwable?>(null) { acc, finalizer ->
try {
finalizer()
acc
} catch (e: Throwable) {
e.nonFatalOrThrow()
acc?.apply { addSuppressed(e) } ?: e
}
}
?.let { throw it }
}
}

private suspend fun <A> guaranteeCase(
fa: suspend () -> A,
finalizer: suspend (value: A?) -> Unit
): A {
val res =
try {
fa()
} catch (e: CancellationException) {
runReleaseAndRethrow(e) { finalizer(null) }
} catch (t: Throwable) {
runReleaseAndRethrow(t) { finalizer(null) }
}
withContext(NonCancellable) { finalizer(res) }
return res
}

private suspend fun runReleaseAndRethrow(original: Throwable, f: suspend () -> Unit): Nothing {
try {
withContext(NonCancellable) { f() }
} catch (e: Throwable) {
original.addSuppressed(e.nonFatalOrThrow())
}
throw original
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package arrow.fx.resilience

import arrow.fx.coroutines.parTraverse
import arrow.fx.coroutines.parZip
import io.kotest.assertions.fail
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.ints.shouldBeExactly
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.checkAll
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.Channel

@Suppress("unused")
class SagaSpec : StringSpec({
"Saga returns action result" {
checkAll(Arb.int()) { i ->
val saga = saga({ i }) { fail("Doesn't run") }
saga.transact() shouldBeExactly i
}
}

class SagaFailed : RuntimeException()

"Saga runs compensation if throw in builder & rethrows exception" {
checkAll(Arb.int()) { i ->
val compensation = CompletableDeferred<Int>()
val saga = saga {
saga({ i }) { compensation.complete(it) }
throw SagaFailed()
}
shouldThrow<SagaFailed> { saga.transact() }
compensation.await() shouldBeExactly i
}
}

"Saga runs compensation if throw in saga & rethrows exception" {
checkAll(Arb.int()) { i ->
val compensation = CompletableDeferred<Int>()
val saga = saga {
saga({ i }) { compensation.complete(it) }
saga({ throw SagaFailed() }) { fail("Doesn't run") }
}
shouldThrow<SagaFailed> { saga.transact() }
compensation.await() shouldBeExactly i
}
}

"Saga runs compensation in order & rethrows exception" {
checkAll(Arb.int(), Arb.int()) { a, b ->
val compensations = Channel<Int>(2)
val saga = saga {
saga({ a }) { compensations.send(it) }
saga({ b }) { compensations.send(it) }
saga({ throw SagaFailed() }) { fail("Doesn't run") }
}
shouldThrow<SagaFailed> { saga.transact() }
compensations.receive() shouldBeExactly b
compensations.receive() shouldBeExactly a
compensations.close()
}
}

"Sage composes compensation errors" {
checkAll(Arb.int()) { a ->
val compensationA = CompletableDeferred<Int>()
val original = SagaFailed()
val compensation = SagaFailed()
val saga = saga {
saga({ a }) { compensationA.complete(it) }
saga({}) { throw compensation }
saga({ throw original }) { fail("Doesn't run") }
}
val res = shouldThrow<SagaFailed> { saga.transact() }
res shouldBe original
res.suppressedExceptions[0] shouldBe compensation
compensationA.await() shouldBeExactly a
}
}

"Sage composes compensation errors when thrown in block" {
checkAll(Arb.int()) { a ->
val compensationA = CompletableDeferred<Int>()
val original = SagaFailed()
val compensation = SagaFailed()
val saga = saga {
saga({ a }) { compensationA.complete(it) }
saga({}) { throw compensation }
throw original
}
val res = shouldThrow<SagaFailed> { saga.transact() }
res shouldBe original
res.suppressedExceptions[0] shouldBe compensation
compensationA.await() shouldBeExactly a
}
}

"Saga can traverse" {
checkAll(Arb.list(Arb.int())) { iis ->
saga { iis.map { saga({ it }) { fail("Doesn't run") } } }.transact() shouldBe iis
}
}

"Saga can parTraverse" {
checkAll(Arb.list(Arb.int())) { iis ->
saga { iis.parTraverse { saga({ it }) { fail("Doesn't run") } } }.transact() shouldBe iis
}
}

"parZip runs left compensation" {
checkAll(Arb.int()) { a ->
val compensationA = CompletableDeferred<Int>()
val latch = CompletableDeferred<Unit>()
val saga = saga {
parZip(
{
saga({
latch.complete(Unit)
a
}) { compensationA.complete(it) }
},
{
saga({
latch.await()
throw SagaFailed()
}) { fail("Doesn't run") }
}
) { _, _ -> }
}
shouldThrow<SagaFailed> { saga.transact() }
compensationA.await() shouldBeExactly a
}
}

"parZip runs right compensation" {
checkAll(Arb.int()) { a ->
val compensationB = CompletableDeferred<Int>()
val latch = CompletableDeferred<Unit>()
val saga = saga {
parZip(
{
saga({
latch.await()
throw SagaFailed()
}) { fail("Doesn't run") }
},
{
saga({
latch.complete(Unit)
a
}) { compensationB.complete(it) }
}
) { _, _ -> }
}
shouldThrow<SagaFailed> { saga.transact() }
compensationB.await() shouldBeExactly a
}
}
})

0 comments on commit 33828a4

Please sign in to comment.