Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Source.interleaveAll combinator #13

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add Source.interleaveAll combinator
  • Loading branch information
rucek committed Oct 5, 2023
commit 895ba962915b6e45a3f5a684dbe5ff493250baf9
118 changes: 82 additions & 36 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ox.channels
import ox.*

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

trait SourceOps[+T] { this: Source[T] =>
Expand Down Expand Up @@ -171,8 +172,8 @@ trait SourceOps[+T] { this: Source[T] =>
/** Sends a given number of elements (determined byc `segmentSize`) from this source to the returned channel, then sends the same number
* of elements from the `other` source and repeats. The order of elements in both sources is preserved.
*
* If one of the sources is closed before the other, the behavior depends on the `eagerCancel` flag. When set to `true`, the other source
* is cancelled immediately, otherwise the remaining elements from the other source are sent to the returned channel.
* If one of the sources is closed before the other, the behavior depends on the `eagerCancel` flag. When set to `true`, the returned
* channel is completed immediately, otherwise the remaining elements from the other source are sent to the returned channel.
*
* Must be run within a scope, since a child fork is created which receives from both sources and sends to the resulting channel.
*
Expand Down Expand Up @@ -201,41 +202,86 @@ trait SourceOps[+T] { this: Source[T] =>
* }}}
*/
def interleave[U >: T](other: Source[U], segmentSize: Int = 1, eagerComplete: Boolean = false)(using Ox, StageCapacity): Source[U] =
val c = StageCapacity.newChannel[U]

forkDaemon {
var source: Source[U] = this
var counter = 0
var neitherCompleted = true

def switchSource(): Unit = {
if (source == this) source = other else source = this
counter = 0
}
interleaveAll(List(other), segmentSize, eagerComplete)

repeatWhile {
source.receive() match
case ChannelClosed.Done =>
// if one source has completed, either complete the resulting source immediately if eagerComplete is set, or:
// - continue with the other source if it hasn't completed yet, or
// - complete the resulting source if both input sources have completed
if (neitherCompleted && !eagerComplete) {
neitherCompleted = false
switchSource()
true
} else {
c.done()
false
}
case ChannelClosed.Error(r) => c.error(r); false
case value: U @unchecked =>
counter += 1
// after reaching segmentSize, only switch to the other source if it hasn't completed yet
if (counter == segmentSize && neitherCompleted) switchSource()
c.send(value).isValue
}
}
c
/** Sends a given number of elements (determined byc `segmentSize`) from this source to the returned channel, then sends the same number
* of elements from each of the `others` sources and repeats. The order of elements in all sources is preserved.
*
* If any of the sources is closed before the others, the behavior depends on the `eagerCancel` flag. When set to `true`, the returned
* channel is completed immediately, otherwise the interleaving continues with the remaining non-completed sources. Once all but one
* sources are complete, the elements of the remaining non-complete source are sent to the returned channel.
*
* Must be run within a scope, since a child fork is created which receives from both sources and sends to the resulting channel.
*
* @param others
* The sources whose elements will be interleaved with the elements of this source.
* @param segmentSize
* The number of elements sent from each source before switching to the next one. Default is 1.
* @param eagerComplete
* If `true`, the returned channel is completed as soon as any of the sources completes. If 'false`, the interleaving continues with
* the remaining non-completed sources.
* @return
* A source to which the interleaved elements from both sources would be sent.
* @example
* {{{
* scala>
* import ox.*
* import ox.channels.Source
*
* scoped {
* val s1 = Source.fromValues(1, 2, 3, 4, 5, 6, 7, 8)
* val s2 = Source.fromValues(10, 20, 30)
* val s3 = Source.fromValues(100, 200, 300, 400, 500)
* s1.interleaveAll(List(s2, s3), segmentSize = 2, eagerComplete = true).toList
* }
*
* scala> val res0: List[Int] = List(1, 2, 10, 20, 100, 200, 3, 4, 30)
* }}}
*/
def interleaveAll[U >: T](others: Seq[Source[U]], segmentSize: Int = 1, eagerComplete: Boolean = false)(using
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since none of the sources is special, maybe this shoul dbe a method on Source - Source.interlaveAll?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One downside is that this would require us to handle the case of an empty list - but then I think we just return a done source.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point with moving it to Source. As for the empty list - we can either return a done source as you suggest, or change the signature of interleaveAll so that it enforces at least two sources (since one source would be a special case as well, where we just return the argument), e.g.

def interleaveAll[T](first: Source[T], second: Source[T], rest: Source[T]*)(segmentSize: Int, eagerComplete: Boolean)

The downside of the latter approach would be making the API inconsistent between interleave and interleaveAll. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's an inconsistency, interleave acts on two sources using the more convenient dot-notation, the ...All variant takes an arbitrary list. The first / second approach would be good, but then it enforces static structure - won't work if you have a dynamically-defined list of sources.

Ox,
StageCapacity
): Source[U] =
others match
case Nil => this
case _ =>
val c = StageCapacity.newChannel[U]

forkDaemon {
val availableSources = mutable.ArrayBuffer.from(this +: others)
var currentSourceIndex = 0
var elementsRead = 0

def completeCurrentSource(): Unit =
availableSources.remove(currentSourceIndex)
currentSourceIndex = if (currentSourceIndex == 0) availableSources.size - 1 else currentSourceIndex - 1

def switchToNextSource(): Unit =
currentSourceIndex = (currentSourceIndex + 1) % availableSources.size
elementsRead = 0

repeatWhile {
availableSources(currentSourceIndex).receive() match
case ChannelClosed.Done =>
completeCurrentSource()

if (eagerComplete || availableSources.isEmpty)
c.done()
false
else
switchToNextSource()
true
case ChannelClosed.Error(r) =>
c.error(r)
false
case value: U @unchecked =>
elementsRead += 1
// after reaching segmentSize, only switch to next source if there's any other available
if (elementsRead == segmentSize && availableSources.size > 1) switchToNextSource()
c.send(value).isValue
}
}
c

/** Invokes the given function for each received element. Blocks until the channel is done.
* @throws ChannelClosedException
Expand Down
48 changes: 48 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsInterleaveAllTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsInterleaveAllTest extends AnyFlatSpec with Matchers {

behavior of "Source.interleaveAll"

it should "interleave with no other sources" in scoped {
val c = Source.fromValues(1, 2, 3)

val s = c.interleaveAll(List.empty)

s.toList shouldBe List(1, 2, 3)
}

it should "interleave with other sources" in scoped {
val c1 = Source.fromValues(1, 2, 3, 4, 5, 6, 7, 8)
val c2 = Source.fromValues(10, 20, 30)
val c3 = Source.fromValues(100, 200, 300, 400, 500)

val s = c1.interleaveAll(List(c2, c3))

s.toList shouldBe List(1, 10, 100, 2, 20, 200, 3, 30, 300, 4, 400, 5, 500, 6, 7, 8)
}

it should "interleave with other sources using custom segment size" in scoped {
val c1 = Source.fromValues(1, 2, 3, 4, 5, 6, 7, 8)
val c2 = Source.fromValues(10, 20, 30)
val c3 = Source.fromValues(100, 200, 300, 400, 500)

val s = c1.interleaveAll(List(c2, c3), segmentSize = 2)

s.toList shouldBe List(1, 2, 10, 20, 100, 200, 3, 4, 30, 300, 400, 5, 6, 500, 7, 8)
}

it should "interleave with other sources using custom segment size and complete eagerly" in scoped {
val c1 = Source.fromValues(1, 2, 3, 4, 5, 6, 7, 8)
val c2 = Source.fromValues(10, 20, 30)
val c3 = Source.fromValues(100, 200, 300, 400, 500)

val s = c1.interleaveAll(List(c2, c3), segmentSize = 2, eagerComplete = true)

s.toList shouldBe List(1, 2, 10, 20, 100, 200, 3, 4, 30)
}
}