Skip to content

Commit

Permalink
finish api
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jan 14, 2025
1 parent dfff399 commit 05ad08e
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 65 deletions.
12 changes: 0 additions & 12 deletions docs/pages/aggregate.md
Original file line number Diff line number Diff line change
Expand Up @@ -657,22 +657,10 @@ We currently support two patterns for this: Micro Aggregates and Child Aggregate

### Micro Aggregates

<<<<<<< HEAD
!!! warning

This feature works only with the [StreamDoctrineDbalStore](./store.md#streamdoctrinedbalstore).

=======
??? example "Experimental"

This feature is still experimental and may change in the future.
Use it with caution.

!!! warning

This feature works only with the experimental [StreamDoctrineDbalStore](./store.md#streamdoctrinedbalstore).

>>>>>>> c1041b78 (refactor api again)
Micro Aggregates are a pattern to split an aggregate into several smaller aggregates.
Each of these aggregates is saved in the same stream.
This gives the Micro Aggregates the ability to independently manage their state and trigger their events,
Expand Down
24 changes: 2 additions & 22 deletions src/Message/Reducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Patchlevel\EventSourcing\Message;

use Closure;
use RuntimeException;

/**
* @template STATE of array<array-key, mixed>
Expand All @@ -25,9 +24,6 @@ final class Reducer
/** @var (Closure(STATE): OUT)|null */
private Closure|null $finalizeHandler = null;

/** @var iterable<Message>|null */
private iterable|null $messages = null;

/**
* @param STATE $initState
*
Expand Down Expand Up @@ -97,32 +93,16 @@ public function finalize(Closure $closure): self
return $this;
}

/** @param iterable<Message> $messages */
public function messages(iterable $messages): self
{
$this->messages = $messages;

return $this;
}

/**
* @param iterable<Message>|null $messages
* @param iterable<Message> $messages
*
* @return OUT|STATE
* @psalm-return (OUT is STATE ? STATE : OUT)
*/
public function reduce(iterable|null $messages = null): array
public function reduce(iterable $messages): array
{
$state = $this->initState;

if ($messages === null) {
$messages = $this->messages;

if ($messages === null) {
throw new RuntimeException('no messages given');
}
}

foreach ($messages as $message) {
$event = $message->event();

Expand Down
9 changes: 2 additions & 7 deletions src/Serializer/DefaultEventSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ final class DefaultEventSerializer implements EventSerializer
{
public function __construct(
private EventRegistry $eventRegistry,
private Hydrator $hydrator,
private Encoder $encoder,
private Hydrator $hydrator = new MetadataHydrator(),
private Encoder $encoder = new JsonEncoder(),
private Upcaster|null $upcaster = null,
) {
}
Expand Down Expand Up @@ -53,11 +53,6 @@ public function deserialize(SerializedEvent $data, array $options = []): object
return $this->hydrator->hydrate($class, $payload);
}

public function eventRegistry(): EventRegistry
{
return $this->eventRegistry;
}

/** @param list<string> $paths */
public static function createFromPaths(
array $paths,
Expand Down
2 changes: 1 addition & 1 deletion src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
$builder->setParameter('to_index', $criterion->toIndex, Types::INTEGER);
break;
case EventsCriterion::class:
$builder->andWhere('event IN (:events)');
$builder->andWhere('event_name IN (:events)');
$builder->setParameter('events', $criterion->events, ArrayParameterType::STRING);
break;
default:
Expand Down
30 changes: 18 additions & 12 deletions src/Subscription/Lookup/Lookup.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use function array_map;
use function array_values;
use function is_array;

final class Lookup
{
Expand All @@ -32,7 +33,9 @@ public function __construct(
private readonly Message $currentMessage,
private readonly EventRegistry|null $eventRegistry = null,
) {
$this->criteria = new Criteria();
$this->criteria = new Criteria(
new ToIndexCriterion($this->currentMessage->header(IndexHeader::class)->index),
);
}

/** @param class-string|string ...$events */
Expand Down Expand Up @@ -60,14 +63,23 @@ function (string $event): string {
return $self;
}

public function stream(string|null $stream = null): self
/**
* @param string|list<string>|null $stream
*
* @return $this
*/
public function stream(string|array|null $stream): self
{
$self = clone $this;

if ($stream === null) {
$self->criteria = $self->criteria->remove(StreamCriterion::class);
} else {
$self->criteria = $self->criteria->add(new StreamCriterion($stream));
if (!is_array($stream)) {
$stream = [$stream];
}

$self->criteria = $self->criteria->add(new StreamCriterion(...$stream));
}

return $self;
Expand Down Expand Up @@ -142,19 +154,15 @@ public function backwards(): self
public function fetchAll(): Stream
{
return $this->store->load(
$this->criteria->add(
new ToIndexCriterion($this->currentMessage->header(IndexHeader::class)->index),
),
$this->criteria,
backwards: $this->backwards,
);
}

public function fetchFirst(): Message
{
$stream = $this->store->load(
$this->criteria->add(
new ToIndexCriterion($this->currentMessage->header(IndexHeader::class)->index),
),
$this->criteria,
limit: 1,
backwards: $this->backwards,
);
Expand All @@ -171,9 +179,7 @@ public function fetchFirst(): Message
public function fetchLast(): Message
{
$stream = $this->store->load(
$this->criteria->add(
new ToIndexCriterion($this->currentMessage->header(IndexHeader::class)->index),
),
$this->criteria,
limit: 1,
backwards: !$this->backwards,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function __construct(
public function onAdminPromoted(AdminPromoted $event, Lookup $lookup): void
{
$messages = $lookup
->stream('profile')
->currentStream()
->events(
ProfileCreated::class,
NameChanged::class,
Expand Down
8 changes: 5 additions & 3 deletions tests/Integration/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventRegistryFactory;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand Down Expand Up @@ -1066,9 +1067,10 @@ public function testPipeline(): void

public function testLookup(): void
{
$serializer = DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']);
$eventRegistry = (new AttributeEventRegistryFactory())->create([__DIR__ . '/Events']);
$serializer = new DefaultEventSerializer($eventRegistry);

$store = new DoctrineDbalStore(
$store = new StreamDoctrineDbalStore(
$this->connection,
$serializer,
);
Expand Down Expand Up @@ -1104,7 +1106,7 @@ public function testLookup(): void
argumentResolvers: [
new LookupResolver(
$store,
$serializer->eventRegistry(),
$eventRegistry,
),
],
);
Expand Down
97 changes: 90 additions & 7 deletions tests/Unit/Subscription/Lookup/LookupTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Lookup;

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\Event\EventRegistry;
Expand Down Expand Up @@ -36,14 +38,12 @@ public function testMissingIndexHeader(): void

$message = new Message($event);

$lookup = new Lookup(
$this->expectException(HeaderNotFound::class);

new Lookup(
$store->reveal(),
$message,
);

$this->expectException(HeaderNotFound::class);

$lookup->fetchAll();
}

public function testEmpty(): void
Expand Down Expand Up @@ -251,6 +251,7 @@ public function testCurrentStream(): void
{
$expectedResult = new ArrayStream([]);
$expectedCriteria = new Criteria(
new ToIndexCriterion(1),
new StreamCriterion('foo'),
);

Expand Down Expand Up @@ -280,6 +281,7 @@ public function testCurrentAggregate(): void
{
$expectedResult = new ArrayStream([]);
$expectedCriteria = new Criteria(
new ToIndexCriterion(1),
new AggregateNameCriterion('foo'),
new AggregateIdCriterion('bar'),
);
Expand All @@ -293,16 +295,97 @@ public function testCurrentAggregate(): void
};

$message = (new Message($event))
->withHeader(new StreamNameHeader('foo'))
->withHeader(new AggregateHeader(
'foo',
'bar',
1,
new DateTimeImmutable(),
))
->withHeader(new IndexHeader(1));

$lookup = new Lookup(
$store->reveal(),
$message,
);

$result = $lookup->currentStream()->fetchAll();
$result = $lookup->currentAggregate()->fetchAll();

self::assertSame($expectedResult, $result);
}

public function testFetchFirst(): void
{
$message1 = new Message(new class () {
});

$message2 = new Message(new class () {
});

$expectedResult = new ArrayStream([
$message1,
$message2,
]);

$expectedCriteria = new Criteria(
new ToIndexCriterion(1),
);

$store = $this->prophesize(Store::class);
$store->load($expectedCriteria, 1, null, false)
->willReturn($expectedResult)
->shouldBeCalledOnce();

$event = new class () {
};

$message = (new Message($event))
->withHeader(new IndexHeader(1));

$lookup = new Lookup(
$store->reveal(),
$message,
);

$result = $lookup->fetchFirst();

self::assertSame($message1, $result);
}

public function testFetchLast(): void
{
$message1 = new Message(new class () {
});

$message2 = new Message(new class () {
});

$expectedResult = new ArrayStream([
$message2,
$message1,
]);

$expectedCriteria = new Criteria(
new ToIndexCriterion(1),
);

$store = $this->prophesize(Store::class);
$store->load($expectedCriteria, 1, null, true)
->willReturn($expectedResult)
->shouldBeCalledOnce();

$event = new class () {
};

$message = (new Message($event))
->withHeader(new IndexHeader(1));

$lookup = new Lookup(
$store->reveal(),
$message,
);

$result = $lookup->fetchLast();

self::assertSame($message2, $result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\Event\EventRegistry;
use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata;
use Patchlevel\EventSourcing\Store\Header\IndexHeader;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\Lookup\Lookup;
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver;
Expand Down Expand Up @@ -56,6 +57,8 @@ public function testResolve(): void

$message = (new Message($event))->withHeader(
new AggregateHeader('foo', 'bar', 1, new DateTimeImmutable()),
)->withHeader(
new IndexHeader(1),
);

$lookup = $resolver->resolve(
Expand Down

0 comments on commit 05ad08e

Please sign in to comment.