Commit 5b42d1b 1 parent abef2ed commit 5b42d1b Copy full SHA for 5b42d1b
File tree 2 files changed +75
-0
lines changed
2 files changed +75
-0
lines changed Original file line number Diff line number Diff line change @@ -620,6 +620,42 @@ trait SourceOps[+T] { this: Source[T] =>
620
620
}
621
621
value.getOrElse(throw new NoSuchElementException (" cannot obtain last from an empty source" ))
622
622
}
623
+
624
+ /** Uses `zero` as the current value and applies function `f` on it and a value received from a source. The returned value is used as the
625
+ * next current value and `f` is applied again with the value received from a source. The operation is repeated until the source is
626
+ * drained.
627
+ *
628
+ * @param zero
629
+ * An initial value to be used as the first argument to function `f` call.
630
+ * @param f
631
+ * A binary function (a function that takes two arguments) that is applied to the current value and value received from a source.
632
+ * @return
633
+ * Combined value retrieved from running function `f` on all source elements in a cumulative manner where result of the previous call
634
+ * is used as an input value to the next.
635
+ * @throws ChannelClosedException.Error
636
+ * When `receive()` fails then this exception is thrown.
637
+ * @example
638
+ * {{{
639
+ * import ox.*
640
+ * import ox.channels.Source
641
+ *
642
+ * supervised {
643
+ * Source.empty[Int].fold(0)((acc, n) => acc + n) // 0
644
+ * Source.fromValues(2, 3).fold(5)((acc, n) => acc - n) // 0
645
+ * }
646
+ * }}}
647
+ */
648
+ def fold [U ](zero : U )(f : (U , T ) => U ): U =
649
+ supervised {
650
+ var current = zero
651
+ repeatWhile {
652
+ receive() match
653
+ case ChannelClosed .Done => false
654
+ case e : ChannelClosed .Error => throw e.toThrowable
655
+ case t : T @ unchecked => current = f(current, t); true
656
+ }
657
+ current
658
+ }
623
659
}
624
660
625
661
trait SourceCompanionOps :
Original file line number Diff line number Diff line change
1
+ package ox .channels
2
+
3
+ import org .scalatest .flatspec .AnyFlatSpec
4
+ import org .scalatest .matchers .should .Matchers
5
+ import ox .*
6
+
7
+ class SourceOpsFoldTest extends AnyFlatSpec with Matchers {
8
+ behavior of " Source.fold"
9
+
10
+ it should " throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
11
+ the[ChannelClosedException .Error ] thrownBy {
12
+ Source
13
+ .failed[Int ](new RuntimeException (" source is broken" ))
14
+ .fold(0 )((acc, n) => acc + n)
15
+ } should have message " java.lang.RuntimeException: source is broken"
16
+ }
17
+
18
+ it should " throw NoSuchElementException for source failed without exception" in supervised {
19
+ the[ChannelClosedException .Error ] thrownBy {
20
+ Source
21
+ .failedWithoutReason[Int ]()
22
+ .fold(0 )((acc, n) => acc + n)
23
+ }
24
+ }
25
+
26
+ it should " return `zero` value from fold on the empty source" in supervised {
27
+ Source .empty[Int ].fold(0 )((acc, n) => acc + n) shouldBe 0
28
+ }
29
+
30
+ it should " return fold on non-empty source" in supervised {
31
+ Source .fromValues(1 , 2 ).fold(0 )((acc, n) => acc + n) shouldBe 3
32
+ }
33
+
34
+ it should " drain the source" in supervised {
35
+ val s = Source .fromValues(1 )
36
+ s.fold(0 )((acc, n) => acc + n) shouldBe 1
37
+ s.receive() shouldBe ChannelClosed .Done
38
+ }
39
+ }
You can’t perform that action at this time.
0 commit comments