Skip to content

Commit

Permalink
Merge pull request #659 from patchlevel/lookup
Browse files Browse the repository at this point in the history
Lookup
  • Loading branch information
DavidBadura authored Jan 14, 2025
2 parents bbd97b0 + 05ad08e commit fedc793
Show file tree
Hide file tree
Showing 24 changed files with 1,052 additions and 17 deletions.
5 changes: 5 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@
<code><![CDATA[SubscriberAccessor|null]]></code>
</DeprecatedInterface>
</file>
<file src="src/Subscription/Lookup/Lookup.php">
<ArgumentTypeCoercion>
<code><![CDATA[$event]]></code>
</ArgumentTypeCoercion>
</file>
<file src="src/Subscription/Store/DoctrineSubscriptionStore.php">
<MixedArgument>
<code><![CDATA[$context]]></code>
Expand Down
59 changes: 59 additions & 0 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,65 @@ final class DoStuffSubscriber
}
}
```
##### Lookup Resolver

Sometimes you need to query previous events to build a projection.
For this you can use the `Lookup` service.
This service only has access to the messages before the current message.
Here is an example how you can use it in a projector.

```php
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Message\Reducer;
use Patchlevel\EventSourcing\Subscription\Lookup;

#[Projector('public_profile')]
final class PublicProfileProjection
{
use SubscriberUtil;

// ... constructor

#[Subscribe(Published::class)]
public function onPublished(Lookup $lookup): void
{
$messages = $lookup
->currentAggregate() // or ->currentStream() for StreamStore
->events(
ProfileCreated::class,
ProfileNameChanged::class,
)->fetchAll();

$state = (new Reducer())
->initState([
'id' => null,
'name' => null,
])
->match([
ProfileCreated::class => static function (Message $message): array {
return [
'id' => $message->event()->id->toString(),
'name' => $message->event()->name,
];
},
ProfileNameChanged::class => static function (Message $message, array $prevState): array {
return array_merge($prevState, [
'name' => $message->event()->name,
]);
},
])
->reduce($messages);

$this->connection->insert('public_profile', $state);
}

// ... setup, teardown, ...
}
```
!!! note

More about reducers you can find [here](./message.md#reducer)

##### Aggregate Id Resolver

The aggregate id resolver resolves the aggregate id.
Expand Down
6 changes: 6 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ parameters:
count: 1
path: src/Store/StreamDoctrineDbalStoreStream.php

-
message: '#^Parameter \#1 \$eventClass of method Patchlevel\\EventSourcing\\Metadata\\Event\\EventRegistry\:\:eventName\(\) expects class\-string, string given\.$#'
identifier: argument.type
count: 1
path: src/Subscription/Lookup/Lookup.php

-
message: '#^Strict comparison using \=\=\= between DateTimeImmutable and false will always evaluate to false\.$#'
identifier: identical.alwaysFalse
Expand Down
4 changes: 2 additions & 2 deletions src/Serializer/DefaultEventSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ final class DefaultEventSerializer implements EventSerializer
{
public function __construct(
private EventRegistry $eventRegistry,
private Hydrator $hydrator,
private Encoder $encoder,
private Hydrator $hydrator = new MetadataHydrator(),
private Encoder $encoder = new JsonEncoder(),
private Upcaster|null $upcaster = null,
) {
}
Expand Down
13 changes: 13 additions & 0 deletions src/Store/Criteria/ToIndexCriterion.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Criteria;

final class ToIndexCriterion
{
public function __construct(
public readonly int $toIndex,
) {
}
}
11 changes: 9 additions & 2 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToIndexCriterion;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use PDO;

use function array_fill;
Expand Down Expand Up @@ -155,8 +157,12 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->setParameter('archived', $criterion->archived, Types::BOOLEAN);
break;
case FromIndexCriterion::class:
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
$builder->andWhere('id > :fromIndex');
$builder->setParameter('fromIndex', $criterion->fromIndex, Types::INTEGER);
break;
case ToIndexCriterion::class:
$builder->andWhere('id < :toIndex');
$builder->setParameter('toIndex', $criterion->toIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
Expand Down Expand Up @@ -352,6 +358,7 @@ public function configureSchema(Schema $schema, Connection $connection): void
private function getCustomHeaders(Message $message): array
{
$filteredHeaders = [
IndexHeader::class,
AggregateHeader::class,
StreamStartHeader::class,
ArchivedHeader::class,
Expand Down
2 changes: 2 additions & 0 deletions src/Store/DoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use Traversable;

/** @implements IteratorAggregate<Message> */
Expand Down Expand Up @@ -125,6 +126,7 @@ private function buildGenerator(
$event = $eventSerializer->deserialize(new SerializedEvent($data['event'], $data['payload']));

$message = Message::create($event)
->withHeader(new IndexHeader($data['id']))
->withHeader(new AggregateHeader(
$data['aggregate'],
$data['aggregate_id'],
Expand Down
18 changes: 18 additions & 0 deletions src/Store/Header/IndexHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

/**
* @psalm-immutable
* @experimental
*/
final class IndexHeader
{
/** @param positive-int $index */
public function __construct(
public readonly int $index,
) {
}
}
13 changes: 10 additions & 3 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
Expand Down Expand Up @@ -186,11 +188,15 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->setParameter('archived', $criterion->archived, Types::BOOLEAN);
break;
case FromIndexCriterion::class:
$builder->andWhere('id > :index');
$builder->setParameter('index', $criterion->fromIndex, Types::INTEGER);
$builder->andWhere('id > :from_index');
$builder->setParameter('from_index', $criterion->fromIndex, Types::INTEGER);
break;
case ToIndexCriterion::class:
$builder->andWhere('id < :to_index');
$builder->setParameter('to_index', $criterion->toIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->andWhere('event_name IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
Expand Down Expand Up @@ -399,6 +405,7 @@ public function configureSchema(Schema $schema, Connection $connection): void
private function getCustomHeaders(Message $message): array
{
$filteredHeaders = [
IndexHeader::class,
StreamNameHeader::class,
EventIdHeader::class,
PlayheadHeader::class,
Expand Down
2 changes: 2 additions & 0 deletions src/Store/StreamDoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
Expand Down Expand Up @@ -130,6 +131,7 @@ private function buildGenerator(
$event = $eventSerializer->deserialize(new SerializedEvent($data['event_name'], $data['event_payload']));

$message = Message::create($event)
->withHeader(new IndexHeader($data['id']))
->withHeader(new StreamNameHeader($data['stream']))
->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform)))
->withHeader(new EventIdHeader($data['event_id']));
Expand Down
Loading

0 comments on commit fedc793

Please sign in to comment.