-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement Source.futureSource
operator
#33
Conversation
f18c916
to
31f8e48
Compare
core/src/test/scala/ox/channels/SourceOpsFutureSourceTest.scala
Outdated
Show resolved
Hide resolved
31f8e48
to
c5a2f4a
Compare
ffe22ae
to
e087ea9
Compare
e087ea9
to
8bdf162
Compare
def futureSource[T](from: Future[Source[T]])(using StageCapacity, ExecutionContext): Source[T] = | ||
val c = StageCapacity.newChannel[T] | ||
from.onComplete { | ||
case Success(source) => supervised { source.pipeTo(c) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we block in onComplete
, which probably shouldn't happen - the pipe should only complete once all elements have been transmitted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's correct, 🤔 the other option would be to somehow pass the received (from onComplete -> Success
) Source
to another fork
and process it there so that emission doesn't happen in the ExecutionContext
. Let me think and propose sth there...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I think adding a source-transferring channel (with a buffer size of 1) would do the trick (plus a fork awaiting for the channel)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYT?
Creates a source that emits elements from future source when it completes or fails otherwise. The future completion is performed on the provided `ExecutionContext` whereas elements are emitted through Ox. Note that when Future fails with `ExecutionException` then its cause is returned as source failure. Examples: Source .futureSource(Future.failed(new RuntimeException("future failed"))) .receive() // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed)) Source.futureSource(Future.successful(Source.fromValues(1, 2))).toList // List(1, 2)
8bdf162
to
6538ae0
Compare
Thanks :) |
Creates a source that emits elements from future source when it completes or fails otherwise. The future completion is performed on the provided
ExecutionContext
whereas elements are emitted throughSupervised
. Note that when Future fails withExecutionException
then its cause is returned as source failure.Examples: