-
Notifications
You must be signed in to change notification settings - Fork 451
/
Copy pathBracket.kt
256 lines (241 loc) · 8.98 KB
/
Bracket.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package arrow.fx.coroutines
import arrow.core.nonFatalOrThrow
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext
/**
 * Describes how a program terminated, so finalizers can react differently to each outcome.
 *
 * The three possible outcomes are [Completed], [Cancelled] and [Failure].
 */
public sealed class ExitCase {

  /** The program finished without throwing. */
  public object Completed : ExitCase() {
    override fun toString(): String = "ExitCase.Completed"
  }

  /** The program was terminated by the given cancellation [exception]. */
  public data class Cancelled(val exception: CancellationException) : ExitCase()

  /** The program was terminated by a non-cancellation [failure]. */
  public data class Failure(val failure: Throwable) : ExitCase()

  public companion object {
    /**
     * Classifies [error]: a [CancellationException] becomes [Cancelled],
     * every other throwable becomes [Failure].
     */
    public fun ExitCase(error: Throwable): ExitCase =
      when (error) {
        is CancellationException -> Cancelled(error)
        else -> Failure(error)
      }
  }
}
/**
 * Runs [fa] and invokes the [onCancel] handler only when [fa] terminates with [ExitCase.Cancelled].
 * When [fa] completes normally or fails with any other error, [onCancel] is not invoked.
 *
 * This function is useful for wiring cancellation tokens between fibers, building inter-op with other effect systems or testing.
 *
 * @param fa program that you want to register the handler on.
 * @param onCancel handler to run when [fa] gets cancelled.
 * @return the result of [fa].
 * @see guarantee for registering a handler that is guaranteed to always run.
 * @see guaranteeCase for registering a handler that executes for any [ExitCase].
 */
public suspend inline fun <A> onCancel(
  fa: () -> A,
  crossinline onCancel: suspend () -> Unit
): A = guaranteeCase(fa) { case ->
  // Every ExitCase is listed explicitly (no `else`) so that introducing a new case
  // forces a conscious decision here instead of being silently ignored.
  when (case) {
    is ExitCase.Cancelled -> onCancel.invoke()
    is ExitCase.Completed, is ExitCase.Failure -> Unit
  }
}
/**
 * Runs [fa] and then always runs [finalizer], whether [fa] succeeded, failed or was cancelled.
 *
 * The [finalizer] is executed under [NonCancellable]. When [fa] throws, the original throwable is
 * rethrown after the [finalizer] ran; a throwable rejected by [nonFatalOrThrow] propagates
 * immediately, before the [finalizer] is invoked.
 *
 * As a best practice, do not release resources via [guarantee], since it does not model
 * acquiring, using and releasing resources. It only schedules a finalizer after a given
 * suspending program, so prefer [Resource] or [bracket], which capture acquiring, using and
 * releasing as 3 separate steps to ensure resource safety.
 *
 * @param fa program that you want to register the handler on.
 * @param finalizer handler to run after [fa].
 * @return the result of [fa].
 * @see guaranteeCase for registering a handler that tracks the [ExitCase] of [fa].
 */
public suspend inline fun <A> guarantee(
  fa: () -> A,
  crossinline finalizer: suspend () -> Unit
): A {
  val result = try {
    fa()
  } catch (failure: Throwable) {
    // Cancellation is forwarded untouched; anything else must pass the non-fatal check first.
    val cause = if (failure is CancellationException) failure else failure.nonFatalOrThrow()
    runReleaseAndRethrow(cause) { finalizer() }
  }
  // Success path: the finalizer still runs, shielded from cancellation.
  withContext(NonCancellable) { finalizer() }
  return result
}
/**
 * Runs [fa] and then always runs [finalizer], passing it the [ExitCase] that describes how
 * [fa] ended: [ExitCase.Completed], [ExitCase.Cancelled] or [ExitCase.Failure].
 *
 * The [finalizer] is executed under [NonCancellable]. When [fa] throws, the original throwable is
 * rethrown after the [finalizer] ran; a throwable rejected by [nonFatalOrThrow] propagates
 * immediately, before the [finalizer] is invoked.
 *
 * As a best practice, do not release resources via [guaranteeCase], since it does not model
 * acquiring, using and releasing resources. It only schedules a finalizer after a given
 * suspending program, so prefer [Resource] or [bracketCase], which capture acquiring, using and
 * releasing as 3 separate steps to ensure resource safety.
 *
 * @param fa program that you want to register the handler on.
 * @param finalizer handler to run after [fa].
 * @return the result of [fa].
 * @see guarantee for registering a handler that ignores the [ExitCase] of [fa].
 */
