Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lookup #659

Merged
merged 7 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@
<code><![CDATA[SubscriberAccessor|null]]></code>
</DeprecatedInterface>
</file>
<file src="src/Subscription/Lookup/Lookup.php">
<ArgumentTypeCoercion>
<code><![CDATA[$event]]></code>
</ArgumentTypeCoercion>
</file>
<file src="src/Subscription/Store/DoctrineSubscriptionStore.php">
<MixedArgument>
<code><![CDATA[$context]]></code>
Expand Down
59 changes: 59 additions & 0 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,65 @@ final class DoStuffSubscriber
}
}
```
##### Lookup Resolver

Sometimes you need to query previous events to build a projection.
For this you can use the `Lookup` service.
This service only has access to the messages before the current message.
Here is an example how you can use it in a projector.

```php
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Message\Reducer;
use Patchlevel\EventSourcing\Subscription\Lookup;

#[Projector('public_profile')]
final class PublicProfileProjection
{
use SubscriberUtil;

// ... constructor

#[Subscribe(Published::class)]
public function onPublished(Lookup $lookup): void
{
$messages = $lookup
->currentAggregate() // or ->currentStream() for StreamStore
->events(
ProfileCreated::class,
ProfileNameChanged::class,
)->fetchAll();

$state = (new Reducer())
->initState([
'id' => null,
'name' => null,
])
->match([
ProfileCreated::class => static function (Message $message): array {
return [
'id' => $message->event()->id->toString(),
'name' => $message->event()->name,
];
},
ProfileNameChanged::class => static function (Message $message, array $prevState): array {
return array_merge($prevState, [
'name' => $message->event()->name,
]);
},
])
->reduce($messages);

$this->connection->insert('public_profile', $state);
}

// ... setup, teardown, ...
}
```
!!! note

More about reducers you can find [here](./message.md#reducer)

##### Aggregate Id Resolver

The aggregate id resolver resolves the aggregate id.
Expand Down
6 changes: 6 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ parameters:
count: 1
path: src/Store/StreamDoctrineDbalStoreStream.php

-
message: '#^Parameter \#1 \$eventClass of method Patchlevel\\EventSourcing\\Metadata\\Event\\EventRegistry\:\:eventName\(\) expects class\-string, string given\.$#'
identifier: argument.type
count: 1
path: src/Subscription/Lookup/Lookup.php

-
message: '#^Strict comparison using \=\=\= between DateTimeImmutable and false will always evaluate to false\.$#'
identifier: identical.alwaysFalse
Expand Down
4 changes: 2 additions & 2 deletions src/Serializer/DefaultEventSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ final class DefaultEventSerializer implements EventSerializer
{
public function __construct(
private EventRegistry $eventRegistry,
private Hydrator $hydrator,
private Encoder $encoder,
private Hydrator $hydrator = new MetadataHydrator(),
private Encoder $encoder = new JsonEncoder(),
private Upcaster|null $upcaster = null,
) {
}
Expand Down
13 changes: 13 additions & 0 deletions src/Store/Criteria/ToIndexCriterion.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Criteria;

final class ToIndexCriterion
{
public function __construct(
public readonly int $toIndex,
) {
}
}
11 changes: 9 additions & 2 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToIndexCriterion;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use PDO;

use function array_fill;
Expand Down Expand Up @@ -155,8 +157,12 @@
$builder->setParameter('archived', $criterion->archived, Types::BOOLEAN);
break;
case FromIndexCriterion::class:
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
$builder->andWhere('id > :fromIndex');
$builder->setParameter('fromIndex', $criterion->fromIndex, Types::INTEGER);
break;
case ToIndexCriterion::class:
$builder->andWhere('id < :toIndex');
$builder->setParameter('toIndex', $criterion->toIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
Expand Down Expand Up @@ -351,7 +357,8 @@
/** @return list<object> */
private function getCustomHeaders(Message $message): array
{
$filteredHeaders = [

Check warning on line 360 in src/Store/DoctrineDbalStore.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "ArrayItemRemoval": --- Original +++ New @@ @@ /** @return list<object> */ private function getCustomHeaders(Message $message) : array { - $filteredHeaders = [IndexHeader::class, AggregateHeader::class, StreamStartHeader::class, ArchivedHeader::class]; + $filteredHeaders = [AggregateHeader::class, StreamStartHeader::class, ArchivedHeader::class]; return array_values(array_filter($message->headers(), static fn(object $header) => !in_array($header::class, $filteredHeaders, true))); } public function supportSubscription() : bool
IndexHeader::class,
AggregateHeader::class,
StreamStartHeader::class,
ArchivedHeader::class,
Expand Down
2 changes: 2 additions & 0 deletions src/Store/DoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use Traversable;

/** @implements IteratorAggregate<Message> */
Expand Down Expand Up @@ -125,6 +126,7 @@ private function buildGenerator(
$event = $eventSerializer->deserialize(new SerializedEvent($data['event'], $data['payload']));

$message = Message::create($event)
->withHeader(new IndexHeader($data['id']))
->withHeader(new AggregateHeader(
$data['aggregate'],
$data['aggregate_id'],
Expand Down
18 changes: 18 additions & 0 deletions src/Store/Header/IndexHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

/**
* @psalm-immutable
* @experimental
*/
final class IndexHeader
{
/** @param positive-int $index */
public function __construct(
public readonly int $index,
) {
}
}
13 changes: 10 additions & 3 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
Expand Down Expand Up @@ -186,11 +188,15 @@
$builder->setParameter('archived', $criterion->archived, Types::BOOLEAN);
break;
case FromIndexCriterion::class:
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
$builder->andWhere('id > :from_index');
$builder->setParameter('from_index', $criterion->fromIndex, Types::INTEGER);
break;
case ToIndexCriterion::class:
$builder->andWhere('id < :to_index');
$builder->setParameter('to_index', $criterion->toIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->andWhere('event_name IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
Expand Down Expand Up @@ -333,7 +339,7 @@
return $streams;
}

public function remove(Criteria|null $criteria = null): void

Check failure on line 342 in src/Store/StreamDoctrineDbalStore.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

The parameter $streamName of Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore#remove() changed from string to a non-contravariant Patchlevel\EventSourcing\Store\Criteria\Criteria|null
{
$builder = $this->connection->createQueryBuilder();

Expand Down Expand Up @@ -398,7 +404,8 @@
/** @return list<object> */
private function getCustomHeaders(Message $message): array
{
$filteredHeaders = [

Check warning on line 407 in src/Store/StreamDoctrineDbalStore.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "ArrayItemRemoval": --- Original +++ New @@ @@ /** @return list<object> */ private function getCustomHeaders(Message $message) : array { - $filteredHeaders = [IndexHeader::class, StreamNameHeader::class, EventIdHeader::class, PlayheadHeader::class, RecordedOnHeader::class, ArchivedHeader::class]; + $filteredHeaders = [StreamNameHeader::class, EventIdHeader::class, PlayheadHeader::class, RecordedOnHeader::class, ArchivedHeader::class]; return array_values(array_filter($message->headers(), static fn(object $header) => !in_array($header::class, $filteredHeaders, true))); } public function supportSubscription() : bool
IndexHeader::class,
StreamNameHeader::class,
EventIdHeader::class,
PlayheadHeader::class,
Expand Down
2 changes: 2 additions & 0 deletions src/Store/StreamDoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
Expand Down Expand Up @@ -130,6 +131,7 @@ private function buildGenerator(
$event = $eventSerializer->deserialize(new SerializedEvent($data['event_name'], $data['event_payload']));

$message = Message::create($event)
->withHeader(new IndexHeader($data['id']))
->withHeader(new StreamNameHeader($data['stream']))
->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform)))
->withHeader(new EventIdHeader($data['event_id']));
Expand Down
Loading
Loading