Skip to content

Commit d60ce81

Browse files
feat: implement last and lastOption operators
The `lastOption` operator returns the last element in `Source` wrapped in `Some` or `None` in case when source is empty. Note that this is a terminal operation for source e.g.: Source.empty[Int].lastOption() // None val s = Source.fromValues(1, 2) s.lastOption() // Some(2) s.receive() // ChannelClosed.Done The `last` operator returns the last element in `Source` or throws `NoSuchElementException` in case when it is empty. In case when `receive()` fails then `ChannelClosedException.Error` exception is thrown. It is also a terminal operation e.g.: Source.empty[Int].last() // throws NoSuchElementException("cannot obtain last from an empty source") val s = Source.fromValues(1, 2) s.last() // 2 s.receive() // ChannelClosed.Done Note that ChannelClosedException.Error was improved to contain `cause` exception (if available).
1 parent e28a268 commit d60ce81

File tree

4 files changed

+135
-1
lines changed

4 files changed

+135
-1
lines changed

core/src/main/scala/ox/channels/ChannelClosed.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ object ChannelClosed:
88
case class Error(reason: Option[Throwable]) extends ChannelClosed
99
case object Done extends ChannelClosed
1010

11-
enum ChannelClosedException(reason: Option[Throwable]) extends Exception:
11+
enum ChannelClosedException(reason: Option[Throwable]) extends Exception(reason.orNull):
1212
case Error(reason: Option[Throwable]) extends ChannelClosedException(reason)
1313
case Done() extends ChannelClosedException(None)

core/src/main/scala/ox/channels/SourceOps.scala

+57
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,63 @@ trait SourceOps[+T] { this: Source[T] =>
565565
case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed"))
566566
case t: T @unchecked => t
567567
}
568+
569+
/** Returns the last element from this source wrapped in `Some` or `None` when the source is empty. Note that `lastOption` is a terminal
570+
* operation leaving the source in `ChannelClosed.Done` state.
571+
*
572+
* @return
573+
* A `Some(last element)` if source is not empty or None` otherwise.
574+
* @throws ChannelClosedException.Error
575+
* When `receive()` fails.
576+
* @example
577+
* {{{
578+
* import ox.*
579+
* import ox.channels.Source
580+
*
581+
* supervised {
582+
* Source.empty[Int].lastOption() // None
583+
* val s = Source.fromValues(1, 2)
584+
* s.lastOption() // Some(2)
585+
* s.receive() // ChannelClosed.Done
586+
* }
587+
* }}}
588+
*/
589+
def lastOption(): Option[T] =
590+
supervised {
591+
var value: Option[T] = None
592+
repeatUntil {
593+
receive() match
594+
case ChannelClosed.Done => true
595+
case e: ChannelClosed.Error => throw e.toThrowable
596+
case t: T @unchecked => value = Some(t); false
597+
}
598+
value
599+
}
600+
601+
/** Returns the last element from this source or throws `NoSuchElementException` when the source is empty. In case when the `receive()`
602+
* operation fails then `ChannelClosedException.Error` exception is thrown. Note that `last` is a terminal operation leaving the source
603+
* in `ChannelClosed.Done` state.
604+
*
605+
* @return
606+
* A last element if source is not empty or throws otherwise.
607+
* @throws NoSuchElementException
608+
* When source is empty.
609+
* @throws ChannelClosedException.Error
610+
* When `receive()` fails.
611+
* @example
612+
* {{{
613+
* import ox.*
614+
* import ox.channels.Source
615+
*
616+
* supervised {
617+
* Source.empty[Int].last() // throws NoSuchElementException("cannot obtain last from an empty source")
618+
* val s = Source.fromValues(1, 2)
619+
* s.last() // 2
620+
* s.receive() // ChannelClosed.Done
621+
* }
622+
* }}}
623+
*/
624+
def last(): T = lastOption().getOrElse(throw new NoSuchElementException("cannot obtain last from an empty source"))
568625
}
569626

570627
trait SourceCompanionOps:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package ox.channels
2+
3+
import org.scalatest.OptionValues
4+
import org.scalatest.flatspec.AnyFlatSpec
5+
import org.scalatest.matchers.should.Matchers
6+
import ox.*
7+
8+
class SourceOpsLastOptionTest extends AnyFlatSpec with Matchers with OptionValues {
9+
behavior of "SourceOps.lastOption"
10+
11+
it should "return None for the empty source" in supervised {
12+
Source.empty[Int].lastOption() shouldBe None
13+
}
14+
15+
it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
16+
the[ChannelClosedException.Error] thrownBy {
17+
Source
18+
.failed(new RuntimeException("source is broken"))
19+
.lastOption()
20+
} should have message "java.lang.RuntimeException: source is broken"
21+
}
22+
23+
it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
24+
the[ChannelClosedException.Error] thrownBy {
25+
Source.failedWithoutReason[Int]().lastOption()
26+
}
27+
}
28+
29+
it should "return last element wrapped in Some for the non-empty source" in supervised {
30+
Source.fromValues(1, 2).lastOption().value shouldBe 2
31+
}
32+
33+
it should "drain the source" in supervised {
34+
val s = Source.fromValues(1)
35+
s.lastOption().value shouldBe 1
36+
s.receive() shouldBe ChannelClosed.Done
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package ox.channels
2+
3+
import org.scalatest.flatspec.AnyFlatSpec
4+
import org.scalatest.matchers.should.Matchers
5+
import ox.*
6+
7+
class SourceOpsLastTest extends AnyFlatSpec with Matchers {
8+
behavior of "SourceOps.last"
9+
10+
it should "throw NoSuchElementException for the empty source" in supervised {
11+
the[NoSuchElementException] thrownBy {
12+
Source.empty[Int].last()
13+
} should have message "cannot obtain last from an empty source"
14+
}
15+
16+
it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
17+
the[ChannelClosedException.Error] thrownBy {
18+
Source
19+
.failed(new RuntimeException("source is broken"))
20+
.last()
21+
} should have message "java.lang.RuntimeException: source is broken"
22+
}
23+
24+
it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
25+
the[ChannelClosedException.Error] thrownBy {
26+
Source.failedWithoutReason[Int]().last()
27+
}
28+
}
29+
30+
it should "return last element for the non-empty source" in supervised {
31+
Source.fromValues(1, 2).last() shouldBe 2
32+
}
33+
34+
it should "drain the source" in supervised {
35+
val s = Source.fromValues(1)
36+
s.last() shouldBe 1
37+
s.receive() shouldBe ChannelClosed.Done
38+
}
39+
}

0 commit comments

Comments
 (0)