public suspend inline fun <A> guaranteeCase(
  fa: () -> A,
  crossinline finalizer: suspend (ExitCase) -> Unit
): A {
  val result = try {
    fa.invoke()
  } catch (failure: Throwable) {
    if (failure is CancellationException) {
      runReleaseAndRethrow(failure) { finalizer(ExitCase.Cancelled(failure)) }
    } else {
      // The non-fatal check happens before the finalizer is ever invoked.
      runReleaseAndRethrow(failure.nonFatalOrThrow()) { finalizer(ExitCase.Failure(failure)) }
    }
  }
  // Success path: report Completed, shielded from cancellation.
  withContext(NonCancellable) { finalizer(ExitCase.Completed) }
  return result
}
/**
 * Acquires a resource, uses it and releases it again, safely in the face of errors and cancellation.
 * It is the equivalent of an async capable `try/catch/finally` statement in mainstream imperative
 * languages for resource acquisition and release.
 *
 * Both [acquire] and [release] run under [NonCancellable]. When [use] throws, the original
 * throwable is rethrown after [release] ran.
 *
 * @param acquire is the action to acquire the resource.
 *
 * @param use is the action to consume the resource and produce a result.
 * Once it terminates, either successfully, with an error or by cancellation,
 * the [release] function will run to clean up the resource.
 *
 * @param release is the action that releases the allocated resource after [use] is done, regardless
 * of its exit condition.
 *
 * ```kotlin
 * import arrow.fx.coroutines.*
 *
 * class File(val url: String) {
 *   fun open(): File = this
 *   fun close(): Unit {}
 *   override fun toString(): String = "This file contains some interesting content from $url!"
 * }
 *
 * suspend fun openFile(uri: String): File = File(uri).open()
 * suspend fun closeFile(file: File): Unit = file.close()
 * suspend fun fileToString(file: File): String = file.toString()
 *
 * suspend fun main(): Unit {
 *   //sampleStart
 *   val res = bracket(
 *     acquire = { openFile("data.json") },
 *     use = { file -> fileToString(file) },
 *     release = { file: File -> closeFile(file) }
 *   )
 *   //sampleEnd
 *   println(res)
 * }
 * ```
 * <!--- KNIT example-bracket-01.kt -->
 */
