Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FlowOps: sample operator #254

Merged
merged 5 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions core/src/main/scala/ox/flow/FlowOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ class FlowOps[+T]:
def filter(f: T => Boolean): Flow[T] = Flow.usingEmitInline: emit =>
last.run(FlowEmit.fromInline(t => if f(t) then emit.apply(t)))

/** Emits only every nth element emitted by this flow.
*
* @param n
* The interval between two emitted elements.
*/
def sample(n: Int): Flow[T] = Flow.usingEmitInline: emit =>
var sampleCounter = 0
last.run(
FlowEmit.fromInline: t =>
sampleCounter += 1
if n != 0 && sampleCounter % n == 0 then emit(t)
)

/** Applies the given mapping function `f` to each element emitted by this flow, for which the function is defined, and emits the result.
* If `f` is not defined at an element, the element will be skipped.
*
Expand All @@ -82,6 +95,23 @@ class FlowOps[+T]:
def collect[U](f: PartialFunction[T, U]): Flow[U] = Flow.usingEmitInline: emit =>
last.run(FlowEmit.fromInline(t => if f.isDefinedAt(t) then emit.apply(f(t))))

/** Transforms the elements of the flow by applying an accumulation function to each element, producing a new value at each step. The
* resulting flow contains the accumulated values at each point in the original flow.
*
* @param initial
* The initial value to start the accumulation.
* @param f
* The accumulation function that is applied to each element of the flow.
*/
def scan[V](initial: V)(f: (V, T) => V): Flow[V] = Flow.usingEmitInline: emit =>
emit(initial)
var accumulator = initial
last.run(
FlowEmit.fromInline: t =>
accumulator = f(accumulator, t)
emit(accumulator)
)

/** Applies the given effectful function `f` to each element emitted by this flow. The returned flow emits the elements unchanged. If `f`
* throws an exceptions, the flow fails and propagates the exception.
*/
Expand Down Expand Up @@ -441,6 +471,17 @@ class FlowOps[+T]:
emit((t, otherDefault)); true
)

/** Combines each element from this and the index of the element (starting at 0).
*/
def zipWithIndex: Flow[(T, Long)] = Flow.usingEmitInline: emit =>
var index = 0L
last.run(
FlowEmit.fromInline: t =>
val zipped = (t, index)
index += 1
emit(zipped)
)

/** Emits a given number of elements (determined byc `segmentSize`) from this flow to the returned flow, then emits the same number of
* elements from the `other` flow and repeats. The order of elements in both flows is preserved.
*
Expand Down
30 changes: 30 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsFilterTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class FlowOpsFilterTest extends AnyFlatSpec with Matchers:
behavior of "filter"

it should "not filter anything from the empty flow" in:
val c = Flow.empty[Int]
val s = c.filter(_ % 2 == 0)
s.runToList() shouldBe List.empty

it should "filter out everything if no element meets 'f'" in:
val c = Flow.fromValues(1 to 10: _*)
val s = c.filter(_ => false)
s.runToList() shouldBe List.empty

it should "not filter anything if all the elements meet 'f'" in:
val c = Flow.fromValues(1 to 10: _*)
val s = c.filter(_ => true)
s.runToList() shouldBe (1 to 10)

it should "filter out elements that don't meet 'f'" in:
val c = Flow.fromValues(1 to 10: _*)
val s = c.filter(_ % 2 == 0)
s.runToList() shouldBe (2 to 10 by 2)

end FlowOpsFilterTest
32 changes: 32 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsSampleTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class FlowOpsSampleTest extends AnyFlatSpec with Matchers:
behavior of "sample"

it should "not sample anything from an empty flow" in:
val c = Flow.empty[Int]
val s = c.sample(5)
s.runToList() shouldBe List.empty

it should "not sample anything when 'n == 0'" in:
val c = Flow.fromValues(1 to 10: _*)
val s = c.sample(0)
s.runToList() shouldBe List.empty

it should "sample every element of the flow when 'n == 1'" in:
val c = Flow.fromValues(1 to 10: _*)
val n = 1
val s = c.sample(n)
s.runToList() shouldBe (n to 10 by n)

it should "sample every nth element of the flow" in:
val c = Flow.fromValues(1 to 10: _*)
val n = 3
val s = c.sample(n)
s.runToList() shouldBe (n to 10 by n)

end FlowOpsSampleTest
30 changes: 30 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsScanTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class FlowOpsScanTest extends AnyFlatSpec with Matchers:
behavior of "scan"

it should "scan the empty flow" in:
val flow: Flow[Int] = Flow.empty
val scannedFlow = flow.scan(0)((acc, el) => acc + el)
scannedFlow.runToList() shouldBe List(0)

it should "scan a flow of summed Int" in:
val flow = Flow.fromValues(1 to 10: _*)
val scannedFlow = flow.scan(0)((acc, el) => acc + el)
scannedFlow.runToList() shouldBe List(0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55)

it should "scan a flow of multiplied Int" in:
val flow = Flow.fromValues(1 to 10: _*)
val scannedFlow = flow.scan(1)((acc, el) => acc * el)
scannedFlow.runToList() shouldBe List(1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800)

it should "scan a flow of concatenated String" in:
val flow = Flow.fromValues("f", "l", "o", "w")
val scannedFlow = flow.scan("my")((acc, el) => acc + el)
scannedFlow.runToList() shouldBe List("my", "myf", "myfl", "myflo", "myflow")

end FlowOpsScanTest
20 changes: 20 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsZipWithIndexTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ox.flow

import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class FlowOpsZipWithIndexTest extends AnyFlatSpec with Matchers with Eventually:
behavior of "zipWithIndex"

it should "not zip anything from an empty flow" in:
val c = Flow.empty[Int]
val s = c.zipWithIndex
s.runToList() shouldBe List.empty

it should "zip flow with index" in:
val c = Flow.fromValues(1 to 5: _*)
val s = c.zipWithIndex
s.runToList() shouldBe List((1, 0), (2, 1), (3, 2), (4, 3), (5, 4))

end FlowOpsZipWithIndexTest