From 873868c87b39461c9d87a17918a1eb881b61a593 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 23 Dec 2023 16:22:21 +0800 Subject: [PATCH] =str Add dedicated stream timeout exceptions for stream timeout operators. --- .../apache/pekko/stream/javadsl/FlowTest.java | 15 +++---- .../pekko/stream/javadsl/SourceTest.java | 18 ++++----- .../pekko/stream/StreamTimeoutException.scala | 40 +++++++++++++++++++ .../org/apache/pekko/stream/impl/Timers.scala | 15 +++---- .../pekko/stream/javadsl/BidiFlow.scala | 4 +- .../apache/pekko/stream/javadsl/Flow.scala | 16 ++++---- .../apache/pekko/stream/javadsl/Source.scala | 16 ++++---- .../apache/pekko/stream/javadsl/SubFlow.scala | 16 ++++---- .../pekko/stream/javadsl/SubSource.scala | 16 ++++---- .../pekko/stream/scaladsl/BidiFlow.scala | 2 +- .../apache/pekko/stream/scaladsl/Flow.scala | 8 ++-- 11 files changed, 100 insertions(+), 66 deletions(-) create mode 100644 stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index c920a8307c7..f9a8e2306ea 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1364,10 +1364,9 @@ public void mustBeAbleToUseInitialTimeout() { .runWith(Sink.head(), system) .toCompletableFuture() .get(3, TimeUnit.SECONDS)); - assertEquals( + assertTrue( "A TimeoutException was expected", - TimeoutException.class, - executionException.getCause().getClass()); + TimeoutException.class.isAssignableFrom(executionException.getCause().getClass())); } @Test @@ -1381,10 +1380,9 @@ public void mustBeAbleToUseCompletionTimeout() { .runWith(Sink.head(), system) .toCompletableFuture() .get(3, TimeUnit.SECONDS)); - assertEquals( + assertTrue( "A TimeoutException was expected", - TimeoutException.class, - executionException.getCause().getClass()); + TimeoutException.class.isAssignableFrom(executionException.getCause().getClass())); } @Test @@ -1398,10 +1396,9 @@ public void mustBeAbleToUseIdleTimeout() { .runWith(Sink.head(), system) .toCompletableFuture() .get(3, TimeUnit.SECONDS)); - assertEquals( + assertTrue( "A TimeoutException was expected", - TimeoutException.class, - executionException.getCause().getClass()); + TimeoutException.class.isAssignableFrom(executionException.getCause().getClass())); } @Test diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index f385ecb807f..7849921b5d1 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -55,8 +55,7 @@ import static org.apache.pekko.stream.testkit.TestPublisher.ManualProbe; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; @SuppressWarnings("serial") public class SourceTest extends StreamTest { @@ -1204,10 +1203,9 @@ public void mustBeAbleToUseInitialTimeout() { .runWith(Sink.head(), system) .toCompletableFuture() .get(3, TimeUnit.SECONDS)); - assertEquals( + assertTrue( "The cause of ExecutionException should be TimeoutException", - TimeoutException.class, - exception.getCause().getClass()); + TimeoutException.class.isAssignableFrom(exception.getCause().getClass())); } @Test @@ -1222,10 +1220,9 @@ public void mustBeAbleToUseCompletionTimeout() { .runWith(Sink.head(), system) .toCompletableFuture() .get(3, TimeUnit.SECONDS)); - assertEquals( + assertTrue( "The cause of ExecutionException should be TimeoutException", - TimeoutException.class, - exception.getCause().getClass()); + TimeoutException.class.isAssignableFrom(exception.getCause().getClass())); } @Test @@ -1240,10 +1237,9 @@ public void mustBeAbleToUseIdleTimeout() { .runWith(Sink.head(), system) .toCompletableFuture() .get(3, TimeUnit.SECONDS)); - assertEquals( + assertTrue( "The cause of ExecutionException should be TimeoutException", - TimeoutException.class, - exception.getCause().getClass()); + TimeoutException.class.isAssignableFrom(exception.getCause().getClass())); } @Test diff --git a/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala b/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala new file mode 100644 index 00000000000..9be91f9967c --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/StreamTimeoutException.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import org.apache.pekko +import pekko.annotation.DoNotInherit + +import scala.concurrent.TimeoutException +import scala.util.control.NoStackTrace + +/** + * Base class for timeout exceptions specific to Pekko Streams + * + * Not for user extension + */ +@DoNotInherit +sealed class StreamTimeoutException(msg: String) extends TimeoutException(msg) with NoStackTrace + +final class InitialTimeoutException(msg: String) extends StreamTimeoutException(msg) + +final class CompletionTimeoutException(msg: String) extends StreamTimeoutException(msg) + +final class StreamIdleTimeoutException(msg: String) extends StreamTimeoutException(msg) + +final class BackpressureTimeoutException(msg: String) extends StreamTimeoutException(msg) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala index dacb2441cc6..cdbc42828fb 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala @@ -13,7 +13,7 @@ package org.apache.pekko.stream.impl -import java.util.concurrent.{ TimeUnit, TimeoutException } +import java.util.concurrent.TimeUnit import scala.concurrent.duration.{ Duration, FiniteDuration } @@ -69,12 +69,13 @@ import pekko.stream.stage._ final override protected def onTimer(key: Any): Unit = if (!initialHasPassed) - failStage(new TimeoutException(s"The first element has not yet passed through in $timeout.")) + failStage( + new InitialTimeoutException(s"The first element has not yet passed through in ${timeout.toCoarsest}.")) override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, timeout) } - override def toString = "InitialTimeoutTimer" + override def toString = "InitialTimeout" } @@ -90,7 +91,7 @@ import pekko.stream.stage._ override def onPull(): Unit = pull(in) final override protected def onTimer(key: Any): Unit = - failStage(new TimeoutException(s"The stream has not been completed in $timeout.")) + failStage(new CompletionTimeoutException(s"The stream has not been completed in ${timeout.toCoarsest}.")) override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, timeout) } @@ -117,7 +118,7 @@ import pekko.stream.stage._ final override protected def onTimer(key: Any): Unit = if (nextDeadline - System.nanoTime < 0) - failStage(new TimeoutException(s"No elements passed in the last $timeout.")) + failStage(new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}.")) override def preStart(): Unit = scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout)) @@ -150,7 +151,7 @@ import pekko.stream.stage._ final override protected def onTimer(key: Any): Unit = if (waitingDemand && (nextDeadline - System.nanoTime < 0)) - failStage(new TimeoutException(s"No demand signalled in the last $timeout.")) + failStage(new BackpressureTimeoutException(s"No demand signalled in the last ${timeout.toCoarsest}.")) override def preStart(): Unit = scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout)) @@ -179,7 +180,7 @@ import pekko.stream.stage._ final override def onTimer(key: Any): Unit = if (nextDeadline - System.nanoTime < 0) - failStage(new TimeoutException(s"No elements passed in the last $timeout.")) + failStage(new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}.")) override def preStart(): Unit = scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout)) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala index 2ea80b6b5eb..809dbebdcad 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/BidiFlow.scala @@ -97,7 +97,7 @@ object BidiFlow { /** * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. * * There is a difference between this operator and having two idleTimeout Flows assembled into a BidiStage. * If the timeout is configured to be 1 seconds, then this operator will not fail even though there are elements flowing @@ -110,7 +110,7 @@ object BidiFlow { /** * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. * * There is a difference between this operator and having two idleTimeout Flows assembled into a BidiStage. * If the timeout is configured to be 1 seconds, then this operator will not fail even though there are elements flowing diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 0b2d60c58ec..948b2205443 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -3443,7 +3443,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -3459,7 +3459,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -3475,7 +3475,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -3491,7 +3491,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -3507,7 +3507,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -3524,7 +3524,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -3541,7 +3541,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -3558,7 +3558,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 1f6e80d60d3..91cd27aa48a 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3997,7 +3997,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -4013,7 +4013,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -4029,7 +4029,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -4045,7 +4045,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -4061,7 +4061,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -4078,7 +4078,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -4095,7 +4095,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -4112,7 +4112,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index a4b2042ec0b..25cf5107b1f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2071,7 +2071,7 @@ class SubFlow[In, Out, Mat]( /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2087,7 +2087,7 @@ class SubFlow[In, Out, Mat]( /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2103,7 +2103,7 @@ class SubFlow[In, Out, Mat]( /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2119,7 +2119,7 @@ class SubFlow[In, Out, Mat]( /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2135,7 +2135,7 @@ class SubFlow[In, Out, Mat]( /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -2152,7 +2152,7 @@ class SubFlow[In, Out, Mat]( /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -2169,7 +2169,7 @@ class SubFlow[In, Out, Mat]( /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -2186,7 +2186,7 @@ class SubFlow[In, Out, Mat]( /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index e0f85c86a10..15f407f069f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2048,7 +2048,7 @@ class SubSource[Out, Mat]( /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2064,7 +2064,7 @@ class SubSource[Out, Mat]( /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2080,7 +2080,7 @@ class SubSource[Out, Mat]( /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2096,7 +2096,7 @@ class SubSource[Out, Mat]( /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2112,7 +2112,7 @@ class SubSource[Out, Mat]( /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -2129,7 +2129,7 @@ class SubSource[Out, Mat]( /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -2146,7 +2146,7 @@ class SubSource[Out, Mat]( /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -2163,7 +2163,7 @@ class SubSource[Out, Mat]( /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala index 7bdbec9dfaa..e756f833530 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala @@ -312,7 +312,7 @@ object BidiFlow { /** * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed - * with a [[scala.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.StreamIdleTimeoutException]]. * * There is a difference between this operator and having two idleTimeout Flows assembled into a BidiStage. * If the timeout is configured to be 1 seconds, then this operator will not fail even though there are elements flowing diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index ec1586d8315..de4c81e493d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2461,7 +2461,7 @@ trait FlowOps[+Out, +Mat] { /** * If the first element has not passed through this operator before the provided timeout, the stream is failed - * with a [[scala.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.InitialTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2475,7 +2475,7 @@ trait FlowOps[+Out, +Mat] { /** * If the completion of the stream does not happen until the provided timeout, the stream is failed - * with a [[scala.concurrent.TimeoutException]]. + * with a [[org.apache.pekko.stream.CompletionTimeoutException]]. * * '''Emits when''' upstream emits an element * @@ -2489,7 +2489,7 @@ trait FlowOps[+Out, +Mat] { /** * If the time between two processed elements exceeds the provided timeout, the stream is failed - * with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically, + * with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element @@ -2504,7 +2504,7 @@ trait FlowOps[+Out, +Mat] { /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, - * the stream is failed with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically, + * the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element