This repository has been archived by the owner on Sep 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathAsyncCatchupSubscriptionTest.php
64 lines (53 loc) · 1.91 KB
/
AsyncCatchupSubscriptionTest.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<?php
declare(strict_types=1);
namespace Prooph\EventStoreClient;
use Amp\Loop;
use Assert\Assert;
use Prooph\EventStore\Async\EventStoreCatchUpSubscription;
use Prooph\EventStore\Async\LiveProcessingStartedOnCatchUpSubscription;
use Prooph\EventStore\CatchUpSubscriptionSettings;
use Prooph\EventStore\EndPoint;
use Psa\EventSourcing\Projection\Async\StdoutCatchupSubscription;
use Psa\EventSourcing\Projection\Async\StdoutCatchUpSubscriptionDropped;
require 'vendor/autoload.php';
require 'config/config.php';
$options = getopt('', ['stream:', 'checkpoint:']);
if (!isset($options['stream'])) {
echo 'No --stream option set' . PHP_EOL;
exit();
} else {
$stream = $options['stream'];
}
if (!isset($options['checkpoint'])) {
$checkpoint = null;
} else {
$checkpoint = (int)$options['checkpoint'];
Assert::that($checkpoint)->greaterOrEqualThan(0);
}
echo '--------------------------------------------------------------------------------' . PHP_EOL;
echo ' NOTE: If you want to run a category stream you MUST prefix it with `$ce-`!' . PHP_EOL;
echo '--------------------------------------------------------------------------------' . PHP_EOL;
Loop::run(function () use ($stream, $checkpoint) {
$eventStore = EventStoreConnectionFactory::createFromEndPoint(
new EndPoint('127.0.0.1', 1113)
);
$eventStore->onConnected(function (): void {
echo 'Connected' . PHP_EOL;
});
$eventStore->onClosed(function (): void {
echo 'Connection closed' . PHP_EOL;
});
yield $eventStore->connectAsync();
yield $eventStore->subscribeToStreamFromAsync(
$stream,
$checkpoint,
CatchUpSubscriptionSettings::default(),
new StdoutCatchupSubscription(),
new class implements LiveProcessingStartedOnCatchUpSubscription {
public function __invoke(EventStoreCatchUpSubscription $subscription): void {
echo 'Started live processing on ' . (string)$subscription->streamId() . PHP_EOL;
}
},
new StdoutCatchUpSubscriptionDropped()
);
});