Skip to content

Commit

Permalink
=str Add dedicated stream timeout exceptions for stream timeout opera…
Browse files Browse the repository at this point in the history
…tors.
  • Loading branch information
He-Pin committed Dec 23, 2023
1 parent 80cd63f commit 873868c
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1364,10 +1364,9 @@ public void mustBeAbleToUseInitialTimeout() {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
assertEquals(
assertTrue(
"A TimeoutException was expected",
TimeoutException.class,
executionException.getCause().getClass());
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
}

@Test
Expand All @@ -1381,10 +1380,9 @@ public void mustBeAbleToUseCompletionTimeout() {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
assertEquals(
assertTrue(
"A TimeoutException was expected",
TimeoutException.class,
executionException.getCause().getClass());
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
}

@Test
Expand All @@ -1398,10 +1396,9 @@ public void mustBeAbleToUseIdleTimeout() {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
assertEquals(
assertTrue(
"A TimeoutException was expected",
TimeoutException.class,
executionException.getCause().getClass());
TimeoutException.class.isAssignableFrom(executionException.getCause().getClass()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@
import static org.apache.pekko.stream.testkit.TestPublisher.ManualProbe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;

@SuppressWarnings("serial")
public class SourceTest extends StreamTest {
Expand Down Expand Up @@ -1204,10 +1203,9 @@ public void mustBeAbleToUseInitialTimeout() {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
assertEquals(
assertTrue(
"The cause of ExecutionException should be TimeoutException",
TimeoutException.class,
exception.getCause().getClass());
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
}

@Test
Expand All @@ -1222,10 +1220,9 @@ public void mustBeAbleToUseCompletionTimeout() {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
assertEquals(
assertTrue(
"The cause of ExecutionException should be TimeoutException",
TimeoutException.class,
exception.getCause().getClass());
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
}

@Test
Expand All @@ -1240,10 +1237,9 @@ public void mustBeAbleToUseIdleTimeout() {
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
assertEquals(
assertTrue(
"The cause of ExecutionException should be TimeoutException",
TimeoutException.class,
exception.getCause().getClass());
TimeoutException.class.isAssignableFrom(exception.getCause().getClass()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream

import org.apache.pekko
import pekko.annotation.DoNotInherit

import scala.concurrent.TimeoutException
import scala.util.control.NoStackTrace

/**
* Base class for timeout exceptions specific to Pekko Streams
*
* Not for user extension
*/
@DoNotInherit
sealed class StreamTimeoutException(msg: String) extends TimeoutException(msg) with NoStackTrace

final class InitialTimeoutException(msg: String) extends StreamTimeoutException(msg)

final class CompletionTimeoutException(msg: String) extends StreamTimeoutException(msg)

final class StreamIdleTimeoutException(msg: String) extends StreamTimeoutException(msg)

final class BackpressureTimeoutException(msg: String) extends StreamTimeoutException(msg)
15 changes: 8 additions & 7 deletions stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package org.apache.pekko.stream.impl

import java.util.concurrent.{ TimeUnit, TimeoutException }
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.{ Duration, FiniteDuration }

Expand Down Expand Up @@ -69,12 +69,13 @@ import pekko.stream.stage._

final override protected def onTimer(key: Any): Unit =
if (!initialHasPassed)
failStage(new TimeoutException(s"The first element has not yet passed through in $timeout."))
failStage(
new InitialTimeoutException(s"The first element has not yet passed through in ${timeout.toCoarsest}."))

override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, timeout)
}

override def toString = "InitialTimeoutTimer"
override def toString = "InitialTimeout"

}

Expand All @@ -90,7 +91,7 @@ import pekko.stream.stage._
override def onPull(): Unit = pull(in)

final override protected def onTimer(key: Any): Unit =
failStage(new TimeoutException(s"The stream has not been completed in $timeout."))
failStage(new CompletionTimeoutException(s"The stream has not been completed in ${timeout.toCoarsest}."))

override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, timeout)
}
Expand All @@ -117,7 +118,7 @@ import pekko.stream.stage._

final override protected def onTimer(key: Any): Unit =
if (nextDeadline - System.nanoTime < 0)
failStage(new TimeoutException(s"No elements passed in the last $timeout."))
failStage(new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}."))

override def preStart(): Unit =
scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
Expand Down Expand Up @@ -150,7 +151,7 @@ import pekko.stream.stage._

final override protected def onTimer(key: Any): Unit =
if (waitingDemand && (nextDeadline - System.nanoTime < 0))
failStage(new TimeoutException(s"No demand signalled in the last $timeout."))
failStage(new BackpressureTimeoutException(s"No demand signalled in the last ${timeout.toCoarsest}."))

override def preStart(): Unit =
scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
Expand Down Expand Up @@ -179,7 +180,7 @@ import pekko.stream.stage._

final override def onTimer(key: Any): Unit =
if (nextDeadline - System.nanoTime < 0)
failStage(new TimeoutException(s"No elements passed in the last $timeout."))
failStage(new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}."))

override def preStart(): Unit =
scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object BidiFlow {

/**
* If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.BackpressureTimeoutException]].
*
* There is a difference between this operator and having two idleTimeout Flows assembled into a BidiStage.
* If the timeout is configured to be 1 seconds, then this operator will not fail even though there are elements flowing
Expand All @@ -110,7 +110,7 @@ object BidiFlow {

/**
* If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.BackpressureTimeoutException]].
*
* There is a difference between this operator and having two idleTimeout Flows assembled into a BidiStage.
* If the timeout is configured to be 1 seconds, then this operator will not fail even though there are elements flowing
Expand Down
16 changes: 8 additions & 8 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3443,7 +3443,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr

/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
Expand All @@ -3459,7 +3459,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr

/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
Expand All @@ -3475,7 +3475,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr

/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
Expand All @@ -3491,7 +3491,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr

/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
Expand All @@ -3507,7 +3507,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr

/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
Expand All @@ -3524,7 +3524,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr

/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
Expand All @@ -3541,7 +3541,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr

/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
Expand All @@ -3558,7 +3558,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr

/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3997,7 +3997,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[

/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
Expand All @@ -4013,7 +4013,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[

/**
* If the first element has not passed through this operator before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.InitialTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
Expand All @@ -4029,7 +4029,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[

/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
Expand All @@ -4045,7 +4045,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[

/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
* with a [[org.apache.pekko.stream.CompletionTimeoutException]].
*
* '''Emits when''' upstream emits an element
*
Expand All @@ -4061,7 +4061,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[

/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
Expand All @@ -4078,7 +4078,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[

/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* with a [[org.apache.pekko.stream.StreamIdleTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
Expand All @@ -4095,7 +4095,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[

/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
Expand All @@ -4112,7 +4112,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[

/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* the stream is failed with a [[org.apache.pekko.stream.BackpressureTimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
Expand Down
Loading

0 comments on commit 873868c

Please sign in to comment.