Skip to content

Commit 053c486

Browse files
feat: introduce takeLast(n) operator (#31)
Returns the list of up to `n` last elements from this source. Less than `n` elements is returned when this source contains les elements than requested. The [[List.empty]] is returned when `takeLast` is called on an empty source. Example: Source.empty[Int].takeLast(5) // List.empty Source.fromValues(1).takeLast(0) // List.empty Source.fromValues(1).takeLast(2) // List(1) val s = Source.fromValues(1, 2, 3, 4) s.takeLast(2) // List(4, 5) s.receive() // ChannelClosed.Done
1 parent c7710e2 commit 053c486

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

core/src/main/scala/ox/channels/SourceOps.scala

+46
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ox.channels
22

33
import ox.*
44

5+
import java.util
56
import java.util.concurrent.{CountDownLatch, Semaphore}
67
import scala.collection.{IterableOnce, mutable}
78
import scala.concurrent.duration.FiniteDuration
@@ -731,6 +732,51 @@ trait SourceOps[+T] { this: Source[T] =>
731732
*/
732733
def reduce[U >: T](f: (U, U) => U): U =
733734
fold(headOption().getOrElse(throw new NoSuchElementException("cannot reduce an empty source")))(f)
735+
736+
/** Returns the list of up to `n` last elements from this source. Less than `n` elements is returned when this source contains less
737+
* elements than requested. The [[List.empty]] is returned when `takeLast` is called on an empty source.
738+
*
739+
* @param n
740+
* Number of elements to be taken from the end of this source. It is expected that `n >= 0`.
741+
* @return
742+
* A list of up to `n` last elements from this source.
743+
* @throws ChannelClosedException.Error
744+
* When receiving an element from this source fails.
745+
* @example
746+
* {{{
747+
* import ox.*
748+
* import ox.channels.Source
749+
*
750+
* supervised {
751+
* Source.empty[Int].takeLast(5) // List.empty
752+
* Source.fromValues(1).takeLast(0) // List.empty
753+
* Source.fromValues(1).takeLast(2) // List(1)
754+
* val s = Source.fromValues(1, 2, 3, 4)
755+
* s.takeLast(2) // List(4, 5)
756+
* s.receive() // ChannelClosed.Done
757+
* }
758+
* }}}
759+
*/
760+
def takeLast(n: Int): List[T] =
761+
require(n >= 0, "n must be >= 0")
762+
if (n == 0)
763+
drain()
764+
List.empty
765+
else if (n == 1) lastOption().map(List(_)).getOrElse(List.empty)
766+
else
767+
supervised {
768+
val buffer: mutable.ListBuffer[T] = mutable.ListBuffer()
769+
buffer.sizeHint(n)
770+
repeatWhile {
771+
receive() match
772+
case ChannelClosed.Done => false
773+
case e: ChannelClosed.Error => throw e.toThrowable
774+
case t: T @unchecked =>
775+
if (buffer.size == n) buffer.dropInPlace(1)
776+
buffer.append(t); true
777+
}
778+
buffer.result()
779+
}
734780
}
735781

736782
trait SourceCompanionOps:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package ox.channels
2+
3+
import org.scalatest.flatspec.AnyFlatSpec
4+
import org.scalatest.matchers.should.Matchers
5+
import ox.*
6+
7+
class SourceOpsTakeLastTest extends AnyFlatSpec with Matchers {
8+
behavior of "SourceOps.takeLast"
9+
10+
it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
11+
the[ChannelClosedException.Error] thrownBy {
12+
Source
13+
.failed[Int](new RuntimeException("source is broken"))
14+
.takeLast(1)
15+
} should have message "java.lang.RuntimeException: source is broken"
16+
}
17+
18+
it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
19+
the[ChannelClosedException.Error] thrownBy {
20+
Source
21+
.failedWithoutReason[Int]()
22+
.takeLast(1)
23+
}
24+
}
25+
26+
it should "fail to takeLast when n < 0" in supervised {
27+
the[IllegalArgumentException] thrownBy {
28+
Source.empty[Int].takeLast(-1)
29+
} should have message "requirement failed: n must be >= 0"
30+
}
31+
32+
it should "return empty list for the empty source" in supervised {
33+
Source.empty[Int].takeLast(1) shouldBe List.empty
34+
}
35+
36+
it should "return empty list when n == 0 and list is not empty" in supervised {
37+
Source.fromValues(1).takeLast(0) shouldBe List.empty
38+
}
39+
40+
it should "return list with all elements if the source is smaller than requested number" in supervised {
41+
Source.fromValues(1, 2).takeLast(3) shouldBe List(1, 2)
42+
}
43+
44+
it should "return the last n elements from the source" in supervised {
45+
Source.fromValues(1, 2, 3, 4, 5).takeLast(2) shouldBe List(4, 5)
46+
}
47+
48+
it should "drain the source" in supervised {
49+
val s = Source.fromValues(1)
50+
s.takeLast(1) shouldBe List(1)
51+
s.receive() shouldBe ChannelClosed.Done
52+
}
53+
}

0 commit comments

Comments
 (0)