Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement fold operator #27

Merged
merged 6 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 68 additions & 31 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import ox.*
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.{IterableOnce, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

trait SourceOps[+T] { this: Source[T] =>
// view ops (lazy)
Expand Down Expand Up @@ -46,7 +45,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[String].intersperse(", ").toList // List()
* Source.fromValues("foo").intersperse(", ").toList // List(foo)
* Source.fromValues("foo", "bar").intersperse(", ").toList // List(foo, ", ", bar)
Expand All @@ -71,7 +70,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[String].intersperse("[", ", ", "]").toList // List([, ])
* Source.fromValues("foo").intersperse("[", ", ", "]").toList // List([, foo, ])
* Source.fromValues("foo", "bar").intersperse("[", ", ", "]").toList // List([, foo, ", ", bar, ])
Expand Down Expand Up @@ -210,7 +209,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[Int].takeWhile(_ > 3).toList // List()
* Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
* Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
Expand All @@ -228,7 +227,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[Int].drop(1).toList // List()
* Source.fromValues(1, 2, 3).drop(1).toList // List(2 ,3)
* Source.fromValues(1).drop(2).toList // List()
Expand Down Expand Up @@ -303,7 +302,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[Int].zipAll(Source.empty[String], -1, "foo").toList // List()
* Source.empty[Int].zipAll(Source.fromValues("a"), -1, "foo").toList // List((-1, "a"))
* Source.fromValues(1).zipAll(Source.empty[String], -1, "foo").toList // List((1, "foo"))
Expand Down Expand Up @@ -355,7 +354,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* val s1 = Source.fromValues(1, 2, 3, 4, 5, 6, 7)
* val s2 = Source.fromValues(10, 20, 30, 40)
* s1.interleave(s2, segmentSize = 2).toList
Expand Down Expand Up @@ -433,7 +432,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* val s = Source.fromValues(1, 2, 3, 4, 5)
* s.mapStateful(() => 0)((sum, element) => (sum + element, sum), Some.apply)
* }
Expand Down Expand Up @@ -476,7 +475,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* val s = Source.fromValues(1, 2, 2, 3, 2, 4, 3, 1, 5)
* // deduplicate the values
* s.mapStatefulConcat(() => Set.empty[Int])((s, e) => (s + e, Option.unless(s.contains(e))(e)))
Expand Down Expand Up @@ -515,56 +514,58 @@ trait SourceOps[+T] { this: Source[T] =>
}
c

/** Returns the first element from this source wrapped in `Some` or `None` when the source is empty or fails during the receive operation.
* Note that `headOption` is not an idempotent operation on source as it receives elements from it.
/** Returns the first element from this source wrapped in [[Some]] or [[None]] when this source is empty. Note that `headOption` is not an
* idempotent operation on source as it receives elements from it.
*
* @return
* A `Some(first element)` if source is not empty or None` otherwise.
* A `Some(first element)` if source is not empty or `None` otherwise.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[Int].headOption() // None
* val s = Source.fromValues(1, 2)
* s.headOption() // Some(1)
* s.headOption() // Some(2)
* }
* }}}
*/
def headOption(): Option[T] = Try(head()).toOption
def headOption(): Option[T] =
supervised {
receive() match
case ChannelClosed.Done => None
case e: ChannelClosed.Error => throw e.toThrowable
case t: T @unchecked => Some(t)
}

/** Returns the first element from this source or throws `NoSuchElementException` when the source is empty or `receive()` operation fails
* without error. In case when the `receive()` operation fails with exception that exception is re-thrown. Note that `headOption` is not
* an idempotent operation on source as it receives elements from it.
/** Returns the first element from this source or throws [[NoSuchElementException]] when this source is empty. In case when receiving an
* element fails with exception then [[ChannelClosedException.Error]] is thrown. Note that `head` is not an idempotent operation on
* source as it receives elements from it.
*
* @return
* A first element if source is not empty or throws otherwise.
* @throws NoSuchElementException
* When source is empty or `receive()` failed without error.
* @throws exception
* When `receive()` failed with exception then this exception is re-thrown.
* When this source is empty.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from an empty source")
* supervised {
* Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head element from an empty source")
* val s = Source.fromValues(1, 2)
* s.head() // 1
* s.head() // 2
* }
* }}}
*/
def head(): T =
supervised {
receive() match
case ChannelClosed.Done => throw new NoSuchElementException("cannot obtain head from an empty source")
case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed"))
case t: T @unchecked => t
}
def head(): T = headOption().getOrElse(throw new NoSuchElementException("cannot obtain head element from an empty source"))

/** Sends elements to the returned channel limiting the throughput to specific number of elements (evenly spaced) per time unit. Note that
* the element's `receive()` time is included in the resulting throughput. For instance having `throttle(1, 1.second)` and `receive()`
Expand Down Expand Up @@ -645,7 +646,7 @@ trait SourceOps[+T] { this: Source[T] =>
* @return
* A last element if source is not empty or throws otherwise.
* @throws NoSuchElementException
* When source is empty.
* When this source is empty.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
Expand All @@ -662,6 +663,42 @@ trait SourceOps[+T] { this: Source[T] =>
* }}}
*/
def last(): T = lastOption().getOrElse(throw new NoSuchElementException("cannot obtain last element from an empty source"))

/** Uses `zero` as the current value and applies function `f` on it and a value received from this source. The returned value is used as
* the next current value and `f` is applied again with the value received from a source. The operation is repeated until the source is
* drained.
*
* @param zero
* An initial value to be used as the first argument to function `f` call.
* @param f
* A binary function (a function that takes two arguments) that is applied to the current value and value received from a source.
* @return
* Combined value retrieved from running function `f` on all source elements in a cumulative manner where result of the previous call
* is used as an input value to the next.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @throws exception
* When function `f` throws an `exception` then it is propagated up to the caller.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* supervised {
* Source.empty[Int].fold(0)((acc, n) => acc + n) // 0
* Source.fromValues(2, 3).fold(5)((acc, n) => acc - n) // 0
* }
* }}}
*/
def fold[U](zero: U)(f: (U, T) => U): U =
var current = zero
repeatWhile {
receive() match
case ChannelClosed.Done => false
case e: ChannelClosed.Error => throw e.toThrowable
case t: T @unchecked => current = f(current, t); true
}
current
}