public suspend inline fun <A, B> bracket(
  crossinline acquire: suspend () -> A,
  use: (A) -> B,
  crossinline release: suspend (A) -> Unit
): B {
  // Acquisition is shielded from cancellation so a resource is never half-acquired.
  val resource = withContext(NonCancellable) { acquire() }
  val result = try {
    use(resource)
  } catch (failure: Throwable) {
    // Cancellation is forwarded untouched; anything else must pass the non-fatal check first.
    val cause = if (failure is CancellationException) failure else failure.nonFatalOrThrow()
    runReleaseAndRethrow(cause) { release(resource) }
  }
  // Success path: release still runs, shielded from cancellation.
  withContext(NonCancellable) { release(resource) }
  return result
}
/**
 * A way to safely acquire a resource and release it in the face of errors and cancellation.
 * It uses [ExitCase] to distinguish between different exit cases when releasing the acquired resource.
 *
 * [bracketCase] consists of three stages:
 *   1. acquisition
 *   2. consumption
 *   3. releasing
 *
 * 1. Resource acquisition is **NON CANCELLABLE**.
 *   If resource acquisition fails, meaning no resource was actually successfully acquired, then we short-circuit the effect.
 *   As the resource was not acquired, it is not possible to [use] or [release] it.
 *   If it is successful we pass the result to stage 2 [use].
 *
 * 2. Resource consumption is like any other `suspend` effect. The key difference here is that it's wired in such a way that
 *   [release] **will always** be called either on [ExitCase.Cancelled], [ExitCase.Failure] or [ExitCase.Completed].
 *   If it failed, then the result of [bracketCase] will be the error; otherwise the result of [use] will be returned.
 *
 * 3. Resource releasing is **NON CANCELLABLE**, otherwise it could result in leaks.
 *   If [release] throws after [use] completed successfully, that error is the result of [bracketCase].
 *   If [release] throws after [use] already failed, the error of [use] is rethrown with the
 *   [release] error added to it as a suppressed exception.
 *
 * @param acquire is the action to acquire the resource.
 *
 * @param use is the action to consume the resource and produce a result.
 * Once it terminates, either successfully, with an error or by cancellation,
 * the [release] function will run to clean up the resource.
 *
 * @param release is the action to release the allocated resource after [use] terminates.
 *
 * ```kotlin
 * import arrow.fx.coroutines.*
 *
 * class File(val url: String) {
 *   fun open(): File = this
 *   fun close(): Unit {}
 * }
 *
 * suspend fun File.content(): String =
 *   "This file contains some interesting content from $url!"
 * suspend fun openFile(uri: String): File = File(uri).open()
 * suspend fun closeFile(file: File): Unit = file.close()
 *
 * suspend fun main(): Unit {
 *   //sampleStart
 *   val res = bracketCase(
 *     acquire = { openFile("data.json") },
 *     use = { file -> file.content() },
 *     release = { file, exitCase ->
 *       when (exitCase) {
 *         is ExitCase.Completed -> println("File closed with $exitCase")
 *         is ExitCase.Cancelled -> println("Program cancelled with $exitCase")
 *         is ExitCase.Failure -> println("Program failed with $exitCase")
 *       }
 *       closeFile(file)
 *     }
 *   )
 *   //sampleEnd
 *   println(res)
 * }
 * ```
 * <!--- KNIT example-bracket-02.kt -->
 */
public suspend inline fun <A, B> bracketCase(
  crossinline acquire: suspend () -> A,
  use: (A) -> B,
  crossinline release: suspend (A, ExitCase) -> Unit
): B {
  // Stage 1: acquisition is shielded from cancellation.
  val acquired = withContext(NonCancellable) {
    acquire()
  }
  // Stage 2: consumption; every failure path releases before rethrowing.
  val res = try {
    use(acquired)
  } catch (e: CancellationException) {
    runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) }
  } catch (t: Throwable) {
    // `t` already passed the non-fatal check when the first argument was evaluated,
    // so it is handed to `release` directly, exactly as `guaranteeCase` does.
    runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t)) }
  }
  // Stage 3 on the success path: release with Completed, shielded from cancellation.
  withContext(NonCancellable) { release(acquired, ExitCase.Completed) }
  return res
}
/**
 * Runs the release action [f] under [NonCancellable] and then always rethrows [original].
 *
 * A throwable raised by [f] is passed through [nonFatalOrThrow] and, when that call returns it,
 * attached to [original] as a suppressed exception, so the error that triggered the release
 * stays the primary one. If [nonFatalOrThrow] throws instead, that throwable propagates and
 * [original] is not rethrown.
 *
 * @param original the throwable that caused the release to run; rethrown at the end.
 * @param f the release action to execute.
 */
@PublishedApi
internal suspend inline fun runReleaseAndRethrow(
  original: Throwable,
  crossinline f: suspend () -> Unit
): Nothing {
  try {
    withContext(NonCancellable) { f.invoke() }
  } catch (releaseError: Throwable) {
    val suppressed = releaseError.nonFatalOrThrow()
    original.addSuppressed(suppressed)
  }
  throw original
}