Skip to content

Commit

Permalink
add EventsByTagQuery to JavaDSL PersistenceTestKitReadJournal (#1763)
Browse files Browse the repository at this point in the history
* add EventsByTagQuery to JavaDSL PersistenceTestKitReadJournal

* Update PersistenceTestKitReadJournal.scala

* basic test case

* Update EventsByTagSpec.scala

* Update EventsByTagSpec.scala
  • Loading branch information
pjfanning authored Feb 8, 2025
1 parent 26db908 commit 40af227
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import pekko.persistence.query.javadsl.{
CurrentEventsByPersistenceIdQuery,
CurrentEventsByTagQuery,
EventsByPersistenceIdQuery,
EventsByTagQuery,
ReadJournal
}
import pekko.persistence.query.typed
Expand All @@ -38,7 +39,8 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with CurrentEventsBySliceQuery {
with CurrentEventsBySliceQuery
with EventsByTagQuery {

override def eventsByPersistenceId(
persistenceId: String,
Expand All @@ -62,6 +64,9 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
offset: Offset): Source[typed.EventEnvelope[Event], NotUsed] =
delegate.currentEventsBySlices(entityType, minSlice, maxSlice, offset).asJava

override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
delegate.eventsByTag(tag, offset).asJava

override def sliceForPersistenceId(persistenceId: String): Int =
delegate.sliceForPersistenceId(persistenceId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.pekko.persistence.testkit.query

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
Expand All @@ -27,7 +25,9 @@ import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.persistence.query.EventEnvelope
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.NoOffset
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.query.javadsl.{ PersistenceTestKitReadJournal => JavaPersistenceTestKitReadJournal }
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
Expand All @@ -36,6 +36,9 @@ import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.immutable.Seq
import scala.concurrent.duration._

object EventsByTagSpec {
val config = PersistenceTestKitPlugin.config.withFallback(
ConfigFactory.parseString("""
Expand Down Expand Up @@ -74,8 +77,16 @@ class EventsByTagSpec

implicit val classic: pekko.actor.ActorSystem = system.classicSystem

val queries =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
private val persistenceQuery = PersistenceQuery(system)

private val queries =
persistenceQuery.readJournalFor[PersistenceTestKitReadJournal](
PersistenceTestKitReadJournal.Identifier)

private val queriesJava =
persistenceQuery.getReadJournalFor(
classOf[JavaPersistenceTestKitReadJournal],
JavaPersistenceTestKitReadJournal.Identifier)

def setup(persistenceId: String, tags: Set[String]): ActorRef[Command] = {
val probe = createTestProbe[Done]()
Expand Down Expand Up @@ -115,6 +126,19 @@ class EventsByTagSpec
probe.expectNext("c-4")
}

"find new events (Java DSL)" in {
val ackProbe = createTestProbe[Done]()
val tag = "c-tag"
val ref = setup("c", Set(tag))
val src = queriesJava.eventsByTag(tag, NoOffset).asScala
val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("c-1", "c-2", "c-3")

ref ! Command("c-4", ackProbe.ref)
ackProbe.expectMessage(Done)

probe.expectNext("c-4")
}

"find new events after batched setup" in {
val ackProbe = createTestProbe[Done]()
val tag = "d-tag"
Expand Down

0 comments on commit 40af227

Please sign in to comment.