Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow operators fixedRate/metered/mapIndexed/repeat #2457

Merged
merged 7 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.retryWhen
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.flow.zip
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.flow.map

/**
* Retries collection of the given flow when an exception occurs in the upstream flow based on a decision by the [schedule].
Expand Down Expand Up @@ -159,3 +164,93 @@ public inline fun <A, B> Flow<A>.parMapUnordered(
emit(transform(o))
}.flowOn(ctx)
}.flattenMerge(concurrency)

/** Repeats the Flow forever */
public fun <A> Flow<A>.repeat(): Flow<A> =
flow {
while (true) {
collect {
emit(it)
}
}
}

/**
* Flow that emits [A] every [period] while taking into account how much time it takes downstream to consume the emission.
* If downstream takes longer to process than [period] than it immediately emits another [A].
*
* Use `onEach { delay(timeMillis) }` for an alternative that sleeps [period] between every element.
* This is different in that the time between every element is equal to the specified period,
* regardless of how much time it takes to process that tick downstream.
*
* i.e, for a period of 1 second and a delay(100), the timestamps of the emission would be 1s, 2s, 3s, ... when using [fixedRate].
* Whereas with `onEach { delay(timeMillis) }` it would run at timestamps 1s, 2.1s, 3.2s, ...
*
* @param period period between [Unit] emits of the resulting [Flow].
*/
@ExperimentalTime
public fun <A> Flow<A>.metered(period: Duration): Flow<A> =
fixedRate(period).zip(this) { _, a -> a }

public fun <A> Flow<A>.metered(period: Long): Flow<A> =
fixedRate(period).zip(this) { _, a -> a }

@ExperimentalTime
public fun fixedRate(
period: Duration,
dampen: Boolean = true,
timeStampInMillis: () -> Long = { timeInMillis() }
): Flow<Unit> =
fixedRate(period.inWholeMilliseconds, dampen, timeStampInMillis)

/**
* Flow that emits [Unit] every [period] while taking into account how much time it takes downstream to consume the emission.
* If downstream takes longer to process than [period] than it immediately emits another [Unit],
* if you set [dampen] to false it will send `n = downstreamTime / period` [Unit] elements immediately.
*
* Use `onEach { delay(timeMillis) }` for an alternative that sleeps [period] between every element.
* This is different in that the time between every element is equal to the specified period,
* regardless of how much time it takes to process that tick downstream.
*
* i.e, for a period of 1 second and a delay(100), the timestamps of the emission would be 1s, 2s, 3s, ... when using [fixedRate].
* Whereas with `onEach { delay(timeMillis) }` it would run at timestamps 1s, 2.1s, 3.2s, ...
*
* @param period period between [Unit] emits of the resulting [Flow].
* @param dampen if you set [dampen] to false it will send `n` times [period] time it took downstream to process the emission.
* @param timeStampInMillis allows for supplying a different timestamp function, useful to override with `runBlockingTest`
*/
public fun fixedRate(
period: Long,
dampen: Boolean = true,
timeStampInMillis: () -> Long = { timeInMillis() }
): Flow<Unit> =
if (period == 0L) flowOf(Unit).repeat()
else flow {
var lastAwakeAt = timeStampInMillis()

while (true) {
val now = timeStampInMillis()
val next = lastAwakeAt + period

if (next > now) {
delay(next - now)
emit(Unit)
lastAwakeAt = next
} else {
val ticks: Long = (now - lastAwakeAt - 1) / period
when {
ticks < 0L -> Unit
ticks == 0L || dampen -> emit(Unit)
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
else -> repeat(ticks.toInt()) { emit(Unit) }
}
lastAwakeAt += (period * ticks)
}
}
}

public inline fun <A, B> Flow<A>.mapIndexed(crossinline f: suspend (Int, A) -> B): Flow<B> {
var index = 0
return map { value ->
f(index++, value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@ import io.kotest.matchers.longs.shouldBeLessThan
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import kotlinx.coroutines.flow.collect
import io.kotest.property.arbitrary.map
import io.kotest.property.arbitrary.positiveInts
import kotlin.time.Duration
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.test.runBlockingTest
import kotlin.time.ExperimentalTime
import kotlin.time.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.withTimeoutOrNull

@ExperimentalTime
class FlowJvmTest : ArrowFxSpec(spec = {
Expand Down Expand Up @@ -59,4 +67,91 @@ class FlowJvmTest : ArrowFxSpec(spec = {
}
}
}

"fixedDelay" {
runBlockingTest {
checkAll(Arb.positiveInts().map(Int::toLong), Arb.int(1..100)) { waitPeriod, n ->
val emissionDuration = waitPeriod / 10L
var state: Long? = null

val rate = flow { emit(delay(Duration.milliseconds(waitPeriod))) }.repeat()
.map {
val now = state ?: currentTime
val nextNow = currentTime
val lapsed = nextNow - now
state = nextNow
delay(emissionDuration)
lapsed
}
.take(n)
.toList()

rate.first() shouldBe 0 // First element is immediately
rate.drop(1).forEach { act ->
act shouldBe (waitPeriod + emissionDuration) // Remaining elements all take delay + emission duration
}
}
}
}

"fixedRate" {
runBlockingTest {
checkAll(Arb.positiveInts().map(Int::toLong), Arb.int(1..100)) { waitPeriod, n ->
val emissionDuration = waitPeriod / 10
var state: Long? = null

val rate = fixedRate(Duration.milliseconds(waitPeriod)) { currentTime }
.map {
val now = state ?: currentTime
val nextNow = currentTime
val lapsed = nextNow - now
state = nextNow
delay(emissionDuration)
lapsed
}
.take(n)
.toList()

rate.first() shouldBe 0 // First element is immediately
rate.drop(1).forEach { act ->
// Remaining elements all take total of waitPeriod, emissionDuration is correctly taken into account.
act shouldBe waitPeriod
}
}
}
}

"fixedRate(dampen = true)" {
val waitPeriod = 1000L
val n = 3
val timeout = (n + 1) * waitPeriod + 500
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
val buffer = mutableListOf<Unit>()

withTimeoutOrNull(timeout) {
fixedRate(Duration.milliseconds(waitPeriod), true) { timeInMillis() }
.mapIndexed { index, _ ->
if (index == 0) delay(waitPeriod * n) else Unit
}
.collect(buffer::add)
}

buffer.size shouldBe 2
}

"fixedRate(dampen = false)" {
val waitPeriod = 1000L
val n = 3
val timeout = (n + 1) * waitPeriod + 500
val buffer = mutableListOf<Unit>()

withTimeoutOrNull(timeout) {
fixedRate(Duration.milliseconds(waitPeriod), false) { timeInMillis() }
.mapIndexed { index, _ ->
if (index == 0) delay(waitPeriod * n) else Unit
}
.collect(buffer::add)
}

buffer.size shouldBe n + 1
}
})