diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/flow.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/flow.kt index 952de1c6241..6ae5b5940f2 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/flow.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/flow.kt @@ -20,10 +20,15 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.retryWhen import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.flow.zip import kotlin.jvm.JvmMultifileClass import kotlin.jvm.JvmName +import kotlin.time.Duration +import kotlin.time.ExperimentalTime +import kotlinx.coroutines.flow.map /** * Retries collection of the given flow when an exception occurs in the upstream flow based on a decision by the [schedule]. @@ -159,3 +164,93 @@ public inline fun Flow.parMapUnordered( emit(transform(o)) }.flowOn(ctx) }.flattenMerge(concurrency) + +/** Repeats the Flow forever */ +public fun Flow.repeat(): Flow = + flow { + while (true) { + collect { + emit(it) + } + } + } + +/** + * Flow that emits [A] every [period] while taking into account how much time it takes downstream to consume the emission. + * If downstream takes longer to process than [period] than it immediately emits another [A]. + * + * Use `onEach { delay(timeMillis) }` for an alternative that sleeps [period] between every element. + * This is different in that the time between every element is equal to the specified period, + * regardless of how much time it takes to process that tick downstream. + * + * i.e, for a period of 1 second and a delay(100), the timestamps of the emission would be 1s, 2s, 3s, ... when using [fixedRate]. + * Whereas with `onEach { delay(timeMillis) }` it would run at timestamps 1s, 2.1s, 3.2s, ... + * + * @param period period between [Unit] emits of the resulting [Flow]. + */ +@ExperimentalTime +public fun Flow.metered(period: Duration): Flow = + fixedRate(period).zip(this) { _, a -> a } + +public fun Flow.metered(period: Long): Flow = + fixedRate(period).zip(this) { _, a -> a } + +@ExperimentalTime +public fun fixedRate( + period: Duration, + dampen: Boolean = true, + timeStampInMillis: () -> Long = { timeInMillis() } +): Flow = + fixedRate(period.inWholeMilliseconds, dampen, timeStampInMillis) + +/** + * Flow that emits [Unit] every [period] while taking into account how much time it takes downstream to consume the emission. + * If downstream takes longer to process than [period] than it immediately emits another [Unit], + * if you set [dampen] to false it will send `n = downstreamTime / period` [Unit] elements immediately. + * + * Use `onEach { delay(timeMillis) }` for an alternative that sleeps [period] between every element. + * This is different in that the time between every element is equal to the specified period, + * regardless of how much time it takes to process that tick downstream. + * + * i.e, for a period of 1 second and a delay(100), the timestamps of the emission would be 1s, 2s, 3s, ... when using [fixedRate]. + * Whereas with `onEach { delay(timeMillis) }` it would run at timestamps 1s, 2.1s, 3.2s, ... + * + * @param period period between [Unit] emits of the resulting [Flow]. + * @param dampen if you set [dampen] to false it will send `n` times [period] time it took downstream to process the emission. + * @param timeStampInMillis allows for supplying a different timestamp function, useful to override with `runBlockingTest` + */ +public fun fixedRate( + period: Long, + dampen: Boolean = true, + timeStampInMillis: () -> Long = { timeInMillis() } +): Flow = + if (period == 0L) flowOf(Unit).repeat() + else flow { + var lastAwakeAt = timeStampInMillis() + + while (true) { + val now = timeStampInMillis() + val next = lastAwakeAt + period + + if (next > now) { + delay(next - now) + emit(Unit) + lastAwakeAt = next + } else { + val ticks: Long = (now - lastAwakeAt - 1) / period + when { + ticks < 0L -> Unit + ticks == 0L || dampen -> emit(Unit) + else -> repeat(ticks.toInt()) { emit(Unit) } + } + lastAwakeAt += (period * ticks) + } + } + } + +public inline fun Flow.mapIndexed(crossinline f: suspend (Int, A) -> B): Flow { + var index = 0 + return map { value -> + f(index++, value) + } +} diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/FlowJvmTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/FlowJvmTest.kt index 1af72548ee5..937f84a6166 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/FlowJvmTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/FlowJvmTest.kt @@ -6,13 +6,21 @@ import io.kotest.matchers.longs.shouldBeLessThan import io.kotest.matchers.shouldBe import io.kotest.property.Arb import io.kotest.property.arbitrary.int -import kotlinx.coroutines.flow.collect +import io.kotest.property.arbitrary.map +import io.kotest.property.arbitrary.positiveInts +import kotlin.time.Duration import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.toSet import kotlinx.coroutines.test.runBlockingTest import kotlin.time.ExperimentalTime import kotlin.time.milliseconds +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.withTimeoutOrNull @ExperimentalTime class FlowJvmTest : ArrowFxSpec(spec = { @@ -59,4 +67,91 @@ class FlowJvmTest : ArrowFxSpec(spec = { } } } + + "fixedDelay" { + runBlockingTest { + checkAll(Arb.positiveInts().map(Int::toLong), Arb.int(1..100)) { waitPeriod, n -> + val emissionDuration = waitPeriod / 10L + var state: Long? = null + + val rate = flow { emit(delay(Duration.milliseconds(waitPeriod))) }.repeat() + .map { + val now = state ?: currentTime + val nextNow = currentTime + val lapsed = nextNow - now + state = nextNow + delay(emissionDuration) + lapsed + } + .take(n) + .toList() + + rate.first() shouldBe 0 // First element is immediately + rate.drop(1).forEach { act -> + act shouldBe (waitPeriod + emissionDuration) // Remaining elements all take delay + emission duration + } + } + } + } + + "fixedRate" { + runBlockingTest { + checkAll(Arb.positiveInts().map(Int::toLong), Arb.int(1..100)) { waitPeriod, n -> + val emissionDuration = waitPeriod / 10 + var state: Long? = null + + val rate = fixedRate(Duration.milliseconds(waitPeriod)) { currentTime } + .map { + val now = state ?: currentTime + val nextNow = currentTime + val lapsed = nextNow - now + state = nextNow + delay(emissionDuration) + lapsed + } + .take(n) + .toList() + + rate.first() shouldBe 0 // First element is immediately + rate.drop(1).forEach { act -> + // Remaining elements all take total of waitPeriod, emissionDuration is correctly taken into account. + act shouldBe waitPeriod + } + } + } + } + + "fixedRate(dampen = true)" { + val waitPeriod = 1000L + val n = 3 + val timeout = (n + 1) * waitPeriod + 500 + val buffer = mutableListOf() + + withTimeoutOrNull(timeout) { + fixedRate(Duration.milliseconds(waitPeriod), true) { timeInMillis() } + .mapIndexed { index, _ -> + if (index == 0) delay(waitPeriod * n) else Unit + } + .collect(buffer::add) + } + + buffer.size shouldBe 2 + } + + "fixedRate(dampen = false)" { + val waitPeriod = 1000L + val n = 3 + val timeout = (n + 1) * waitPeriod + 500 + val buffer = mutableListOf() + + withTimeoutOrNull(timeout) { + fixedRate(Duration.milliseconds(waitPeriod), false) { timeInMillis() } + .mapIndexed { index, _ -> + if (index == 0) delay(waitPeriod * n) else Unit + } + .collect(buffer::add) + } + + buffer.size shouldBe n + 1 + } })