Skip to content

Commit abef2ed

Browse files
feat: implement last and lastOption operators
The `lastOption` operator returns the last element in `Source` wrapped in `Some` or `None` in case when source is empty or failed. Note that this is a terminal operation for source e.g.: Source.empty[Int].lastOption() // None val s = Source.fromValues(1, 2) s.lastOption() // Some(2) s.receive() // ChannelClosed.Done The `last` operator returns the last element in `Source` or throws `NoSuchElementException` in case when it is empty. In case when `receive()` fails then `ChannelClosedException.Error` exception is thrown. It is also a terminal operation e.g.: Source.empty[Int].last() // throws NoSuchElementException("cannot obtain last from an empty source") val s = Source.fromValues(1, 2) s.last() // 2 s.receive() // ChannelClosed.Done Note that ChannelClosedException.Error was improved to contain `cause` exception (if available).
1 parent e28a268 commit abef2ed

File tree

4 files changed

+125
-1
lines changed

4 files changed

+125
-1
lines changed

core/src/main/scala/ox/channels/ChannelClosed.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ object ChannelClosed:
88
case class Error(reason: Option[Throwable]) extends ChannelClosed
99
case object Done extends ChannelClosed
1010

11-
enum ChannelClosedException(reason: Option[Throwable]) extends Exception:
11+
enum ChannelClosedException(reason: Option[Throwable]) extends Exception(reason.orNull):
1212
case Error(reason: Option[Throwable]) extends ChannelClosedException(reason)
1313
case Done() extends ChannelClosedException(None)

core/src/main/scala/ox/channels/SourceOps.scala

+55
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,61 @@ trait SourceOps[+T] { this: Source[T] =>
565565
case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed"))
566566
case t: T @unchecked => t
567567
}
568+
569+
/** Returns the last element from this source wrapped in `Some` or `None` when the source is empty or fails during the `receive()``
570+
* operation. Note that `lastOption` is a terminal operation leaving the source in `ChannelClosed.Done` state.
571+
*
572+
* @return
573+
* A `Some(last element)` if source is not empty or None` otherwise.
574+
* @example
575+
* {{{
576+
* import ox.*
577+
* import ox.channels.Source
578+
*
579+
* supervised {
580+
* Source.empty[Int].lastOption() // None
581+
* val s = Source.fromValues(1, 2)
582+
* s.lastOption() // Some(2)
583+
* s.receive() // ChannelClosed.Done
584+
* }
585+
* }}}
586+
*/
587+
def lastOption(): Option[T] = Try(last()).toOption
588+
589+
/** Returns the last element from this source or throws `NoSuchElementException` when the source is empty. In case when the `receive()`
590+
* operation fails then `ChannelClosedException.Error` exception is thrown. Note that `last` is a terminal operation leaving the source
591+
* in `ChannelClosed.Done` state.
592+
*
593+
* @return
594+
* A last element if source is not empty or throws otherwise.
595+
* @throws NoSuchElementException
596+
* When source is empty or `receive()` failed without error.
597+
* @throws ChannelClosedException.Error
598+
* When `receive()` fails then this exception is thrown.
599+
* @example
600+
* {{{
601+
* import ox.*
602+
* import ox.channels.Source
603+
*
604+
* supervised {
605+
* Source.empty[Int].last() // throws NoSuchElementException("cannot obtain last from an empty source")
606+
* val s = Source.fromValues(1, 2)
607+
* s.last() // 2
608+
* s.receive() // ChannelClosed.Done
609+
* }
610+
* }}}
611+
*/
612+
def last(): T =
613+
supervised {
614+
var value: Option[T] = None
615+
repeatUntil {
616+
receive() match
617+
case ChannelClosed.Done => true
618+
case e: ChannelClosed.Error => throw e.toThrowable
619+
case t: T @unchecked => value = Some(t); false
620+
}
621+
value.getOrElse(throw new NoSuchElementException("cannot obtain last from an empty source"))
622+
}
568623
}
569624

570625
trait SourceCompanionOps:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package ox.channels
2+
3+
import org.scalatest.OptionValues
4+
import org.scalatest.flatspec.AnyFlatSpec
5+
import org.scalatest.matchers.should.Matchers
6+
import ox.*
7+
8+
class SourceOpsLastOptionTest extends AnyFlatSpec with Matchers with OptionValues {
9+
behavior of "SourceOps.lastOption"
10+
11+
it should "return None for the empty source" in supervised {
12+
Source.empty[Int].lastOption() shouldBe None
13+
}
14+
15+
it should "return None for the failed source" in supervised {
16+
Source
17+
.failed(new RuntimeException("source is broken"))
18+
.lastOption() shouldBe None
19+
}
20+
21+
it should "return last element wrapped in Some for the non-empty source" in supervised {
22+
Source.fromValues(1, 2).lastOption().value shouldBe 2
23+
}
24+
25+
it should "drain the source" in supervised {
26+
val s = Source.fromValues(1)
27+
s.lastOption().value shouldBe 1
28+
s.receive() shouldBe ChannelClosed.Done
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package ox.channels
2+
3+
import org.scalatest.flatspec.AnyFlatSpec
4+
import org.scalatest.matchers.should.Matchers
5+
import ox.*
6+
7+
class SourceOpsLastTest extends AnyFlatSpec with Matchers {
8+
behavior of "SourceOps.last"
9+
10+
it should "throw NoSuchElementException for the empty source" in supervised {
11+
the[NoSuchElementException] thrownBy {
12+
Source.empty[Int].last()
13+
} should have message "cannot obtain last from an empty source"
14+
}
15+
16+
it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
17+
the[ChannelClosedException.Error] thrownBy {
18+
Source
19+
.failed(new RuntimeException("source is broken"))
20+
.last()
21+
} should have message "java.lang.RuntimeException: source is broken"
22+
}
23+
24+
it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
25+
the[ChannelClosedException.Error] thrownBy {
26+
Source.failedWithoutReason[Int]().last()
27+
}
28+
}
29+
30+
it should "return last element for the non-empty source" in supervised {
31+
Source.fromValues(1, 2).last() shouldBe 2
32+
}
33+
34+
it should "drain the source" in supervised {
35+
val s = Source.fromValues(1)
36+
s.last() shouldBe 1
37+
s.receive() shouldBe ChannelClosed.Done
38+
}
39+
}

0 commit comments

Comments
 (0)