Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement fixedRate using monotonic time source #3294

Merged
merged 3 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ public final class arrow/fx/coroutines/ParZipOrAccumulateKt {
public static final fun parZipOrAccumulate (Larrow/core/raise/Raise;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function4;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/PredefKt {
public static final fun timeInMillis ()J
}

public final class arrow/fx/coroutines/Race2Kt {
public static final fun raceN (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun raceN (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.produceIn
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.zip
import kotlin.time.ComparableTimeMark
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.ExperimentalTime
import kotlin.time.TimeSource

/**
* Like [map], but will evaluate [transform] in parallel, emitting the results
Expand Down Expand Up @@ -209,16 +211,17 @@ public fun <A> Flow<A>.repeat(): Flow<A> =
public fun <A> Flow<A>.metered(period: Duration): Flow<A> =
fixedRate(period).zip(this) { _, a -> a }

@ExperimentalTime
public fun <A> Flow<A>.metered(period: Long): Flow<A> =
fixedRate(period).zip(this) { _, a -> a }

@ExperimentalTime
public fun fixedRate(
period: Duration,
periodInMillis: Long,
dampen: Boolean = true,
timeStampInMillis: () -> Long = { timeInMillis() }
timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be any benefit to taking TimeSource here instead? Or do we need Monotonic for ComparableTimeMark?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need ComparableTimeMark because comparison is used in at least one place. Also, the deprecation warnings from timeInMillis mentioned that this was the expected substitution, so I just went with it.

): Flow<Unit> =
fixedRate(period.inWholeMilliseconds, dampen, timeStampInMillis)
fixedRate(periodInMillis.milliseconds, dampen, timeStamp)

/**
* Flow that emits [Unit] every [period] while taking into account how much time it takes downstream to consume the emission.
Expand All @@ -234,33 +237,33 @@ public fun fixedRate(
*
* @param period period between [Unit] emits of the resulting [Flow].
* @param dampen if you set [dampen] to false it will send `n` times [period] time it took downstream to process the emission.
* @param timeStampInMillis allows for supplying a different timestamp function, useful to override with `runBlockingTest`
* @param timeStamp allows for supplying a different timestamp function, useful to override with `runBlockingTest`
*/
public fun fixedRate(
period: Long,
period: Duration,
dampen: Boolean = true,
timeStampInMillis: () -> Long = { timeInMillis() }
timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }
): Flow<Unit> =
if (period == 0L) flowOf(Unit).repeat()
if (period == Duration.ZERO) flowOf(Unit).repeat()
else flow {
var lastAwakeAt = timeStampInMillis()
var lastAwakeAt = timeStamp()

while (true) {
val now = timeStampInMillis()
val now = timeStamp()
val next = lastAwakeAt + period

if (next > now) {
delay(next - now)
emit(Unit)
lastAwakeAt = next
} else {
val ticks: Long = (now - lastAwakeAt - 1) / period
val ticks: Long = ((now - lastAwakeAt).inWholeMilliseconds - 1) / period.inWholeMilliseconds
when {
ticks < 0L -> Unit
ticks == 0L || dampen -> emit(Unit)
else -> repeat(ticks.toInt()) { emit(Unit) }
}
lastAwakeAt += (period * ticks)
lastAwakeAt += (period * ticks.toDouble())
}
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,31 @@ import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeTypeOf
import io.kotest.property.Arb
import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.long
import io.kotest.property.checkAll
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.testTimeSource
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.test.Test
import kotlin.time.ComparableTimeMark
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.ExperimentalTime

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
class FlowTest {
Expand Down Expand Up @@ -240,18 +253,17 @@ class FlowTest {
)
}

/* These tests do not run properly in the coroutines-test environment

@Test
fun fixedDelay() = runTestUsingDefaultDispatcher {
checkAll(Arb.long(10L .. 50L), Arb.int(3..20)) { waitPeriod, n ->
val emissionDuration = waitPeriod / 10L
var state: Long? = null
@Test @ExperimentalTime
fun fixedDelay() = runTest {
checkAll(Arb.long(10L .. 50L), Arb.int(3..20)) { waitPeriodInMillis, n ->
val waitPeriod = waitPeriodInMillis.milliseconds
val emissionDuration = (waitPeriodInMillis / 10L).milliseconds
var state: ComparableTimeMark? = null

val rate = flow { emit(delay(waitPeriod)) }.repeat()
.map {
val now = state ?: currentTime
val nextNow = currentTime
val now = state ?: testTimeSource.markNow()
val nextNow = testTimeSource.markNow()
val lapsed = nextNow - now
state = nextNow
delay(emissionDuration)
Expand All @@ -260,23 +272,24 @@ class FlowTest {
.take(n)
.toList()

rate.first() shouldBe 0 // First element is immediately
rate.first() shouldBe Duration.ZERO // First element is immediately
rate.drop(1).forEach { act ->
act shouldBe (waitPeriod + emissionDuration) // Remaining elements all take delay + emission duration
}
}
}

@Test
fun fixedRate() = runTestUsingDefaultDispatcher {
checkAll(Arb.long(10L..50L), Arb.int(3..20)) { waitPeriod, n ->
val emissionDuration = waitPeriod / 10
var state: Long? = null
@Test @ExperimentalTime
fun fixedRate() = runTest {
checkAll(Arb.long(10L..50L), Arb.int(3..20)) { waitPeriodInMillis, n ->
val waitPeriod = waitPeriodInMillis.milliseconds
val emissionDuration = (waitPeriodInMillis / 10L).milliseconds
var state: ComparableTimeMark? = null

val rate = fixedRate(waitPeriod) { currentTime }
val rate = fixedRate(waitPeriod) { testTimeSource.markNow() }
.map {
val now = state ?: currentTime
val nextNow = currentTime
val now = state ?: testTimeSource.markNow()
val nextNow = testTimeSource.markNow()
val lapsed = nextNow - now
state = nextNow
delay(emissionDuration)
Expand All @@ -285,7 +298,7 @@ class FlowTest {
.take(n)
.toList()

rate.first() shouldBe 0 // First element is immediately
rate.first() shouldBe Duration.ZERO // First element is immediately
rate.drop(1).forEach { act ->
// Remaining elements all take total of waitPeriod, emissionDuration is correctly taken into account.
act shouldBe waitPeriod
Expand All @@ -294,11 +307,11 @@ class FlowTest {
}


@Test
fun fixedRateWithDampenTrue() = runTestUsingDefaultDispatcher {
@Test @ExperimentalTime
fun fixedRateWithDampenTrue() = runTest {
val buffer = mutableListOf<Unit>()
withTimeoutOrNull(4500) {
fixedRate(1000, true) { currentTime }
fixedRate(1000, true) { testTimeSource.markNow() }
.mapIndexed { index, _ ->
if (index == 0) delay(3000) else Unit
advanceTimeBy(1)
Expand All @@ -307,17 +320,16 @@ class FlowTest {
buffer.size shouldBe 2
}

@Test
fun fixedRateWithDampenFalse() = runTestUsingDefaultDispatcher {
@Test @ExperimentalTime
fun fixedRateWithDampenFalse() = runTest {
val buffer = mutableListOf<Unit>()
withTimeoutOrNull(4500) {
fixedRate(1000, false) { currentTime }
fixedRate(1000, false) { testTimeSource.markNow() }
.mapIndexed { index, _ ->
if (index == 0) delay(3000) else Unit
advanceTimeBy(1)
}.collect(buffer::add)
}
buffer.size shouldBe 4
}
*/
}

This file was deleted.

This file was deleted.

This file was deleted.