Skip to content

Commit

Permalink
add catchup subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Apr 12, 2024
1 parent 07c4a52 commit a385761
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 0 deletions.
26 changes: 26 additions & 0 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,32 @@ $subscriptionEngine = new DefaultSubscriptionEngine(
$retryStrategy,
);
```
### Catchup Subscription Engine

If aggregates are used in the processors and new events are generated there,
then they are not part of the current subscription engine run and will only be processed during the next run or boot.
This is usually not a problem in dev or prod environment because a worker is used
and these events will be processed at some point. But in testing it is not so easy.
For this reason, we have the `CatchupSubscriptionEngine`.

```php
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\CatchupSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;

/**
* @var Store $eventStore
* @var SubscriptionEngine $subscriptionStore
*/
$catchupSubscriptionEngine = new CatchupSubscriptionEngine(
$eventStore,
$subscriptionEngine,
);
```
!!! tip

You can use the `CatchupSubscriptionEngine` in your tests to process the events immediately.

## Usage

The Subscription Engine has a few methods needed to use it effectively.
Expand Down
73 changes: 73 additions & 0 deletions src/Subscription/Engine/CatchUpSubscriptionEngine.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Subscription;

use const PHP_INT_MAX;

final class CatchUpSubscriptionEngine implements SubscriptionEngine
{
public function __construct(
private readonly SubscriptionEngine $parent,
private readonly Store $store,
private readonly int $limit = PHP_INT_MAX,
) {
}

public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void

Check warning on line 21 in src/Subscription/Engine/CatchUpSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "FalseValue": --- Original +++ New @@ @@ public function __construct(private readonly SubscriptionEngine $parent, private readonly Store $store, private readonly int $limit = PHP_INT_MAX) { } - public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false) : void + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = true) : void { $this->parent->setup($criteria, $skipBooting); }
{
$this->parent->setup($criteria, $skipBooting);
}

public function boot(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): void
{
for ($i = 0; $i < $this->limit; $i++) {
$current = $this->store->count();
$this->parent->boot($criteria, $limit);
if ($current === $this->store->count()) {
break;
}
}
}

public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): void
{
for ($i = 0; $i < $this->limit; $i++) {

Check warning on line 39 in src/Subscription/Engine/CatchUpSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "DecrementInteger": --- Original +++ New @@ @@ } public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null) : void { - for ($i = 0; $i < $this->limit; $i++) { + for ($i = -1; $i < $this->limit; $i++) { $current = $this->store->count(); $this->parent->run($criteria, $limit); if ($current === $this->store->count()) {

Check warning on line 39 in src/Subscription/Engine/CatchUpSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "LessThan": --- Original +++ New @@ @@ } public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null) : void { - for ($i = 0; $i < $this->limit; $i++) { + for ($i = 0; $i <= $this->limit; $i++) { $current = $this->store->count(); $this->parent->run($criteria, $limit); if ($current === $this->store->count()) {

Check warning on line 39 in src/Subscription/Engine/CatchUpSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Increment": --- Original +++ New @@ @@ } public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null) : void { - for ($i = 0; $i < $this->limit; $i++) { + for ($i = 0; $i < $this->limit; $i--) { $current = $this->store->count(); $this->parent->run($criteria, $limit); if ($current === $this->store->count()) {
$current = $this->store->count();
$this->parent->run($criteria, $limit);
if ($current === $this->store->count()) {
break;
}
}
}

public function teardown(SubscriptionEngineCriteria|null $criteria = null): void
{
$this->parent->teardown($criteria);
}

public function remove(SubscriptionEngineCriteria|null $criteria = null): void
{
$this->parent->remove($criteria);
}

public function reactivate(SubscriptionEngineCriteria|null $criteria = null): void
{
$this->parent->reactivate($criteria);
}

public function pause(SubscriptionEngineCriteria|null $criteria = null): void
{
$this->parent->pause($criteria);
}

/** @return list<Subscription> */
public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array
{
return $this->parent->subscriptions($criteria);
}
}
179 changes: 179 additions & 0 deletions tests/Unit/Subscription/Engine/CatchUpSubscriptionEngineTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Engine;

use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\EventSourcing\Subscription\Subscription;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;

/** @covers \Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine */
final class CatchUpSubscriptionEngineTest extends TestCase
{
use ProphecyTrait;

public function testSetup(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$parent->setup($criteria, true)->shouldBeCalled();
$engine->setup($criteria, true);
}

public function testBootFinished(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$store->count()->willReturn(42)->shouldBeCalledTimes(2);

$parent->run($criteria, 42)->shouldBeCalledOnce();
$engine->run($criteria, 42);
}

public function testBootSecondTime(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$store->count()->willReturn(42, 43, 43);

$parent->run($criteria, 42)->shouldBeCalledTimes(2);
$engine->run($criteria, 42);
}

public function testBootLimit(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal(), 2);
$criteria = new SubscriptionEngineCriteria();

$store->count()->willReturn(42, 43, 44);

$parent->run($criteria, 42)->shouldBeCalledTimes(2);
$engine->run($criteria, 42);
}

public function testRunFinished(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$store->count()->willReturn(42)->shouldBeCalledTimes(2);

$parent->run($criteria, 42)->shouldBeCalledOnce();
$engine->run($criteria, 42);
}

public function testRunSecondTime(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$store->count()->willReturn(42, 43, 43);

$parent->run($criteria, 42)->shouldBeCalledTimes(2);
$engine->run($criteria, 42);
}

public function testRunLimit(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal(), 2);
$criteria = new SubscriptionEngineCriteria();

$store->count()->willReturn(42, 43, 44);

$parent->run($criteria, 42)->shouldBeCalledTimes(2);
$engine->run($criteria, 42);
}

public function testTeardown(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$parent->teardown($criteria)->shouldBeCalled();
$engine->teardown($criteria);
}

public function testRemove(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$parent->remove($criteria)->shouldBeCalled();
$engine->remove($criteria);
}

public function testReactivate(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$parent->reactivate($criteria)->shouldBeCalled();
$engine->reactivate($criteria);
}

public function testPause(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$parent->pause($criteria)->shouldBeCalled();
$engine->pause($criteria);
}

public function testSubscriptions(): void
{
$parent = $this->prophesize(SubscriptionEngine::class);
$store = $this->prophesize(Store::class);

$engine = new CatchUpSubscriptionEngine($parent->reveal(), $store->reveal());
$criteria = new SubscriptionEngineCriteria();

$expectedSubscriptions = [new Subscription('foo')];

$parent->subscriptions($criteria)->willReturn($expectedSubscriptions)->shouldBeCalled();
$subscriptions = $engine->subscriptions($criteria);

self::assertEquals($expectedSubscriptions, $subscriptions);
}
}

0 comments on commit a385761

Please sign in to comment.