Commit 2740fe7 1 parent 2e33ffc commit 2740fe7 Copy full SHA for 2740fe7
File tree 2 files changed +75
-0
lines changed
2 files changed +75
-0
lines changed Original file line number Diff line number Diff line change @@ -623,6 +623,42 @@ trait SourceOps[+T] { this: Source[T] =>
623
623
* }}}
624
624
*/
625
625
def last (): T = lastOption().getOrElse(throw new NoSuchElementException (" cannot obtain last from an empty source" ))
626
+
627
+ /** Uses `zero` as the current value and applies function `f` on it and a value received from a source. The returned value is used as the
628
+ * next current value and `f` is applied again with the value received from a source. The operation is repeated until the source is
629
+ * drained.
630
+ *
631
+ * @param zero
632
+ * An initial value to be used as the first argument to function `f` call.
633
+ * @param f
634
+ * A binary function (a function that takes two arguments) that is applied to the current value and value received from a source.
635
+ * @return
636
+ * Combined value retrieved from running function `f` on all source elements in a cumulative manner where result of the previous call
637
+ * is used as an input value to the next.
638
+ * @throws ChannelClosedException.Error
639
+ * When `receive()` fails.
640
+ * @example
641
+ * {{{
642
+ * import ox.*
643
+ * import ox.channels.Source
644
+ *
645
+ * supervised {
646
+ * Source.empty[Int].fold(0)((acc, n) => acc + n) // 0
647
+ * Source.fromValues(2, 3).fold(5)((acc, n) => acc - n) // 0
648
+ * }
649
+ * }}}
650
+ */
651
+ def fold [U ](zero : U )(f : (U , T ) => U ): U =
652
+ supervised {
653
+ var current = zero
654
+ repeatWhile {
655
+ receive() match
656
+ case ChannelClosed .Done => false
657
+ case e : ChannelClosed .Error => throw e.toThrowable
658
+ case t : T @ unchecked => current = f(current, t); true
659
+ }
660
+ current
661
+ }
626
662
}
627
663
628
664
trait SourceCompanionOps :
Original file line number Diff line number Diff line change
1
+ package ox .channels
2
+
3
+ import org .scalatest .flatspec .AnyFlatSpec
4
+ import org .scalatest .matchers .should .Matchers
5
+ import ox .*
6
+
7
+ class SourceOpsFoldTest extends AnyFlatSpec with Matchers {
8
+ behavior of " Source.fold"
9
+
10
+ it should " throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
11
+ the[ChannelClosedException .Error ] thrownBy {
12
+ Source
13
+ .failed[Int ](new RuntimeException (" source is broken" ))
14
+ .fold(0 )((acc, n) => acc + n)
15
+ } should have message " java.lang.RuntimeException: source is broken"
16
+ }
17
+
18
+ it should " throw ChannelClosedException.Error for source failed without exception" in supervised {
19
+ the[ChannelClosedException .Error ] thrownBy {
20
+ Source
21
+ .failedWithoutReason[Int ]()
22
+ .fold(0 )((acc, n) => acc + n)
23
+ }
24
+ }
25
+
26
+ it should " return `zero` value from fold on the empty source" in supervised {
27
+ Source .empty[Int ].fold(0 )((acc, n) => acc + n) shouldBe 0
28
+ }
29
+
30
+ it should " return fold on non-empty source" in supervised {
31
+ Source .fromValues(1 , 2 ).fold(0 )((acc, n) => acc + n) shouldBe 3
32
+ }
33
+
34
+ it should " drain the source" in supervised {
35
+ val s = Source .fromValues(1 )
36
+ s.fold(0 )((acc, n) => acc + n) shouldBe 1
37
+ s.receive() shouldBe ChannelClosed .Done
38
+ }
39
+ }
You can’t perform that action at this time.
0 commit comments