Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move archive logic into store #598

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,46 @@
<code><![CDATA[addMethods]]></code>
</DeprecatedMethod>
<InternalMethod>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
</InternalMethod>
</file>
<file src="tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php">
Expand Down
75 changes: 16 additions & 59 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound;
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid;
use Patchlevel\EventSourcing\Store\ArchivableStore;
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -238,35 +236,31 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $
$events,
);

$this->store->transactional(function () use ($messages, $aggregate, $aggregateId, $newAggregate): void {
try {
$this->store->save(...$messages);
} catch (UniqueConstraintViolation) {
if ($newAggregate) {
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" already exists.',
$aggregate::class,
$aggregateId,
),
);

throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId());
}

try {
$this->store->save(...$messages);
} catch (UniqueConstraintViolation) {
if ($newAggregate) {
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" is outdated.',
'Repository: Aggregate "%s" with the id "%s" already exists.',
$aggregate::class,
$aggregateId,
),
);

throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId());
throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId());
}

$this->archive(...$messages);
});
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" is outdated.',
$aggregate::class,
$aggregateId,
),
);

throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId());
}

$this->aggregateIsValid[$aggregate] = true;

Expand Down Expand Up @@ -368,43 +362,6 @@ private function assertValidAggregate(AggregateRoot $aggregate): void
}
}

private function archive(Message ...$messages): void
{
if (!$this->store instanceof ArchivableStore) {
return;
}

$lastMessageWithNewStreamStart = null;

foreach ($messages as $message) {
if (!$message->hasHeader(StreamStartHeader::class)) {
continue;
}

$lastMessageWithNewStreamStart = $message;
}

if ($lastMessageWithNewStreamStart === null) {
return;
}

$aggregateHeader = $lastMessageWithNewStreamStart->header(AggregateHeader::class);
$this->store->archiveMessages(
$aggregateHeader->aggregateName,
$aggregateHeader->aggregateId,
$aggregateHeader->playhead,
);

$this->logger->debug(
sprintf(
'Repository: Archive messages for aggregate "%s" with the id "%s" until playhead "%d".',
$aggregateHeader->aggregateName,
$aggregateHeader->aggregateId,
$aggregateHeader->playhead,
),
);
}

/** @return Traversable<object> */
private function unpack(Stream $stream): Traversable
{
Expand Down
10 changes: 0 additions & 10 deletions src/Store/ArchivableStore.php

This file was deleted.

60 changes: 36 additions & 24 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
use function is_string;
use function sprintf;

final class DoctrineDbalStore implements Store, ArchivableStore, SubscriptionStore, DoctrineSchemaConfigurator
final class DoctrineDbalStore implements Store, SubscriptionStore, DoctrineSchemaConfigurator
{
/**
* PostgreSQL has a limit of 65535 parameters in a single query.
Expand Down Expand Up @@ -157,6 +157,9 @@ public function save(Message ...$messages): void

$this->connection->transactional(
function (Connection $connection) use ($messages): void {
/** @var array<string, int> $achievedUntilPlayhead */
$achievedUntilPlayhead = [];

$booleanType = Type::getType(Types::BOOLEAN);
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);

Expand Down Expand Up @@ -203,7 +206,14 @@ function (Connection $connection) use ($messages): void {
$parameters[] = $aggregateHeader->recordedOn;
$types[$offset + 5] = $dateTimeType;

$parameters[] = $message->hasHeader(StreamStartHeader::class);
$streamStart = $message->hasHeader(StreamStartHeader::class);

if ($streamStart) {
$key = $aggregateHeader->aggregateName . '/' . $aggregateHeader->aggregateId;
$achievedUntilPlayhead[$key] = $aggregateHeader->playhead;
}

$parameters[] = $streamStart;
$types[$offset + 6] = $booleanType;

$parameters[] = $message->hasHeader(ArchivedHeader::class);
Expand All @@ -226,11 +236,32 @@ function (Connection $connection) use ($messages): void {
$position = 0;
}

if ($position === 0) {
return;
if ($position !== 0) {
$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
}

$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
foreach ($achievedUntilPlayhead as $key => $playhead) {
[$aggregateName, $aggregateId] = explode('/', $key);

$connection->executeStatement(
sprintf(
<<<'SQL'
UPDATE %s
SET archived = true
WHERE aggregate = :aggregate
AND aggregate_id = :aggregate_id
AND playhead < :playhead
AND archived = false
SQL,
$this->config['table_name'],
),
[
'aggregate' => $aggregateName,
'aggregate_id' => $aggregateId,
'playhead' => $playhead,
],
);
}
},
);
}
Expand All @@ -245,25 +276,6 @@ public function transactional(Closure $function): void
$this->connection->transactional($function);
}

public function archiveMessages(string $aggregateName, string $aggregateId, int $untilPlayhead): void
{
$statement = $this->connection->prepare(sprintf(
'UPDATE %s
SET archived = true
WHERE aggregate = :aggregate
AND aggregate_id = :aggregate_id
AND playhead < :playhead
AND archived = false',
$this->config['table_name'],
));

$statement->bindValue('aggregate', $aggregateName);
$statement->bindValue('aggregate_id', $aggregateId);
$statement->bindValue('playhead', $untilPlayhead);

$statement->executeQuery();
}

public function configureSchema(Schema $schema, Connection $connection): void
{
if ($this->connection !== $connection) {
Expand Down
47 changes: 0 additions & 47 deletions tests/Unit/Repository/DefaultRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use Patchlevel\EventSourcing\Repository\WrongAggregate;
use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound;
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Store\ArchivableStore;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\ArrayStream;
use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
Expand Down Expand Up @@ -75,11 +74,6 @@ public function testSaveAggregate(): void
}),
)->shouldBeCalled();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand Down Expand Up @@ -126,11 +120,6 @@ public function testUpdateAggregate(): void
}),
)->shouldBeCalled();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand Down Expand Up @@ -179,11 +168,6 @@ public function testEventBus(): void
}),
)->shouldBeCalled();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch(
Argument::that(static function (Message $message) {
Expand Down Expand Up @@ -252,11 +236,6 @@ public function testDecorator(): void
}),
)->shouldBeCalled();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$decorator = new class implements MessageDecorator {
public function __invoke(Message $message): Message
{
Expand Down Expand Up @@ -317,11 +296,6 @@ public function testSaveAggregateWithEmptyEventStream(): void
}),
)->shouldBeCalledOnce();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand All @@ -343,11 +317,6 @@ public function testDetachedException(): void
Argument::type(Message::class),
)->willThrow(new RuntimeException());

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand Down Expand Up @@ -404,11 +373,6 @@ public function testDuplicate(): void
Argument::type(Message::class),
)->willThrow(new UniqueConstraintViolation());

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand Down Expand Up @@ -440,11 +404,6 @@ public function testOutdated(): void
}),
)->willThrow(new UniqueConstraintViolation());

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand All @@ -465,7 +424,6 @@ public function testOutdated(): void
public function testSaveAggregateWithSplitStream(): void
{
$store = $this->prophesize(Store::class);
$store->willImplement(ArchivableStore::class);
$store->save(
Argument::that(static function (Message $message) {
if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') {
Expand Down Expand Up @@ -501,11 +459,6 @@ public function testSaveAggregateWithSplitStream(): void
return $message->header(AggregateHeader::class)->playhead === 3;
}),
)->shouldBeCalled();
$store->archiveMessages('profile', '1', 3)->shouldBeCalledOnce();
$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Expand Down
Loading
Loading