Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update jox to 0.1.0 #65

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -666,14 +666,10 @@ what is missing. Similarly, there will be a warning in case of an unneeded, extr

### Closed channels (done / error)

If any of the channels is (or becomes) in an error state, `select` returns with that error. If all channels are done,
by default `select` returns with a `Done` as well.
If any of the channels is, or becomes, closed (in an error state / done), `select` returns with that error / done state.

However, a variant of the receive clause, namely `source.receiveOrDoneClause`, will cause a `Done` to be returned from
the select, if that source is done (instead of waiting for another clause to become satisfied).

It is possible to inspect which channel is in a closed state by using the `.isDone`, `.isError` and `.isClosed` methods
(plus detailed variants).
It is possible to inspect which channel is in a closed state by using the `.isClosedForSend` and `.isClosedForReceive`
methods (plus detailed variants).

### Default clauses

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ lazy val core: Project = (project in file("core"))
.settings(
name := "core",
libraryDependencies ++= Seq(
"com.softwaremill.jox" % "core" % "0.0.7",
"com.softwaremill.jox" % "core" % "0.1.0",
scalaTest
)
)
Expand Down
81 changes: 31 additions & 50 deletions core/src/main/scala/ox/channels/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ox.channels

import com.softwaremill.jox.{
Channel as JChannel,
CloseableChannel as JCloseableChannel,
Select as JSelect,
SelectClause as JSelectClause,
Sink as JSink,
Expand Down Expand Up @@ -60,44 +59,9 @@ extension [T](v: T | ChannelClosed)

//

/** Allows querying the channel for its closed status.
*
* A channel can be closed in two ways:
*
* - using [[Sink.done]], indicating that no more elements will be sent
* - using [[Sink.error]], indicating an error
*/
trait ChannelState:
protected def delegate: JCloseableChannel

/** @return `true` if the channel is closed using [[Sink.done()]] or [[Sink.error()]]. */
def isClosed: Boolean = delegate.isClosed

/** @return `true` if the channel is closed using [[Sink.done()]]. `false` if it's not closed, or closed with an error. */
def isDone: Boolean = delegate.isDone

/** @return `true` if the channel is closed using [[Sink.error()]]. `false` if it's not closed, or is done. */
def isError: Boolean = delegate.isError != null

/** @return
* `Some`, with details on why the channel is closed (using [[Sink.done()]] or [[Sink.error()]]), or `None` if the channel is not
* closed.
*/
def isClosedDetail: Option[ChannelClosed] =
if delegate.isDone then Some(ChannelClosed.Done)
else isErrorDetail

/** @return
* `Some`, with details on the channel's error (provided using [[Sink.error()]]), or `None` if the channel is not closed or is done.
*/
def isErrorDetail: Option[ChannelClosed.Error] =
delegate.isError match
case null => None
case t => Some(ChannelClosed.Error(t))

/** A channel source, which can be used to receive values from the channel. See [[Channel]] for more details. */
trait Source[+T] extends SourceOps[T] with ChannelState:
protected override def delegate: JSource[Any] // we need to use `Any` as the java types are invariant (they use use-site variance)
trait Source[+T] extends SourceOps[T]:
protected def delegate: JSource[Any] // we need to use `Any` as the java types are invariant (they use use-site variance)

// Skipping variance checks here is fine, as the only way a `Received` instance is created is by this Source (Channel),
// so no values of super-types of T which are not the original T will ever be provided
Expand All @@ -108,25 +72,28 @@ trait Source[+T] extends SourceOps[T] with ChannelState:
case class Receive private[channels] (delegate: JSelectClause[Any]) extends SelectClause[T]:
type Result = Received

/** Create a clause which can be used in [[select]]. The clause will receive a value from the current channel.
*
* If the source is/becomes done, [[select]] will restart with channels that are not done yet.
*/
/** Create a clause which can be used in [[select]]. The clause will receive a value from the current channel. */
def receiveClause: Receive = Receive(delegate.receiveClause(t => Received(t.asInstanceOf[T])))

/** Create a clause which can be used in [[select]]. The clause will receive a value from the current channel.
*
* If the source is/becomes done, [[select]] will stop and return a [[ChannelClosed.Done]] value.
*/
def receiveOrDoneClause: Receive = Receive(delegate.receiveOrDoneClause(t => Received(t.asInstanceOf[T])))

/** Receive a value from the channel. To throw an exception when the channel is closed, use [[orThrow]].
*
* @return
* Either a value of type `T`, or [[ChannelClosed]], when the channel is closed.
*/
def receive(): T | ChannelClosed = ChannelClosed.fromJoxOrT(delegate.receiveSafe())

/** @return
* `true` if no more values can be received from this channel; [[Source.receive()]] will return [[ChannelClosed]]. When closed for
* receive, sending values is also not possible, [[isClosedForSend]] will return `true`.
*/
def isClosedForReceive: Boolean = delegate.isClosedForReceive

/** @return
* `Some` if no more values can be received from this channel; [[Source.receive()]] will return [[ChannelClosed]]. When closed for
* receive, sending values is also not possible, [[isClosedForSend]] will return `true`.
*/
def isClosedForReceiveDetail: Option[ChannelClosed] = Option(ChannelClosed.fromJoxOrT(delegate.closedForReceive()))

/** Various operations which allow creating [[Source]] instances.
*
* Some need to be run within a concurrency scope, such as [[supervised]].
Expand All @@ -136,8 +103,8 @@ object Source extends SourceCompanionOps
//

/** A channel sink, which can be used to send values to the channel. See [[Channel]] for more details. */
trait Sink[-T] extends ChannelState:
protected override def delegate: JSink[Any] // we need to use `Any` as the java types are invariant (they use use-site variance)
trait Sink[-T]:
protected def delegate: JSink[Any] // we need to use `Any` as the java types are invariant (they use use-site variance)

/** Holds the result of a [[sendClause]] that was selected during a call to [[select]]. */
case class Sent private[channels] () extends SelectResult[Unit]:
Expand Down Expand Up @@ -191,6 +158,20 @@ trait Sink[-T] extends ChannelState:
*/
def done(): Unit | ChannelClosed = ChannelClosed.fromJoxOrUnit(delegate.doneSafe())

/** @return
* `true` if no more values can be sent to this channel; [[Sink.send()]] will return [[ChannelClosed]]. When closed for send, receiving
* using [[Source.receive()]] might still be possible, if the channel is done, and not in an error. This can be verified using
* [[isClosedForReceive]].
*/
def isClosedForSend: Boolean = delegate.isClosedForSend

/** @return
* `Some` if no more values can be sent to this channel; [[Sink.send()]] will return [[ChannelClosed]]. When closed for send, receiving
* using [[Source.receive()]] might still be possible, if the channel is done, and not in an error. This can be verified using
* [[isClosedForReceive]].
*/
def isClosedForSendDetail: Option[ChannelClosed] = Option(ChannelClosed.fromJoxOrT(delegate.closedForSend()))

//

/** Channel is a thread-safe data structure which exposes three basic operations:
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,21 @@ trait SourceOps[+T] { outer: Source[T] =>

def merge[U >: T](other: Source[U])(using Ox, StageCapacity): Source[U] =
val c = StageCapacity.newChannel[U]

def drainFrom(toDrain: Source[U]): Unit =
repeatWhile {
toDrain.receive() match
case ChannelClosed.Done => c.done(); false
case ChannelClosed.Error(r) => c.error(r); false
case t: U @unchecked => c.send(t).isValue
}

forkDaemon {
repeatWhile {
select(this, other) match
case ChannelClosed.Done => c.done(); false
case ChannelClosed.Done =>
if this.isClosedForReceive then drainFrom(other) else drainFrom(this)
false
case ChannelClosed.Error(r) => c.error(r); false
case r: U @unchecked => c.send(r).isValue
}
Expand Down
103 changes: 41 additions & 62 deletions core/src/test/scala/ox/channels/ChannelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,62 +129,57 @@ class ChannelTest extends AnyFlatSpec with Matchers with Eventually {
val c1 = Channel[Int](capacity)
val c2 = Channel[Int](capacity)
val c3 = Channel[Int](capacity)
val c4 = Channel[Int](capacity)

// when
c1.done()
c2.error(new RuntimeException())

// then
c1.isDone shouldBe true
c2.isDone shouldBe false
c3.isDone shouldBe false

c1.isError shouldBe false
c2.isError shouldBe true
c3.isError shouldBe false

c1.isClosed shouldBe true
c2.isClosed shouldBe true
c3.isClosed shouldBe false

c1.isClosedDetail should matchPattern { case Some(_) => }
c2.isClosedDetail should matchPattern { case Some(_) => }
c3.isClosedDetail shouldBe None

c1.isErrorDetail shouldBe None
c2.isErrorDetail should matchPattern { case Some(_) => }
c3.isErrorDetail shouldBe None
}

it should "skip channels, which are done immediately" in {
val c1 = Channel[Int](capacity)
val c2 = Channel[Int](capacity)
scoped {
fork {
c1.done()
c2.send(1)
supervised {
forkDaemon {
c2.send(10)
}
Thread.sleep(100) // wait for the send to suspend

Thread.sleep(100) // let the fork progress
select(c1.receiveClause, c2.receiveClause).orThrow shouldBe c2.Received(1)
// when
c1.done() // done, and no values pending to receive
c2.done() // done, and values pending
c3.error(new RuntimeException())

// then
c1.isClosedForReceive shouldBe true
c2.isClosedForReceive shouldBe false
c3.isClosedForReceive shouldBe true
c4.isClosedForReceive shouldBe false

c1.isClosedForSend shouldBe true
c2.isClosedForSend shouldBe true
c3.isClosedForSend shouldBe true
c4.isClosedForSend shouldBe false

c1.isClosedForReceiveDetail should matchPattern { case Some(_) => }
c2.isClosedForReceiveDetail shouldBe None
c3.isClosedForReceiveDetail should matchPattern { case Some(_) => }
c4.isClosedForReceiveDetail shouldBe None

c1.isClosedForSendDetail should matchPattern { case Some(_) => }
c2.isClosedForSendDetail should matchPattern { case Some(_) => }
c3.isClosedForSendDetail should matchPattern { case Some(_) => }
c4.isClosedForSendDetail shouldBe None
}
}

it should "skip channels, which become done" in {
it should "select from a non-done channel, if a value is immediately available" in {
val c1 = Channel[Int](capacity)
val c2 = Channel[Int](capacity)
scoped {
fork {
Thread.sleep(100) // let the select block
c1.done()
c2.send(1)
c1.send(1)
c2.done()
}

select(c1.receiveClause, c2.receiveClause).orThrow shouldBe c2.Received(1)
Thread.sleep(100) // let the fork progress
select(c1.receiveClause, c2.receiveClause).orThrow shouldBe c1.Received(1)
}
}

it should "not skip channels, which are done immediately, when requested" in {
it should "select a done channel, when the channel is done immediately" in {
val c1 = Channel[Int](capacity)
val c2 = Channel[Int](capacity)
scoped {
Expand All @@ -193,11 +188,11 @@ class ChannelTest extends AnyFlatSpec with Matchers with Eventually {
}

Thread.sleep(100) // let the fork progress
select(c1.receiveClause, c2.receiveOrDoneClause) shouldBe ChannelClosed.Done
select(c1, c2) shouldBe ChannelClosed.Done
}
}

it should "not skip channels, which become done, when requested" in {
it should "select a done channel, when the channel becomes done" in {
val c1 = Channel[Int](capacity)
val c2 = Channel[Int](capacity)
scoped {
Expand All @@ -206,7 +201,7 @@ class ChannelTest extends AnyFlatSpec with Matchers with Eventually {
c2.done()
}

select(c1.receiveClause, c2.receiveOrDoneClause) shouldBe ChannelClosed.Done
select(c1, c2) shouldBe ChannelClosed.Done
}
}
}
Expand Down Expand Up @@ -242,16 +237,6 @@ class ChannelTest extends AnyFlatSpec with Matchers with Eventually {
c.receive() should matchPattern { case _: ChannelClosed.Error => } // repeat
}

it should "select a receive from a channel if one is not done" in {
val c1 = Channel[Int]()
c1.done()

val c2 = Channel[Int](1)
c2.send(1)

select(c1.receiveClause, c2.receiveClause).map(_.value) shouldBe 1
}

"direct channel" should "wait until elements are transmitted" in {
val c = Channel[String](0)
val trail = ConcurrentLinkedQueue[String]()
Expand Down Expand Up @@ -322,12 +307,6 @@ class ChannelTest extends AnyFlatSpec with Matchers with Eventually {
select(c2.receiveClause, Default(10)) shouldBe DefaultResult(10)
}

it should "use the default value if all channels are done" in {
val c1 = Channel[Int](0)
c1.done()
select(c1.receiveClause, Default(10)) shouldBe DefaultResult(10)
}

it should "not use the default value if a clause is satisfiable" in {
val c1 = Channel[Int](1)
c1.send(5)
Expand All @@ -337,10 +316,10 @@ class ChannelTest extends AnyFlatSpec with Matchers with Eventually {
select(c2.sendClause(5), Default(10)) shouldBe c2.Sent()
}

it should "not use the default value if the channel is done, and a receiveOrDone clause is used" in {
it should "not use the default value if the channel is done" in {
val c1 = Channel[Int](1)
c1.done()
select(c1.receiveOrDoneClause, Default(10)) shouldBe ChannelClosed.Done
select(c1.receiveClause, Default(10)) shouldBe ChannelClosed.Done
}

it should "use the default value once a source is done (buffered channel, stress test)" in {
Expand Down
Loading
Loading