From bf6d0268835bc32943e0c51e98ae26ec37493221 Mon Sep 17 00:00:00 2001 From: JingZhangChen Date: Thu, 21 Sep 2023 00:46:10 +0800 Subject: [PATCH 01/11] fix: keep typed ask deadletter same as classic --- .../scala/org/apache/pekko/actor/typed/AskSpec.scala | 12 +++++++++++- .../scala/org/apache/pekko/pattern/AskSupport.scala | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala index b2355e302e7..966be9ab707 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala @@ -58,7 +58,7 @@ class AskSpec extends ScalaTestWithActorTestKit(""" } "Ask pattern" must { - "fail the future if the actor is already terminated" in { + "fail the future and publish deadletter with recipient if the actor is already terminated" in { val ref = spawn(behavior) val stopResult: Future[Unit] = ref.ask(Stop.apply) stopResult.futureValue @@ -69,6 +69,16 @@ class AskSpec extends ScalaTestWithActorTestKit(""" val result = answer.failed.futureValue result shouldBe a[TimeoutException] result.getMessage should include("had already been terminated.") + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message match { + case Foo(s, _) => s should ===("bar") + case _ => fail(s"unexpected DeadLetter: $deadLetter") + } + + val deadLettersRef = system.classicSystem.deadLetters + deadLetter.recipient shouldNot equal(deadLettersRef) + deadLetter.recipient should equal(ActorRefAdapter.toClassic(actor)) } "succeed when the actor is alive" in { diff --git a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala index 8154a4ee034..e2ac3da4fc6 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala @@ -617,7 +617,7 @@ private[pekko] final class PromiseActorRef( override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match { case Stopped | _: StoppedWithPath => - provider.deadLetters ! message + provider.deadLetters ! DeadLetter(message, if (sender eq Actor.noSender) provider.deadLetters else sender, this) onComplete(message, alreadyCompleted = true) case _ => if (message == null) throw InvalidMessageException("Message is null") From b10e54566a80413d7983bd25ab5985c9b425e4ae Mon Sep 17 00:00:00 2001 From: JingZhangChen Date: Thu, 21 Sep 2023 02:11:09 +0800 Subject: [PATCH 02/11] fix: compile on scala3 --- .../test/scala/org/apache/pekko/actor/typed/AskSpec.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala index 966be9ab707..9ac70e9423f 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala @@ -59,6 +59,8 @@ class AskSpec extends ScalaTestWithActorTestKit(""" "Ask pattern" must { "fail the future and publish deadletter with recipient if the actor is already terminated" in { + import pekko.actor.typed.internal.adapter.ActorRefAdapter._ + val ref = spawn(behavior) val stopResult: Future[Unit] = ref.ask(Stop.apply) stopResult.futureValue @@ -70,6 +72,7 @@ class AskSpec extends ScalaTestWithActorTestKit(""" result shouldBe a[TimeoutException] result.getMessage should include("had already been terminated.") + val deadLetterProbe = createDeadLetterProbe() val deadLetter = deadLetterProbe.receiveMessage() deadLetter.message match { case Foo(s, _) => s should ===("bar") @@ -78,7 +81,7 @@ class AskSpec extends ScalaTestWithActorTestKit(""" val deadLettersRef = system.classicSystem.deadLetters deadLetter.recipient shouldNot equal(deadLettersRef) - deadLetter.recipient should equal(ActorRefAdapter.toClassic(actor)) + deadLetter.recipient should equal(toClassic(ref)) } "succeed when the actor is alive" in { From 895d041c3f7aa19ced383fad7bc2f9016153423d Mon Sep 17 00:00:00 2001 From: JingZhangChen Date: Thu, 21 Sep 2023 23:34:27 +0800 Subject: [PATCH 03/11] fix: a more convincing test --- .../apache/pekko/actor/typed/AskSpec.scala | 15 +- .../pekko/actor/typed/DeadLetterSpec.scala | 129 ++++++++++++++++++ 2 files changed, 130 insertions(+), 14 deletions(-) create mode 100644 actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala index 9ac70e9423f..b2355e302e7 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala @@ -58,9 +58,7 @@ class AskSpec extends ScalaTestWithActorTestKit(""" } "Ask pattern" must { - "fail the future and publish deadletter with recipient if the actor is already terminated" in { - import pekko.actor.typed.internal.adapter.ActorRefAdapter._ - + "fail the future if the actor is already terminated" in { val ref = spawn(behavior) val stopResult: Future[Unit] = ref.ask(Stop.apply) stopResult.futureValue @@ -71,17 +69,6 @@ class AskSpec extends ScalaTestWithActorTestKit(""" val result = answer.failed.futureValue result shouldBe a[TimeoutException] result.getMessage should include("had already been terminated.") - - val deadLetterProbe = createDeadLetterProbe() - val deadLetter = deadLetterProbe.receiveMessage() - deadLetter.message match { - case Foo(s, _) => s should ===("bar") - case _ => fail(s"unexpected DeadLetter: $deadLetter") - } - - val deadLettersRef = system.classicSystem.deadLetters - deadLetter.recipient shouldNot equal(deadLettersRef) - deadLetter.recipient should equal(toClassic(ref)) } "succeed when the actor is alive" in { diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala new file mode 100644 index 00000000000..958d842fb34 --- /dev/null +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2017-2022 Lightbend Inc. + */ + +package org.apache.pekko.actor.typed + +import org.apache.pekko.actor.IllegalActorStateException +import org.apache.pekko.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import org.apache.pekko.actor.typed.scaladsl.AskPattern.{ schedulerFromActorSystem, Askable } +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.scaladsl.Behaviors._ +import org.apache.pekko.util.Timeout +import org.scalatest.wordspec.AnyWordSpecLike + +import java.util.concurrent.CountDownLatch +import scala.concurrent.duration._ +import scala.concurrent.{ ExecutionContext, Future, TimeoutException } +import scala.util.{ Failure, Success } + +trait Command + +case class Multiply(a: Int, b: Int, forwardRef: ActorRef[WorkerCommand], replyTo: ActorRef[Int]) extends Command +case class ReplyResult(num: Int, replyTo: ActorRef[Int]) extends Command +case class Ignore() extends Command + +trait WorkerCommand + +case class WorkerMultiply(a: Int, b: Int, replyTo: ActorRef[WorkerResult]) extends WorkerCommand + +case class WorkerResult(num: Int) extends WorkerCommand + +class ManualTerminatedTestSetup(workerCnt: Int) { + implicit val timeout: Timeout = 10.millis + val workerLatch = new CountDownLatch(workerCnt) + + def forwardBehavior: Behavior[Command] = + setup[Command] { context => + Behaviors.receiveMessage[Command] { msg => + msg match { + case Multiply(a, b, ref, replyTo) => + // context.ask is asynchronous + context.ask[WorkerCommand, WorkerResult](ref, resultReply => WorkerMultiply(a, b, resultReply)) { + case Success(result) => ReplyResult(result.num, replyTo) + case Failure(_) => Ignore() + } + Behaviors.same + case ReplyResult(num, replyTo) => + replyTo ! num + Behaviors.same + case Ignore() => Behaviors.same + } + } + } + + def workerBehavior: Receive[WorkerCommand] = + Behaviors.receiveMessage[WorkerCommand] { msg => + msg match { + case WorkerMultiply(a, b, replyTo) => + workerLatch.await() + replyTo ! WorkerResult(a * b) + Behaviors.stopped + case _ => + throw IllegalActorStateException("worker actor should not receive other message.") + } + } +} + +class DeadLetterSpec extends ScalaTestWithActorTestKit( + """ + pekko.loglevel=DEBUG + pekko.actor.debug.event-stream = on + """) with AnyWordSpecLike with LogCapturing { + + implicit def executor: ExecutionContext = + system.executionContext + + "DeadLetterActor" must { + + "publish dead letter with recipient when context.ask terminated" in new ManualTerminatedTestSetup(workerCnt = 1) { + val deadLetterProbe = createDeadLetterProbe() + val forwardRef = spawn(forwardBehavior) + val workerRef = spawn(workerBehavior) + + // this not completed unit worker reply. + val multiplyResult: Future[Int] = forwardRef.ask(replyTo => Multiply(3, 9, workerRef, replyTo)) + // waiting for temporary ask actor terminated with timeout + val result = multiplyResult.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage should startWith("Ask timed out on") + // unlock worker reply + workerLatch.countDown() + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message shouldBe a[WorkerResult] + val deadLettersRef = system.classicSystem.deadLetters + // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. + deadLetter.recipient shouldNot equal(deadLettersRef) + } + + "publish dead letter with recipient when AskPattern timeout" in new ManualTerminatedTestSetup(workerCnt = 1) { + val deadLetterProbe = createDeadLetterProbe() + val workerRef = spawn(workerBehavior) + + // this not completed unit countDown. + val multiplyResult: Future[WorkerResult] = workerRef.ask(replyTo => WorkerMultiply(3, 9, replyTo)) + // waiting for temporary ask actor terminated with timeout + val result = multiplyResult.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage should startWith("Ask timed out on") + // unlock worker reply + workerLatch.countDown() + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message shouldBe a[WorkerResult] + val deadLettersRef = system.classicSystem.deadLetters + // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. + deadLetter.recipient shouldNot equal(deadLettersRef) + } + } +} From 42bf49bf65e420089bd86efea3ab561519d1242c Mon Sep 17 00:00:00 2001 From: JingZhangChen Date: Thu, 21 Sep 2023 23:57:51 +0800 Subject: [PATCH 04/11] chore: remove unnecessary header --- .../scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala index 958d842fb34..e498dba1210 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala @@ -7,10 +7,6 @@ * This file is part of the Apache Pekko project, which was derived from Akka. */ -/* - * Copyright (C) 2017-2022 Lightbend Inc. - */ - package org.apache.pekko.actor.typed import org.apache.pekko.actor.IllegalActorStateException From 762d6a1ece7ad7a5caac7a6ed647d8aa69fc06ae Mon Sep 17 00:00:00 2001 From: JingZhangChen Date: Fri, 22 Sep 2023 00:05:27 +0800 Subject: [PATCH 05/11] chore: use raw apache license --- .../apache/pekko/actor/typed/DeadLetterSpec.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala index e498dba1210..4f552f04466 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala @@ -1,12 +1,19 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * This file is part of the Apache Pekko project, which was derived from Akka. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package org.apache.pekko.actor.typed import org.apache.pekko.actor.IllegalActorStateException From c9543a1f796f321a5e7b39543b84333f76d5fb95 Mon Sep 17 00:00:00 2001 From: JingZhangChen Date: Fri, 22 Sep 2023 01:40:19 +0800 Subject: [PATCH 06/11] chore: grammar and sort imports --- .../pekko/actor/typed/DeadLetterSpec.scala | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala index 4f552f04466..fd8c425fb6d 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala @@ -17,28 +17,30 @@ package org.apache.pekko.actor.typed import org.apache.pekko.actor.IllegalActorStateException -import org.apache.pekko.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } -import org.apache.pekko.actor.typed.scaladsl.AskPattern.{ schedulerFromActorSystem, Askable } +import org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing +import org.apache.pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import org.apache.pekko.actor.typed.scaladsl.AskPattern.Askable +import org.apache.pekko.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem import org.apache.pekko.actor.typed.scaladsl.Behaviors import org.apache.pekko.actor.typed.scaladsl.Behaviors._ import org.apache.pekko.util.Timeout import org.scalatest.wordspec.AnyWordSpecLike import java.util.concurrent.CountDownLatch +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.TimeoutException import scala.concurrent.duration._ -import scala.concurrent.{ ExecutionContext, Future, TimeoutException } -import scala.util.{ Failure, Success } - -trait Command +import scala.util.Failure +import scala.util.Success +sealed trait Command case class Multiply(a: Int, b: Int, forwardRef: ActorRef[WorkerCommand], replyTo: ActorRef[Int]) extends Command case class ReplyResult(num: Int, replyTo: ActorRef[Int]) extends Command case class Ignore() extends Command -trait WorkerCommand - +sealed trait WorkerCommand case class WorkerMultiply(a: Int, b: Int, replyTo: ActorRef[WorkerResult]) extends WorkerCommand - case class WorkerResult(num: Int) extends WorkerCommand class ManualTerminatedTestSetup(workerCnt: Int) { @@ -93,7 +95,7 @@ class DeadLetterSpec extends ScalaTestWithActorTestKit( val forwardRef = spawn(forwardBehavior) val workerRef = spawn(workerBehavior) - // this not completed unit worker reply. + // this will not completed unit worker reply. val multiplyResult: Future[Int] = forwardRef.ask(replyTo => Multiply(3, 9, workerRef, replyTo)) // waiting for temporary ask actor terminated with timeout val result = multiplyResult.failed.futureValue @@ -119,7 +121,7 @@ class DeadLetterSpec extends ScalaTestWithActorTestKit( val result = multiplyResult.failed.futureValue result shouldBe a[TimeoutException] result.getMessage should startWith("Ask timed out on") - // unlock worker reply + // unlock worker replying workerLatch.countDown() val deadLetter = deadLetterProbe.receiveMessage() From 83109a4081a203c1f9fbbb5b14f2193843b6be58 Mon Sep 17 00:00:00 2001 From: JingZhangChen Date: Fri, 22 Sep 2023 01:42:45 +0800 Subject: [PATCH 07/11] chore: grammar fix --- .../scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala index fd8c425fb6d..c7c279bbd46 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala @@ -101,7 +101,7 @@ class DeadLetterSpec extends ScalaTestWithActorTestKit( val result = multiplyResult.failed.futureValue result shouldBe a[TimeoutException] result.getMessage should startWith("Ask timed out on") - // unlock worker reply + // unlock worker replying workerLatch.countDown() val deadLetter = deadLetterProbe.receiveMessage() From 3ef14e9f98e98a27cc62753ee7c7757fa3b19462 Mon Sep 17 00:00:00 2001 From: JingZhangChen Date: Fri, 22 Sep 2023 02:03:35 +0800 Subject: [PATCH 08/11] chore: copyright --- .../test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala index c7c279bbd46..b88364753e1 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.pekko.actor.typed import org.apache.pekko.actor.IllegalActorStateException From 581da0c7c10be95ccbfb01cd71567f0892db2836 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Sat, 23 Sep 2023 17:05:33 +0800 Subject: [PATCH 09/11] fix: immutable object domain --- .../org/apache/pekko/actor/typed/DeadLetterSpec.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala index b88364753e1..35b596ecfb7 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala @@ -44,9 +44,8 @@ sealed trait WorkerCommand case class WorkerMultiply(a: Int, b: Int, replyTo: ActorRef[WorkerResult]) extends WorkerCommand case class WorkerResult(num: Int) extends WorkerCommand -class ManualTerminatedTestSetup(workerCnt: Int) { +class ManualTerminatedTestSetup(val workerLatch: CountDownLatch) { implicit val timeout: Timeout = 10.millis - val workerLatch = new CountDownLatch(workerCnt) def forwardBehavior: Behavior[Command] = setup[Command] { context => @@ -91,7 +90,8 @@ class DeadLetterSpec extends ScalaTestWithActorTestKit( "DeadLetterActor" must { - "publish dead letter with recipient when context.ask terminated" in new ManualTerminatedTestSetup(workerCnt = 1) { + "publish dead letter with recipient when context.ask terminated" in new ManualTerminatedTestSetup( + workerLatch = new CountDownLatch(1)) { val deadLetterProbe = createDeadLetterProbe() val forwardRef = spawn(forwardBehavior) val workerRef = spawn(workerBehavior) @@ -112,7 +112,8 @@ class DeadLetterSpec extends ScalaTestWithActorTestKit( deadLetter.recipient shouldNot equal(deadLettersRef) } - "publish dead letter with recipient when AskPattern timeout" in new ManualTerminatedTestSetup(workerCnt = 1) { + "publish dead letter with recipient when AskPattern timeout" in new ManualTerminatedTestSetup( + workerLatch = new CountDownLatch(1)) { val deadLetterProbe = createDeadLetterProbe() val workerRef = spawn(workerBehavior) From 0ee831d2ebf5b8ae512bc412154c5a251272e2d3 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 26 Sep 2023 20:44:56 +0800 Subject: [PATCH 10/11] fix: make it simple --- .../apache/pekko/actor/typed/AskSpec.scala | 68 +++++++++ .../pekko/actor/typed/DeadLetterSpec.scala | 136 ------------------ 2 files changed, 68 insertions(+), 136 deletions(-) delete mode 100644 actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala index b2355e302e7..5dc0dd7697e 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala @@ -32,10 +32,16 @@ import pekko.pattern.StatusReply import pekko.testkit.TestException import pekko.util.Timeout +import scala.util.Failure + object AskSpec { sealed trait Msg final case class Foo(s: String, replyTo: ActorRef[String]) extends Msg + final case class Bar(s: String, duration: FiniteDuration, replyTo: ActorRef[String]) extends Msg final case class Stop(replyTo: ActorRef[Unit]) extends Msg + sealed trait Proxy + final case class ProxyMsg(s: String) extends Proxy + final case class ProxyReply(s: String) extends Proxy } class AskSpec extends ScalaTestWithActorTestKit(""" @@ -52,6 +58,9 @@ class AskSpec extends ScalaTestWithActorTestKit(""" case (_, foo: Foo) => foo.replyTo ! "foo" Behaviors.same + case (ctx, bar: Bar) => + ctx.scheduleOnce(bar.duration, bar.replyTo, "bar") + Behaviors.same case (_, Stop(r)) => r ! (()) Behaviors.stopped @@ -119,6 +128,65 @@ class AskSpec extends ScalaTestWithActorTestKit(""" } } + "publish dead-letter if the context.ask has completed on timeout" in { + import pekko.actor.typed.internal.adapter.ActorRefAdapter._ + implicit val timeout: Timeout = 1.millis + + val actor: ActorRef[Msg] = spawn(behavior) + val mockActor: ActorRef[Proxy] = spawn(Behaviors.receive[Proxy]((context, msg) => + msg match { + case ProxyMsg(s) => + context.ask[Msg, String](actor, Bar(s, 10.millis, _)) { + case Success(result) => ProxyReply(result) + case Failure(ex) => throw ex + } + Behaviors.same + case ProxyReply(s) => + throw new IllegalArgumentException(s"unexpected reply: $s") + })) + + mockActor ! ProxyMsg("foo") + + val deadLetterProbe = createDeadLetterProbe() + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message match { + case s: String => s should ===("bar") + case _ => fail(s"unexpected DeadLetter: $deadLetter") + } + + val deadLettersRef = system.classicSystem.deadLetters + deadLetter.recipient shouldNot equal(deadLettersRef) + deadLetter.recipient shouldNot equal(toClassic(actor)) + deadLetter.recipient shouldNot equal(toClassic(mockActor)) + } + "publish dead-letter if the AskPattern.ask has completed on timeout" in { + implicit val timeout: Timeout = 1.millis + + val deadLetterProbe = createDeadLetterProbe() + val mockProbe = createTestProbe[Msg]() + val mockBusyRef = mockProbe.ref + // this will not completed unit worker reply. + val askResult: Future[String] = mockBusyRef.ask(replyTo => Foo("foo", replyTo)) + val request = mockProbe.expectMessageType[Foo](1.seconds) + // waiting for temporary ask actor terminated with timeout + mockProbe.expectTerminated(request.replyTo) + // verify ask timeout + val result = askResult.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage should startWith("Ask timed out on") + // mock reply manually + request match { + case Foo(s, replyTo) => replyTo ! s + } + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message shouldBe a[String] + val deadLettersRef = system.classicSystem.deadLetters + // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. + deadLetter.recipient shouldNot equal(deadLettersRef) + } + "transform a replied org.apache.pekko.actor.Status.Failure to a failed future" in { // It's unlikely but possible that this happens, since the receiving actor would // have to accept a message with an actoref that accepts AnyRef or be doing crazy casting diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala deleted file mode 100644 index 35b596ecfb7..00000000000 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.pekko.actor.typed - -import org.apache.pekko.actor.IllegalActorStateException -import org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing -import org.apache.pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import org.apache.pekko.actor.typed.scaladsl.AskPattern.Askable -import org.apache.pekko.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem -import org.apache.pekko.actor.typed.scaladsl.Behaviors -import org.apache.pekko.actor.typed.scaladsl.Behaviors._ -import org.apache.pekko.util.Timeout -import org.scalatest.wordspec.AnyWordSpecLike - -import java.util.concurrent.CountDownLatch -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.TimeoutException -import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success - -sealed trait Command -case class Multiply(a: Int, b: Int, forwardRef: ActorRef[WorkerCommand], replyTo: ActorRef[Int]) extends Command -case class ReplyResult(num: Int, replyTo: ActorRef[Int]) extends Command -case class Ignore() extends Command - -sealed trait WorkerCommand -case class WorkerMultiply(a: Int, b: Int, replyTo: ActorRef[WorkerResult]) extends WorkerCommand -case class WorkerResult(num: Int) extends WorkerCommand - -class ManualTerminatedTestSetup(val workerLatch: CountDownLatch) { - implicit val timeout: Timeout = 10.millis - - def forwardBehavior: Behavior[Command] = - setup[Command] { context => - Behaviors.receiveMessage[Command] { msg => - msg match { - case Multiply(a, b, ref, replyTo) => - // context.ask is asynchronous - context.ask[WorkerCommand, WorkerResult](ref, resultReply => WorkerMultiply(a, b, resultReply)) { - case Success(result) => ReplyResult(result.num, replyTo) - case Failure(_) => Ignore() - } - Behaviors.same - case ReplyResult(num, replyTo) => - replyTo ! num - Behaviors.same - case Ignore() => Behaviors.same - } - } - } - - def workerBehavior: Receive[WorkerCommand] = - Behaviors.receiveMessage[WorkerCommand] { msg => - msg match { - case WorkerMultiply(a, b, replyTo) => - workerLatch.await() - replyTo ! WorkerResult(a * b) - Behaviors.stopped - case _ => - throw IllegalActorStateException("worker actor should not receive other message.") - } - } -} - -class DeadLetterSpec extends ScalaTestWithActorTestKit( - """ - pekko.loglevel=DEBUG - pekko.actor.debug.event-stream = on - """) with AnyWordSpecLike with LogCapturing { - - implicit def executor: ExecutionContext = - system.executionContext - - "DeadLetterActor" must { - - "publish dead letter with recipient when context.ask terminated" in new ManualTerminatedTestSetup( - workerLatch = new CountDownLatch(1)) { - val deadLetterProbe = createDeadLetterProbe() - val forwardRef = spawn(forwardBehavior) - val workerRef = spawn(workerBehavior) - - // this will not completed unit worker reply. - val multiplyResult: Future[Int] = forwardRef.ask(replyTo => Multiply(3, 9, workerRef, replyTo)) - // waiting for temporary ask actor terminated with timeout - val result = multiplyResult.failed.futureValue - result shouldBe a[TimeoutException] - result.getMessage should startWith("Ask timed out on") - // unlock worker replying - workerLatch.countDown() - - val deadLetter = deadLetterProbe.receiveMessage() - deadLetter.message shouldBe a[WorkerResult] - val deadLettersRef = system.classicSystem.deadLetters - // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. - deadLetter.recipient shouldNot equal(deadLettersRef) - } - - "publish dead letter with recipient when AskPattern timeout" in new ManualTerminatedTestSetup( - workerLatch = new CountDownLatch(1)) { - val deadLetterProbe = createDeadLetterProbe() - val workerRef = spawn(workerBehavior) - - // this not completed unit countDown. - val multiplyResult: Future[WorkerResult] = workerRef.ask(replyTo => WorkerMultiply(3, 9, replyTo)) - // waiting for temporary ask actor terminated with timeout - val result = multiplyResult.failed.futureValue - result shouldBe a[TimeoutException] - result.getMessage should startWith("Ask timed out on") - // unlock worker replying - workerLatch.countDown() - - val deadLetter = deadLetterProbe.receiveMessage() - deadLetter.message shouldBe a[WorkerResult] - val deadLettersRef = system.classicSystem.deadLetters - // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. - deadLetter.recipient shouldNot equal(deadLettersRef) - } - } -} From 959ed8e11a841ea516fbcca1a0b84e0172dd5d11 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 31 Dec 2023 01:02:51 +0000 Subject: [PATCH 11/11] Update jackson-jaxrs-base, ... to 2.16.1 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 00b77583e20..f6a377e1031 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -33,7 +33,7 @@ object Dependencies { val protobufJavaVersion = "3.20.3" val logbackVersion = "1.3.14" - val jacksonCoreVersion = "2.14.3" + val jacksonCoreVersion = "2.16.1" val jacksonDatabindVersion = jacksonCoreVersion val scala212Version = "2.12.18"