From 31d26d12671caf99eb8d53ba596dfa5ae3b6f2bd Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Mon, 18 Mar 2024 13:17:29 +0100 Subject: [PATCH] Fix uncaught decider exception in Split with Supervision.resumingDecider --- .../apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala | 4 +--- .../apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala | 3 --- .../apache/pekko/stream/impl/fusing/StreamOfStreams.scala | 7 ++++++- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala index da91f8af0df..edc9e980dc3 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala @@ -211,8 +211,6 @@ class FlowSplitAfterSpec extends StreamSpec(""" } "resume stream when splitAfter function throws" in { - info("Supervision is not supported fully by GraphStages yet") - pending val publisherProbeProbe = TestPublisher.manualProbe[Int]() val exc = TE("test") val publisher = Source @@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec(""" upstreamSubscription.sendNext(6) substreamPuppet1.expectNext(6) substreamPuppet1.expectComplete() + upstreamSubscription.sendNext(7) val substream2 = subscriber.expectNext() val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false))) substreamPuppet2.request(10) - upstreamSubscription.sendNext(7) substreamPuppet2.expectNext(7) upstreamSubscription.sendComplete() diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala index 58c6238eac5..79a64f71f42 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala @@ -327,9 +327,6 @@ class FlowSplitWhenSpec extends StreamSpec(""" } "resume stream when splitWhen function throws" in { - info("Supervision is not supported fully by GraphStages yet") - pending - val publisherProbeProbe = TestPublisher.manualProbe[Int]() val exc = TE("test") val publisher = Source diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index 30b89669d70..7b0e37e9429 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -636,7 +636,12 @@ import pekko.util.ccompat.JavaConverters._ else substreamSource.push(elem) } } catch { - case NonFatal(ex) => onUpstreamFailure(ex) + case NonFatal(ex) => + decider(ex) match { + case Supervision.Resume => pull(in) + case Supervision.Stop => onUpstreamFailure(ex) + case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart? + } } }