-
Notifications
You must be signed in to change notification settings - Fork 451
/
Copy pathflow.kt
49 lines (46 loc) · 1.34 KB
/
flow.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package arrow.resilience
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.retryWhen
import kotlin.time.Duration.Companion.ZERO
/**
* Retries collection of the given flow when an exception occurs in the upstream flow based on a decision by the [schedule].
* This operator is *transparent* to exceptions that occur in downstream flow and does not retry on exceptions that are thrown
* to cancel the flow.
*
* @see [Schedule] for how to build a schedule.
*
* ```kotlin
* import arrow.resilience.*
* import kotlinx.coroutines.flow.*
*
* suspend fun main(): Unit {
* var counter = 0
* val flow = flow {
* emit(counter)
* if (++counter <= 5) throw RuntimeException("Bang!")
* }
* //sampleStart
* val sum = flow.retry(Schedule.recurs(5))
* .reduce(Int::plus)
* //sampleEnd
* println(sum)
* }
* ```
* <!--- KNIT example-flow-01.kt -->
*
* @param schedule - the [Schedule] used for retrying the collection of the flow
*/
public fun <A, B> Flow<A>.retry(schedule: Schedule<Throwable, B>): Flow<A> {
var step = schedule.step
return retryWhen { cause, _ ->
when (val dec = step(cause)) {
is Schedule.Decision.Continue -> {
if (dec.delay != ZERO) delay(dec.delay)
step = dec.step
true
}
is Schedule.Decision.Done -> false
}
}
}