Skip to content

Commit 0b4455e

Browse files
feat: implement takeWhile function
Sends elements to the returned channel until predicate is satisfied. Note that if the predicate fails then subsequent elements are not longer taken even if they could still satisfy it. Example: Source.empty[Int].takeWhile(_ > 3).toList // List() Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2) Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
1 parent 161557a commit 0b4455e

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

core/src/main/scala/ox/channels/SourceOps.scala

+20-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package ox.channels
22

33
import ox.*
44

5-
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore}
5+
import java.util.concurrent.{CountDownLatch, Semaphore}
66
import scala.collection.mutable
77
import scala.concurrent.duration.FiniteDuration
88

@@ -141,6 +141,25 @@ trait SourceOps[+T] { this: Source[T] =>
141141

142142
def take(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.take(n))
143143

144+
/** Sends elements to the returned channel until predicate `f` is satisfied (returns `true`). Note that when the predicate `f` is not
145+
* satisfied (returns `false`), subsequent elements are dropped even if they could still satisfy it.
146+
*
147+
* @param f
148+
* A predicate function.
149+
* @example
150+
* {{{
151+
* import ox.*
152+
* import ox.channels.Source
153+
*
154+
* scoped {
155+
* Source.empty[Int].takeWhile(_ > 3).toList // List()
156+
* Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
157+
* Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
158+
* }
159+
* }}}
160+
*/
161+
def takeWhile(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.takeWhile(f))
162+
144163
def filter(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.filter(f))
145164

146165
def transform[U](f: Iterator[T] => Iterator[U])(using Ox, StageCapacity): Source[U] =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package ox.channels
2+
3+
import org.scalatest.flatspec.AnyFlatSpec
4+
import org.scalatest.matchers.should.Matchers
5+
import ox.*
6+
7+
class SourceOpsTakeWhileTest extends AnyFlatSpec with Matchers {
8+
behavior of "Source.takeWhile"
9+
10+
it should "not take from the empty source" in supervised {
11+
val s = Source.empty[Int]
12+
s.takeWhile(_ < 3).toList shouldBe List.empty
13+
}
14+
15+
it should "take as long as predicate is satisfied" in supervised {
16+
val s = Source.fromValues(1, 2, 3)
17+
s.takeWhile(_ < 3).toList shouldBe List(1, 2)
18+
}
19+
20+
it should "not take if predicate fails for first or more elements" in supervised {
21+
val s = Source.fromValues(3, 2, 1)
22+
s.takeWhile(_ < 3).toList shouldBe List()
23+
}
24+
}

0 commit comments

Comments
 (0)