Skip to content

Commit

Permalink
feat: Add Pattern timeout support
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Feb 23, 2025
1 parent 22685b9 commit 3b40f38
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.util.Timeout;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
Expand All @@ -36,6 +37,7 @@
import static org.apache.pekko.pattern.Patterns.pipe;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com> */
public class PatternsTest extends JUnitSuite {
Expand Down Expand Up @@ -485,6 +487,51 @@ public void testCSAfterDuration() throws Exception {
assertEquals(expected, actual);
}

@Test
public void testCompletedStageWithTimeout() throws Exception {
final String expected = "Hello";
final CompletionStage<String> delayedStage =
Patterns.timeout(
Duration.ofMillis(200),
system.scheduler(),
ec,
() -> CompletableFuture.completedFuture(expected));
final String actual = delayedStage.toCompletableFuture().get(3, SECONDS);
assertEquals(expected, actual);
}

@Test
public void testFailedCompletedStageWithTimeout() throws Exception {
final CompletionStage<String> delayedStage =
Patterns.timeout(
Duration.ofMillis(200),
system.scheduler(),
ec,
() -> {
CompletableFuture<String> f = new CompletableFuture<>();
f.completeExceptionally(new IllegalStateException("Illegal!"));
return f;
});
try {
delayedStage.toCompletableFuture().get(3, SECONDS);
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof IllegalStateException);
assertEquals("Illegal!", e.getCause().getMessage());
}
}

@Test
public void testCompletedWithTimeout() throws Exception {
final CompletionStage<String> delayedStage =
Patterns.timeout(Duration.ofMillis(200), system.scheduler(), ec, CompletableFuture::new);
try {
delayedStage.toCompletableFuture().get(3, SECONDS);
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
assertEquals("Timeout of 200 milliseconds expired", e.getCause().getMessage());
}
}

@Test
public void testGracefulStop() throws Exception {
ActorRef target = system.actorOf(Props.create(StopActor.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@

package org.apache.pekko.pattern

import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.{ Await, ExecutionContextExecutor, Future, Promise, TimeoutException }
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ PekkoSpec, TestLatch }
Expand Down Expand Up @@ -76,4 +74,23 @@ class PatternSpec extends PekkoSpec {
intercept[IllegalStateException] { Await.result(r, remainingOrDefault) }.getMessage should ===("Mexico")
}
}

"pattern.timeout" must {
"be completed successfully eventually" in {
val f = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.successful(5))
Await.result(f, remainingOrDefault) should ===(5)
}

"be completed abnormally eventually" in {
val f =
pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.failed(new IllegalStateException("ABC")))
intercept[IllegalStateException] { Await.result(f, remainingOrDefault) }.getMessage should ===("ABC")
}

"be completed with a TimeoutException if not completed within the specified time" in {
val f = pekko.pattern.timeout(100.millis, using = system.scheduler)(Future.never)
intercept[TimeoutException] { Await.result(f, remainingOrDefault) }
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

package org.apache.pekko.pattern

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeoutException }

import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -87,4 +86,66 @@ trait FutureTimeoutSupport {
}
p
}

/**
* Returns a [[scala.concurrent.Future]] that will be completed with a [[TimeoutException]]
* if the provided value is not completed within the specified duration.
* @since 1.2.0
*/
def timeout[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
implicit ec: ExecutionContext): Future[T] = {
val future =
try value
catch {
case NonFatal(t) => Future.failed(t)
}
future.value match {
case Some(_) => future
case None => // not completed yet
val p = Promise[T]()
val timeout = using.scheduleOnce(duration) {
p.tryFailure(new TimeoutException(s"Timeout of $duration expired"))
if (future.isInstanceOf[CompletableFuture[T @unchecked]]) {
future.asInstanceOf[CompletableFuture[T]]
.toCompletableFuture
.cancel(true)
}
}
future.onComplete { result =>
timeout.cancel()
p.tryComplete(result)
}(pekko.dispatch.ExecutionContexts.parasitic)
p.future
}
}

/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[TimeoutException]]
* if the provided value is not completed within the specified duration.
* @since 1.2.0
*/
def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])(
implicit ec: ExecutionContext): CompletionStage[T] = {
val stage: CompletionStage[T] =
try value
catch {
case NonFatal(t) => Futures.failedCompletionStage(t)
}
if (stage.toCompletableFuture.isDone) {
stage
} else {
val p = new CompletableFuture[T]
val timeout = using.scheduleOnce(duration) {
p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
stage.toCompletableFuture.cancel(true)
}
stage.handle[Unit]((v: T, ex: Throwable) => {
timeout.cancel()
if (v != null) p.complete(v)
if (ex != null) p.completeExceptionally(ex)
})
p
}
}

}
12 changes: 12 additions & 0 deletions actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,18 @@ object Patterns {
value: Callable[CompletionStage[T]]): CompletionStage[T] =
afterCompletionStage(duration.asScala, scheduler)(value.call())(context)

/**
* Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[java.util.concurrent.TimeoutException]]
* if the provided value is not completed within the specified duration.
* @since 1.2.0
*/
def timeout[T](
duration: java.time.Duration,
scheduler: Scheduler,
context: ExecutionContext,
value: Callable[CompletionStage[T]]): CompletionStage[T] =
timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context)

/**
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration.
Expand Down

0 comments on commit 3b40f38

Please sign in to comment.