From 33828a48eaac1f1bd6e07ef04f58d027ee986a0c Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 2 Feb 2023 13:54:16 +0100 Subject: [PATCH] Add Saga type to Arrow Fx Resilience (#2902) --- .../kotlin/arrow/fx/coroutines/Bracket.kt | 2 +- .../api/arrow-fx-resilience.api | 34 ++++ .../kotlin/arrow/fx/resilience/Saga.kt | 178 ++++++++++++++++++ .../kotlin/arrow/fx/resilience/SagaSpec.kt | 162 ++++++++++++++++ 4 files changed, 375 insertions(+), 1 deletion(-) create mode 100644 arrow-libs/fx/arrow-fx-resilience/src/commonMain/kotlin/arrow/fx/resilience/Saga.kt create mode 100644 arrow-libs/fx/arrow-fx-resilience/src/commonTest/kotlin/arrow/fx/resilience/SagaSpec.kt diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt index d9273f3ef95..3ad336a3b8a 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt @@ -92,7 +92,7 @@ public suspend inline fun guaranteeCase( } catch (e: CancellationException) { runReleaseAndRethrow(e) { finalizer(ExitCase.Cancelled(e)) } } catch (t: Throwable) { - runReleaseAndRethrow(t.nonFatalOrThrow()) { finalizer(ExitCase.Failure(t.nonFatalOrThrow())) } + runReleaseAndRethrow(t.nonFatalOrThrow()) { finalizer(ExitCase.Failure(t)) } } withContext(NonCancellable) { finalizer(ExitCase.Completed) } return res diff --git a/arrow-libs/fx/arrow-fx-resilience/api/arrow-fx-resilience.api b/arrow-libs/fx/arrow-fx-resilience/api/arrow-fx-resilience.api index 7ba4bc76a5d..1ae7b09d9c2 100644 --- a/arrow-libs/fx/arrow-fx-resilience/api/arrow-fx-resilience.api +++ b/arrow-libs/fx/arrow-fx-resilience/api/arrow-fx-resilience.api @@ -61,6 +61,40 @@ public final class arrow/fx/resilience/FlowKt { public static final fun retry (Lkotlinx/coroutines/flow/Flow;Larrow/fx/resilience/Schedule;)Lkotlinx/coroutines/flow/Flow; } +public final class arrow/fx/resilience/SagaActionStep { + public static final field INSTANCE Larrow/fx/resilience/SagaActionStep; +} + +public final class arrow/fx/resilience/SagaBuilder : arrow/fx/resilience/SagaScope { + public fun ()V + public fun (Ljava/util/concurrent/atomic/AtomicReference;)V + public synthetic fun (Ljava/util/concurrent/atomic/AtomicReference;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun bind (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun invoke (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun saga (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun totalCompensation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract interface annotation class arrow/fx/resilience/SagaDSLMarker : java/lang/annotation/Annotation { +} + +public final class arrow/fx/resilience/SagaKt { + public static final fun saga (Lkotlin/jvm/functions/Function2;)Lkotlin/jvm/functions/Function2; + public static final fun saga (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Lkotlin/jvm/functions/Function2; + public static final fun transact (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract interface class arrow/fx/resilience/SagaScope { + public abstract fun bind (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun invoke (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun saga (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public final class arrow/fx/resilience/SagaScope$DefaultImpls { + public static fun bind (Larrow/fx/resilience/SagaScope;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun invoke (Larrow/fx/resilience/SagaScope;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public abstract class arrow/fx/resilience/Schedule { public static final field Companion Larrow/fx/resilience/Schedule$Companion; public final fun and (Larrow/fx/resilience/Schedule;)Larrow/fx/resilience/Schedule; diff --git a/arrow-libs/fx/arrow-fx-resilience/src/commonMain/kotlin/arrow/fx/resilience/Saga.kt b/arrow-libs/fx/arrow-fx-resilience/src/commonMain/kotlin/arrow/fx/resilience/Saga.kt new file mode 100644 index 00000000000..ef46b396562 --- /dev/null +++ b/arrow-libs/fx/arrow-fx-resilience/src/commonMain/kotlin/arrow/fx/resilience/Saga.kt @@ -0,0 +1,178 @@ +package arrow.fx.resilience + +import arrow.core.continuations.AtomicRef +import arrow.core.continuations.updateAndGet +import arrow.core.nonFatalOrThrow +import kotlin.coroutines.cancellation.CancellationException +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.withContext + + +/** + * The saga design pattern is a way to manage data consistency across microservices in distributed + * transaction scenarios. A [Saga] is useful when you need to manage data in a consistent manner + * across services in distributed transaction scenarios. Or when you need to compose multiple + * `actions` with a `compensation` that needs to run in a transaction like style. + * + * For example, let's say that we have the following domain types `Order`, `Payment`. + * + * ```kotlin + * data class Order(val id: UUID, val amount: Long) + * data class Payment(val id: UUID, val orderId: UUID) + * ``` + * + * The creation of an `Order` can only remain when a payment has been made. In SQL, you might run + * this inside a transaction, which can automatically roll back the creation of the `Order` when the + * creation of the Payment fails. + * + * When you need to do this across distributed services, or a multiple atomic references, etc. You + * need to manually facilitate the rolling back of the performed actions, or compensating actions. + * + * The [Saga] type, and [saga] DSL remove all the boilerplate of manually having to facilitate this + * with a convenient suspending DSL. + * + * ```kotlin + * data class Order(val id: UUID, val amount: Long) + * suspend fun createOrder(): Order = Order(UUID.randomUUID(), 100L) + * suspend fun deleteOrder(order: Order): Unit = println("Deleting $order") + * + * data class Payment(val id: UUID, val orderId: UUID) + * suspend fun createPayment(order: Order): Payment = Payment(UUID.randomUUID(), order.id) + * suspend fun deletePayment(payment: Payment): Unit = println("Deleting $payment") + * + * suspend fun Payment.awaitSuccess(): Unit = throw RuntimeException("Payment Failed") + * + * suspend fun main() { + * saga { + * val order = saga({ createOrder() }) { deleteOrder(it) } + * val payment = saga { createPayment(order) }, ::deletePayment) + * payment.awaitSuccess() + * }.transact() + * } + * ``` + */ +public typealias Saga = suspend SagaScope.() -> A + +/** DSL that enables the [Saga] pattern in a `suspend` DSL. */ +@SagaDSLMarker +public interface SagaScope { + + /** + * Run an [action] to produce a value of type [A] and _install_ a [compensation] to undo the + * action. + */ + @SagaDSLMarker + public suspend fun saga( + action: suspend SagaActionStep.() -> A, + compensation: suspend (A) -> Unit, + ): A + + /** Executes a [Saga] and returns its value [A] */ + public suspend fun Saga.bind(): A = invoke(this@SagaScope) + + /** Invoke a [Saga] and returns its value [A] */ + public suspend operator fun Saga.invoke(): A = invoke(this@SagaScope) +} + +/** + * The Saga builder which exposes the [SagaScope.bind]. The `saga` builder uses the suspension + * system to run actions, and automatically register their compensating actions. + * + * When the resulting [Saga] fails it will run all the required compensating actions, also when the + * [Saga] gets cancelled it will respect its compensating actions before returning. + * + * By doing so we can guarantee that any transactional like operations made by the [Saga] will + * guarantee that it results in the correct state. + */ +public inline fun saga(noinline block: suspend SagaScope.() -> A): Saga = block + +/** Create a lazy [Saga] that will only run when the [Saga] is invoked. */ +public fun saga( + action: suspend SagaActionStep.() -> A, + compensation: suspend (A) -> Unit +): Saga = saga { saga(action, compensation) } + +/** + * Transact runs the [Saga] turning it into a [suspend] effect that results in [A]. If the saga + * fails then all compensating actions are guaranteed to run. When a compensating action failed it + * will be ignored, and the other compensating actions will continue to be run. + */ +public suspend fun Saga.transact(): A { + val builder = SagaBuilder() + return guaranteeCase({ invoke(builder) }) { res -> + when (res) { + null -> builder.totalCompensation() + else -> Unit + } + } +} + +/** DSL Marker for the SagaEffect DSL */ +@DslMarker public annotation class SagaDSLMarker + +/** + * Marker object to protect [SagaScope.saga] from calling [SagaScope.bind] in its `action` step. + */ +@SagaDSLMarker public object SagaActionStep + +// Internal implementation of the `saga { }` builder. +@PublishedApi +internal class SagaBuilder( + private val stack: AtomicRef Unit>> = AtomicRef(emptyList()) +) : SagaScope { + + @SagaDSLMarker + override suspend fun saga( + action: suspend SagaActionStep.() -> A, + compensation: suspend (A) -> Unit + ): A = + guaranteeCase({ action(SagaActionStep) }) { res -> + // This action failed, so we have no compensate to push on the stack + // the compensation stack will run in the `transact` stage, this is just the builder + when (res) { + null -> Unit + else -> stack.updateAndGet { listOf(suspend { compensation(res) }) + it } + } + } + + @PublishedApi + internal suspend fun totalCompensation() { + stack + .get() + .fold Unit, Throwable?>(null) { acc, finalizer -> + try { + finalizer() + acc + } catch (e: Throwable) { + e.nonFatalOrThrow() + acc?.apply { addSuppressed(e) } ?: e + } + } + ?.let { throw it } + } +} + +private suspend fun guaranteeCase( + fa: suspend () -> A, + finalizer: suspend (value: A?) -> Unit +): A { + val res = + try { + fa() + } catch (e: CancellationException) { + runReleaseAndRethrow(e) { finalizer(null) } + } catch (t: Throwable) { + runReleaseAndRethrow(t) { finalizer(null) } + } + withContext(NonCancellable) { finalizer(res) } + return res +} + +private suspend fun runReleaseAndRethrow(original: Throwable, f: suspend () -> Unit): Nothing { + try { + withContext(NonCancellable) { f() } + } catch (e: Throwable) { + original.addSuppressed(e.nonFatalOrThrow()) + } + throw original +} diff --git a/arrow-libs/fx/arrow-fx-resilience/src/commonTest/kotlin/arrow/fx/resilience/SagaSpec.kt b/arrow-libs/fx/arrow-fx-resilience/src/commonTest/kotlin/arrow/fx/resilience/SagaSpec.kt new file mode 100644 index 00000000000..c389b2f3ca6 --- /dev/null +++ b/arrow-libs/fx/arrow-fx-resilience/src/commonTest/kotlin/arrow/fx/resilience/SagaSpec.kt @@ -0,0 +1,162 @@ +package arrow.fx.resilience + +import arrow.fx.coroutines.parTraverse +import arrow.fx.coroutines.parZip +import io.kotest.assertions.fail +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.StringSpec +import io.kotest.matchers.ints.shouldBeExactly +import io.kotest.matchers.shouldBe +import io.kotest.property.Arb +import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.list +import io.kotest.property.checkAll +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.channels.Channel + +@Suppress("unused") +class SagaSpec : StringSpec({ + "Saga returns action result" { + checkAll(Arb.int()) { i -> + val saga = saga({ i }) { fail("Doesn't run") } + saga.transact() shouldBeExactly i + } + } + + class SagaFailed : RuntimeException() + + "Saga runs compensation if throw in builder & rethrows exception" { + checkAll(Arb.int()) { i -> + val compensation = CompletableDeferred() + val saga = saga { + saga({ i }) { compensation.complete(it) } + throw SagaFailed() + } + shouldThrow { saga.transact() } + compensation.await() shouldBeExactly i + } + } + + "Saga runs compensation if throw in saga & rethrows exception" { + checkAll(Arb.int()) { i -> + val compensation = CompletableDeferred() + val saga = saga { + saga({ i }) { compensation.complete(it) } + saga({ throw SagaFailed() }) { fail("Doesn't run") } + } + shouldThrow { saga.transact() } + compensation.await() shouldBeExactly i + } + } + + "Saga runs compensation in order & rethrows exception" { + checkAll(Arb.int(), Arb.int()) { a, b -> + val compensations = Channel(2) + val saga = saga { + saga({ a }) { compensations.send(it) } + saga({ b }) { compensations.send(it) } + saga({ throw SagaFailed() }) { fail("Doesn't run") } + } + shouldThrow { saga.transact() } + compensations.receive() shouldBeExactly b + compensations.receive() shouldBeExactly a + compensations.close() + } + } + + "Sage composes compensation errors" { + checkAll(Arb.int()) { a -> + val compensationA = CompletableDeferred() + val original = SagaFailed() + val compensation = SagaFailed() + val saga = saga { + saga({ a }) { compensationA.complete(it) } + saga({}) { throw compensation } + saga({ throw original }) { fail("Doesn't run") } + } + val res = shouldThrow { saga.transact() } + res shouldBe original + res.suppressedExceptions[0] shouldBe compensation + compensationA.await() shouldBeExactly a + } + } + + "Sage composes compensation errors when thrown in block" { + checkAll(Arb.int()) { a -> + val compensationA = CompletableDeferred() + val original = SagaFailed() + val compensation = SagaFailed() + val saga = saga { + saga({ a }) { compensationA.complete(it) } + saga({}) { throw compensation } + throw original + } + val res = shouldThrow { saga.transact() } + res shouldBe original + res.suppressedExceptions[0] shouldBe compensation + compensationA.await() shouldBeExactly a + } + } + + "Saga can traverse" { + checkAll(Arb.list(Arb.int())) { iis -> + saga { iis.map { saga({ it }) { fail("Doesn't run") } } }.transact() shouldBe iis + } + } + + "Saga can parTraverse" { + checkAll(Arb.list(Arb.int())) { iis -> + saga { iis.parTraverse { saga({ it }) { fail("Doesn't run") } } }.transact() shouldBe iis + } + } + + "parZip runs left compensation" { + checkAll(Arb.int()) { a -> + val compensationA = CompletableDeferred() + val latch = CompletableDeferred() + val saga = saga { + parZip( + { + saga({ + latch.complete(Unit) + a + }) { compensationA.complete(it) } + }, + { + saga({ + latch.await() + throw SagaFailed() + }) { fail("Doesn't run") } + } + ) { _, _ -> } + } + shouldThrow { saga.transact() } + compensationA.await() shouldBeExactly a + } + } + + "parZip runs right compensation" { + checkAll(Arb.int()) { a -> + val compensationB = CompletableDeferred() + val latch = CompletableDeferred() + val saga = saga { + parZip( + { + saga({ + latch.await() + throw SagaFailed() + }) { fail("Doesn't run") } + }, + { + saga({ + latch.complete(Unit) + a + }) { compensationB.complete(it) } + } + ) { _, _ -> } + } + shouldThrow { saga.transact() } + compensationB.await() shouldBeExactly a + } + } +})