diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index 1f0c9d764..d496fa233 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -567,6 +567,32 @@ $subscriptionEngine = new DefaultSubscriptionEngine( $retryStrategy, ); ``` +### Catchup Subscription Engine + +If aggregates are used in the processors and new events are generated there, +then they are not part of the current subscription engine run and will only be processed during the next run or boot. +This is usually not a problem in dev or prod environment because a worker is used +and these events will be processed at some point. But in testing it is not so easy. +For this reason, we have the `CatchupSubscriptionEngine`. + +```php +use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Subscription\Engine\CatchupSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; + +/** + * @var Store $eventStore + * @var SubscriptionEngine $subscriptionStore + */ +$catchupSubscriptionEngine = new CatchupSubscriptionEngine( + $eventStore, + $subscriptionEngine, +); +``` +!!! tip + + You can use the `CatchupSubscriptionEngine` in your tests to process the events immediately. + ## Usage The Subscription Engine has a few methods needed to use it effectively. diff --git a/src/Subscription/Engine/CatchUpSubscriptionEngine.php b/src/Subscription/Engine/CatchUpSubscriptionEngine.php new file mode 100644 index 000000000..26f387780 --- /dev/null +++ b/src/Subscription/Engine/CatchUpSubscriptionEngine.php @@ -0,0 +1,73 @@ +parent->setup($criteria, $skipBooting); + } + + public function boot(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): void + { + for ($i = 0; $i < $this->limit; $i++) { + $current = $this->store->count(); + $this->parent->boot($criteria, $limit); + if ($current === $this->store->count()) { + break; + } + } + } + + public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): void + { + for ($i = 0; $i < $this->limit; $i++) { + $current = $this->store->count(); + $this->parent->run($criteria, $limit); + if ($current === $this->store->count()) { + break; + } + } + } + + public function teardown(SubscriptionEngineCriteria|null $criteria = null): void + { + $this->parent->teardown($criteria); + } + + public function remove(SubscriptionEngineCriteria|null $criteria = null): void + { + $this->parent->remove($criteria); + } + + public function reactivate(SubscriptionEngineCriteria|null $criteria = null): void + { + $this->parent->reactivate($criteria); + } + + public function pause(SubscriptionEngineCriteria|null $criteria = null): void + { + $this->parent->pause($criteria); + } + + /** @return list */ + public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array + { + return $this->parent->subscriptions($criteria); + } +} diff --git a/tests/Unit/Subscription/Engine/CatchUpSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/CatchUpSubscriptionEngineTest.php new file mode 100644 index 000000000..fd454376f --- /dev/null +++ b/tests/Unit/Subscription/Engine/CatchUpSubscriptionEngineTest.php @@ -0,0 +1,179 @@ +prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $parent->setup($criteria, true)->shouldBeCalled(); + $engine->setup($criteria, true); + } + + public function testBootFinished(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $store->count()->willReturn(42)->shouldBeCalledTimes(2); + + $parent->run($criteria, 42)->shouldBeCalledOnce(); + $engine->run($criteria, 42); + } + + public function testBootSecondTime(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $store->count()->willReturn(42, 43, 43); + + $parent->run($criteria, 42)->shouldBeCalledTimes(2); + $engine->run($criteria, 42); + } + + public function testBootLimit(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal(), 2); + $criteria = new SubscriptionEngineCriteria(); + + $store->count()->willReturn(42, 43, 44); + + $parent->run($criteria, 42)->shouldBeCalledTimes(2); + $engine->run($criteria, 42); + } + + public function testRunFinished(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $store->count()->willReturn(42)->shouldBeCalledTimes(2); + + $parent->run($criteria, 42)->shouldBeCalledOnce(); + $engine->run($criteria, 42); + } + + public function testRunSecondTime(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $store->count()->willReturn(42, 43, 43); + + $parent->run($criteria, 42)->shouldBeCalledTimes(2); + $engine->run($criteria, 42); + } + + public function testRunLimit(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal(), 2); + $criteria = new SubscriptionEngineCriteria(); + + $store->count()->willReturn(42, 43, 44); + + $parent->run($criteria, 42)->shouldBeCalledTimes(2); + $engine->run($criteria, 42); + } + + public function testTeardown(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $parent->teardown($criteria)->shouldBeCalled(); + $engine->teardown($criteria); + } + + public function testRemove(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $parent->remove($criteria)->shouldBeCalled(); + $engine->remove($criteria); + } + + public function testReactivate(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $parent->reactivate($criteria)->shouldBeCalled(); + $engine->reactivate($criteria); + } + + public function testPause(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $parent->pause($criteria)->shouldBeCalled(); + $engine->pause($criteria); + } + + public function testSubscriptions(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + $store = $this->prophesize(Store::class); + + $engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedSubscriptions = [new Subscription('foo')]; + + $parent->subscriptions($criteria)->willReturn($expectedSubscriptions)->shouldBeCalled(); + $subscriptions = $engine->subscriptions($criteria); + + self::assertEquals($expectedSubscriptions, $subscriptions); + } +}