Fix uncaught decider exception in Split with Supervision.resumingDecider #1207
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
upstreamSubscription.sendNext(7)
It seems like this is a genuine mistake from when the test was originally written (remember that this test never passed; it was written for the future, when supervision strategy propagation would be implemented, which happened later as a result of #252).
Logically, when you read the test, it's expected that when you do `expectNext`, an element should previously have been sent with `sendNext`. It's also the same in the equivalent `groupBy` test and the `splitWhen` test.
I thought about this again. I may be wrong, and the test may indeed be valid as it's written right now: because it's `splitAfter`, it's meant to skip the next element when the exception is thrown, in which case we have to drop one element and then do `pull(in)`?
Yes, the element which caused the exception is dropped now.
> Yes, the element which caused the exception is dropped now.
So, to confirm: the current PR's implementation is correct and the test as originally written was wrong?
Given the `downstreamSubscription.request(100)`, I think the original test should be right too, but it's a little strange now that we need to send the next element (`sendNext(7)`) to make the next sub-source ready.
I think we should call
private def pushSubstreamSource(): Unit = {
push(out, Source.fromGraph(substreamSource.source))
scheduleOnce(SubscriptionTimer, timeout)
substreamWaitingToBePushed = false
}
inside `SubstreamHandler#onPush` as well when the decision is `SplitAfter`, to keep the behavior the same, as it is just a dummy Source.
@mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
> @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
But it's also sending a completion because of this condition: `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3 == 0`, so the split happens right after that), which is why the test is failing with
Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
at `val substream2 = subscriber.expectNext()`, because there isn't going to be a next element; it's expecting a completion instead.
Note that at this point, we are dealing with the normal logic of SplitWhen. The new functionality of skipping exceptions with SupervisionDecider.resume occurs earlier, when `elem == 3`, so we are past that and just testing the happy-path logic of SplitAfter, which is not changed at all (and it shouldn't be either!).
This is why I think that part of the test is written incorrectly. In fact, you can argue it can even be removed, since it has nothing to do with recovering from thrown exceptions with SupervisionDecider.resume; it's testing something entirely different (but I would personally leave it there for now so it's consistent with the other tests).
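To make the element-level expectations in this thread concrete, here is a minimal plain-Scala model of `splitAfter` under a resuming decider. It is only a sketch: it uses ordinary collections rather than Pekko Streams, it ignores demand and timing (so it says nothing about when the second sub-source is emitted), and the names `SplitAfterModel`, `predicate`, and `splitAfterResuming` are hypothetical helpers introduced for illustration.

```scala
import scala.util.control.NonFatal

object SplitAfterModel {
  // Stand-in for the test's predicate: throws on 3, splits after multiples of 3.
  val exc = new RuntimeException("boom")
  def predicate(elem: Int): Boolean =
    if (elem == 3) throw exc else elem % 3 == 0

  // Collections-only model (an assumption, not Pekko's implementation):
  // - an element whose predicate throws is dropped (Resume),
  // - a `true` result closes the current group after the element is added.
  def splitAfterResuming(elems: Seq[Int])(p: Int => Boolean): Vector[Vector[Int]] = {
    val (closed, pending) =
      elems.foldLeft((Vector.empty[Vector[Int]], Vector.empty[Int])) {
        case ((done, current), elem) =>
          try {
            if (p(elem)) (done :+ (current :+ elem), Vector.empty[Int])
            else (done, current :+ elem)
          } catch {
            case NonFatal(_) => (done, current) // skip the failing element
          }
      }
    // A trailing, unfinished group is closed when the input ends.
    if (pending.nonEmpty) closed :+ pending else closed
  }

  def main(args: Array[String]): Unit =
    println(splitAfterResuming(1 to 7)(predicate))
  // prints Vector(Vector(1, 2, 4, 5, 6), Vector(7))
}
```

In this model, 3 is dropped, 6 ends the first group, and 7 lands in a second group, which matches the element order the test asserts on.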
1. `val substream2 = subscriber.expectNext()`: the `subscriber` is not expecting a `complete`; otherwise, the call `sendNext(7)` would cause an issue, because the origin source would already be completed.
The problem here is when `stream2` is generated. I expected the behavior to be the same, i.e. just after `6 % 3 == 0`. WDYT @samueleresca @mdedetrich?
> 1. `val substream2 = subscriber.expectNext()`: the `subscriber` is not expecting a `complete`; otherwise, the call `sendNext(7)` would cause an issue, because the origin source would already be completed.
> The problem here is when `stream2` is generated. I expected the behavior to be the same, i.e. just after `6 % 3 == 0`. WDYT @samueleresca @mdedetrich?
I am not saying that this is necessarily wrong, just that it's a completely separate issue from what this PR is changing/solving. Or, to put it differently, this behaviour is the same as in current Pekko, since at this point, i.e.
upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
upstreamSubscription.sendNext(7)
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(7)
upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet2.expectComplete()
we are past recovering from an exception (furthermore, if we were somehow changing some fundamental behaviour of SplitAfter, then other tests would fail, but they are all passing without any changes). The critical part of the test, the one specifically dealing with recovering from an exception and resuming, is
upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
substreamPuppet1.expectNext(4) // note that 3 was dropped
And this part is completely unchanged from how the test was originally written.
Given that, I think it makes sense to file a separate issue if this behaviour about completing after a split needs to change and hence to tackle it separately. The only exception to this that I can think of is that some state is not being reset correctly
decider(ex) match {
case Supervision.Resume => pull(in)
case Supervision.Stop => onUpstreamFailure(ex)
case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
}
in the `case Supervision.Resume => pull(in)` block, but if that's the case it would also error out earlier, since that block is only executed when recovering from exceptions in `onPush` (and again, that's only when `elem` is 3, not 6).
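The decider dispatch discussed above can be sketched in isolation. The following is a hedged, self-contained model, not the PR's actual code: `Directive`, `Resume`, `Stop`, `Restart`, `Outcome`, and `onPush` are local stand-ins defined here so the snippet runs without Pekko, and the real stage pulls from its inlet or fails itself instead of returning a value.

```scala
import scala.util.control.NonFatal

object DeciderSketch {
  // Local stand-ins for supervision directives (assumption: modelled here,
  // not imported from Pekko).
  sealed trait Directive
  case object Resume extends Directive
  case object Stop extends Directive
  case object Restart extends Directive

  type Decider = Throwable => Directive
  val resumingDecider: Decider = _ => Resume
  val stoppingDecider: Decider = _ => Stop

  // What handling a single pushed element results in, in this model.
  sealed trait Outcome
  final case class Emitted(elem: Int, split: Boolean) extends Outcome
  case object DroppedAndPulled extends Outcome // corresponds to pull(in)
  final case class Failed(ex: Throwable) extends Outcome // corresponds to onUpstreamFailure(ex)

  // Evaluate the user predicate; if it throws, let the decider choose.
  def onPush(elem: Int, p: Int => Boolean, decider: Decider): Outcome =
    try Emitted(elem, p(elem))
    catch {
      case NonFatal(ex) =>
        decider(ex) match {
          case Resume  => DroppedAndPulled
          case Stop    => Failed(ex)
          case Restart => Failed(ex) // restart is treated like stop in this sketch
        }
    }
}
```

With a predicate that throws on 3, `onPush(3, p, resumingDecider)` yields `DroppedAndPulled`, while `onPush(6, p, resumingDecider)` yields `Emitted(6, true)`: the decider branch is only reached for the failing element, which is the point made in the comment above.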
Thanks folks, I appreciate the detailed explanation. And yes, I was drifting away (not on purpose) from the original goal of the test.
> Note that at this point, we are dealing with the normal logic of SplitWhen. The new functionality of skipping exceptions with SupervisionDecider.resume occurs earlier when elem == 3 so we are past that and just testing happy path logic of SplitAfter which is not changed at all (and it shouldn't be either!).
Looks good to me.
@He-Pin @Roiocam @jxnu-liguobin @pjfanning @raboof Please take note of #1207 (review); I might be missing something here.
d1e9a99 to 455d8c5
455d8c5 to df3b9fc
Lgtm
df3b9fc to 31d26d1
lgtm, we will see what will happen with nightly build.
Thanks, I'll go ahead and merge this. With it being included in
Resolves: #1205
Turns out the error was quite simple: we weren't using a decider to catch the exception in `onPush`