trait SourceCompanionOps:
Expand Down Expand Up @@ -822,7 +859,7 @@ trait SourceCompanionOps:
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* val s1 = Source.fromValues(1, 2, 3, 4, 5, 6, 7, 8)
* val s2 = Source.fromValues(10, 20, 30)
* val s3 = Source.fromValues(100, 200, 300, 400, 500)
Expand Down
47 changes: 47 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsFoldTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsFoldTest extends AnyFlatSpec with Matchers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: Can we have another test case for a scenario where the function passed to fold throws an exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

behavior of "Source.fold"

it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed[Int](new RuntimeException("source is broken"))
.fold(0)((acc, n) => acc + n)
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failedWithoutReason[Int]()
.fold(0)((acc, n) => acc + n)
}
}

it should "throw exception thrown in `f` when `f` throws" in supervised {
the[RuntimeException] thrownBy {
Source
.fromValues(1)
.fold(0)((_, _) => throw new RuntimeException("Function `f` is broken"))
} should have message "Function `f` is broken"
}

it should "return `zero` value from fold on the empty source" in supervised {
Source.empty[Int].fold(0)((acc, n) => acc + n) shouldBe 0
}

it should "return fold on non-empty source" in supervised {
Source.fromValues(1, 2).fold(0)((acc, n) => acc + n) shouldBe 3
}

it should "drain the source" in supervised {
val s = Source.fromValues(1)
s.fold(0)((acc, n) => acc + n) shouldBe 1
s.receive() shouldBe ChannelClosed.Done
}
}
16 changes: 12 additions & 4 deletions core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ class SourceOpsHeadOptionTest extends AnyFlatSpec with Matchers with OptionValue
Source.empty[Int].headOption() shouldBe None
}

it should "return None for the failed source" in supervised {
Source
.failed(new RuntimeException("source is broken"))
.headOption() shouldBe None
it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed(new RuntimeException("source is broken"))
.headOption()
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source.failedWithoutReason[Int]().headOption()
}
}

it should "return Some element for the non-empty source" in supervised {
Expand Down
14 changes: 7 additions & 7 deletions core/src/test/scala/ox/channels/SourceOpsHeadTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ class SourceOpsHeadTest extends AnyFlatSpec with Matchers {
it should "throw NoSuchElementException for the empty source" in supervised {
the[NoSuchElementException] thrownBy {
Source.empty[Int].head()
} should have message "cannot obtain head from an empty source"
} should have message "cannot obtain head element from an empty source"
}

it should "re-throw exception that was thrown during element retrieval" in supervised {
the[RuntimeException] thrownBy {
it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed(new RuntimeException("source is broken"))
.head()
} should have message "source is broken"
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw NoSuchElementException for source failed without exception" in supervised {
the[NoSuchElementException] thrownBy {
it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source.failedWithoutReason[Int]().head()
} should have message "getting head failed"
}
}

it should "return first value from non empty source" in supervised {
Expand Down