Skip to content

Commit 4bef471

Browse files
feat: implement fold operator
The `fold` operation returns combined value retrieved from running function `f` on all source elements in a cumulative manner where result of the previous call is used as an input value to the next e.g.: Source.empty[Int].fold(0)((acc, n) => acc + n) // 0 Source.fromValues(2, 3).fold(5)((acc, n) => acc - n) // 0 Note that in case when `receive()` operation fails then: * the original exception is re-thrown * `NoSuchElement` exception is thrown when source fails without error
1 parent e28a268 commit 4bef471

File tree

2 files changed

+77
-0
lines changed

2 files changed

+77
-0
lines changed

core/src/main/scala/ox/channels/SourceOps.scala

+38
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,44 @@ trait SourceOps[+T] { this: Source[T] =>
565565
case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed"))
566566
case t: T @unchecked => t
567567
}
568+
569+
/** Uses `zero` as the current value and applies function `f` on it and a value received from a source. The returned value is used as the
570+
* next current value and `f` is applied again with the value received from a source. The operation is repeated until the source is
571+
* drained.
572+
*
573+
* @param zero
574+
* An initial value to be used as the first argument to function `f` call.
575+
* @param f
576+
* A binary function (a function that takes two arguments) that is applied to the current value and value received from a source.
577+
* @return
578+
* Combined value retrieved from running function `f` on all source elements in a cumulative manner where result of the previous call
579+
* is used as an input value to the next.
580+
* @throws NoSuchElementException
581+
* When `receive()` failed without error.
582+
* @throws exception
583+
* When `receive()` failed with exception then this exception is re-thrown.
584+
* @example
585+
* {{{
586+
* import ox.*
587+
* import ox.channels.Source
588+
*
589+
* scoped {
590+
* Source.empty[Int].fold(0)((acc, n) => acc + n) // 0
591+
* Source.fromValues(2, 3).fold(5)((acc, n) => acc - n) // 0
592+
* }
593+
* }}}
594+
*/
595+
def fold[U](zero: U)(f: (U, T) => U): U =
596+
supervised {
597+
var current = zero
598+
repeatWhile {
599+
receive() match
600+
case ChannelClosed.Done => false
601+
case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("folding failed"))
602+
case t: T @unchecked => current = f(current, t); true
603+
}
604+
current
605+
}
568606
}
569607

570608
trait SourceCompanionOps:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package ox.channels
2+
3+
import org.scalatest.flatspec.AnyFlatSpec
4+
import org.scalatest.matchers.should.Matchers
5+
import ox.*
6+
7+
class SourceOpsFoldTest extends AnyFlatSpec with Matchers {
8+
behavior of "Source.fold"
9+
10+
it should "re-throw exception that was thrown during fold performance" in supervised {
11+
the[RuntimeException] thrownBy {
12+
Source
13+
.failed[Int](new RuntimeException("source is broken"))
14+
.fold(0)((acc, n) => acc + n)
15+
} should have message "source is broken"
16+
}
17+
18+
it should "throw NoSuchElementException for source failed without exception" in supervised {
19+
the[NoSuchElementException] thrownBy {
20+
Source
21+
.failedWithoutReason[Int]()
22+
.fold(0)((acc, n) => acc + n)
23+
} should have message "folding failed"
24+
}
25+
26+
it should "return `zero` value from fold on the empty source" in supervised {
27+
Source.empty[Int].fold(0)((acc, n) => acc + n) shouldBe 0
28+
}
29+
30+
it should "return fold on non-empty source" in supervised {
31+
Source.fromValues(1, 2).fold(0)((acc, n) => acc + n) shouldBe 3
32+
}
33+
34+
it should "drain the source" in supervised {
35+
val s = Source.fromValues(1)
36+
s.fold(0)((acc, n) => acc + n) shouldBe 1
37+
s.receive() shouldBe ChannelClosed.Done
38+
}
39+
}

0 commit comments

Comments
 (0)