Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename outdated into detached in subscription engine #537

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -390,21 +390,20 @@ stateDiagram-v2
Booting --> Error
Active --> Paused
Active --> Finished
Active --> Outdated
Active --> Detached
Active --> Error
Paused --> New
Paused --> Booting
Paused --> Active
Paused --> Outdated
Paused --> Detached
Finished --> Active
Finished --> Outdated
Finished --> Detached
Error --> New
Error --> Booting
Error --> Active
Error --> Paused
Error --> [*]
Outdated --> Active
Outdated --> [*]
Detached --> Active
Detached --> [*]
```

### New
Expand Down Expand Up @@ -438,16 +437,16 @@ A subscription is finished if the subscriber has the mode `RunMode::Once`.
This means that the subscription is only run once and then set to finished if it reaches the end of the event stream.
You can also reactivate the subscription if you want so that it continues.

### Outdated
### Detached

If an active or finished subscription exists in the subscription store
that does not have a subscriber in the source code with a corresponding subscriber ID,
then this subscription is marked as outdated.
then this subscription is marked as detached.
This happens when either the subscriber has been deleted
or the subscriber ID of a subscriber has changed.
In the last case there should be a new subscription with the new subscriber ID.

An outdated subscription does not automatically become active again when the subscriber exists again.
A detached subscription does not automatically become active again when the subscriber exists again.
This happens, for example, when an old version was deployed again during a rollback.

There are two options to reactivate the subscription:
Expand Down Expand Up @@ -610,7 +609,7 @@ $subscriptionEngine->run($criteria);

### Teardown

If subscriptions are outdated, they can be cleaned up here.
If subscriptions are detached, they can be cleaned up here.
The subscription engine also tries to call the `teardown` method if available.

```php
Expand Down
14 changes: 7 additions & 7 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public function run(
$this->logger?->info('Subscription Engine: Start processing.');

$this->discoverNewSubscriptions();
$this->markOutdatedSubscriptions($criteria);
$this->markDetachedSubscriptions($criteria);
$this->retrySubscriptions($criteria);

$this->findForUpdate(
Expand Down Expand Up @@ -394,13 +394,13 @@ public function teardown(SubscriptionEngineCriteria|null $criteria = null): void

$this->discoverNewSubscriptions();

$this->logger?->info('Subscription Engine: Start teardown outdated subscriptions.');
$this->logger?->info('Subscription Engine: Start teardown detached subscriptions.');

$this->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
status: [Status::Outdated],
status: [Status::Detached],
),
function (array $subscriptions): void {
foreach ($subscriptions as $subscription) {
Expand Down Expand Up @@ -542,7 +542,7 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): vo
groups: $criteria->groups,
status: [
Status::Error,
Status::Outdated,
Status::Detached,
Status::Paused,
Status::Finished,
],
Expand Down Expand Up @@ -710,7 +710,7 @@ private function subscriber(string $subscriberId): SubscriberAccessor|null
return $this->subscriberRepository->get($subscriberId);
}

private function markOutdatedSubscriptions(SubscriptionEngineCriteria $criteria): void
private function markDetachedSubscriptions(SubscriptionEngineCriteria $criteria): void
{
$this->findForUpdate(
new SubscriptionCriteria(
Expand All @@ -726,12 +726,12 @@ function (array $subscriptions): void {
continue;
}

$subscription->outdated();
$subscription->detached();
$this->subscriptionStore->update($subscription);

$this->logger?->info(
sprintf(
'Subscription Engine: Subscriber for "%s" not found and has been marked as outdated.',
'Subscription Engine: Subscriber for "%s" not found and has been marked as detached.',
$subscription->id(),
),
);
Expand Down
2 changes: 1 addition & 1 deletion src/Subscription/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ enum Status: string
case Active = 'active';
case Paused = 'paused';
case Finished = 'finished';
case Outdated = 'outdated';
case Detached = 'detached';
case Error = 'error';
}
8 changes: 4 additions & 4 deletions src/Subscription/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ public function isFinished(): bool
return $this->status === Status::Finished;
}

public function outdated(): void
public function detached(): void
{
$this->status = Status::Outdated;
$this->status = Status::Detached;
}

public function isOutdated(): bool
public function isDetached(): bool
{
return $this->status === Status::Outdated;
return $this->status === Status::Detached;
}

public function error(Throwable|string $error): void
Expand Down
20 changes: 10 additions & 10 deletions tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ public function testRunningMarkOutdated(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
0,
),
], $subscriptionStore->updatedSubscriptions);
Expand Down Expand Up @@ -1156,7 +1156,7 @@ class {
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);

$subscriptionStore = new DummySubscriptionStore([$subscription]);
Expand Down Expand Up @@ -1194,7 +1194,7 @@ public function drop(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);

$subscriptionStore = new DummySubscriptionStore([$subscription]);
Expand Down Expand Up @@ -1234,7 +1234,7 @@ public function drop(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
),
]);

Expand All @@ -1261,7 +1261,7 @@ public function testTeardownWithoutSubscriber(): void
$subscriberId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
),
]);

Expand Down Expand Up @@ -1325,7 +1325,7 @@ public function drop(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);
$subscriptionStore = new DummySubscriptionStore([$subscription]);

Expand Down Expand Up @@ -1355,7 +1355,7 @@ class {
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);
$subscriptionStore = new DummySubscriptionStore([$subscription]);

Expand Down Expand Up @@ -1391,7 +1391,7 @@ public function drop(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);
$subscriptionStore = new DummySubscriptionStore([$subscription]);

Expand All @@ -1417,7 +1417,7 @@ public function testRemoveWithoutSubscriber(): void
$subscriberId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);
$subscriptionStore = new DummySubscriptionStore([$subscription]);

Expand Down Expand Up @@ -1514,7 +1514,7 @@ class {
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
),
]);

Expand Down
14 changes: 7 additions & 7 deletions tests/Unit/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function testCreate(): void
self::assertFalse($subscription->isBooting());
self::assertFalse($subscription->isActive());
self::assertFalse($subscription->isError());
self::assertFalse($subscription->isOutdated());
self::assertFalse($subscription->isDetached());
}

public function testBooting(): void
Expand All @@ -44,7 +44,7 @@ public function testBooting(): void
self::assertTrue($subscription->isBooting());
self::assertFalse($subscription->isActive());
self::assertFalse($subscription->isError());
self::assertFalse($subscription->isOutdated());
self::assertFalse($subscription->isDetached());
}

public function testActive(): void
Expand All @@ -60,7 +60,7 @@ public function testActive(): void
self::assertFalse($subscription->isBooting());
self::assertTrue($subscription->isActive());
self::assertFalse($subscription->isError());
self::assertFalse($subscription->isOutdated());
self::assertFalse($subscription->isDetached());
}

public function testError(): void
Expand All @@ -78,7 +78,7 @@ public function testError(): void
self::assertFalse($subscription->isBooting());
self::assertFalse($subscription->isActive());
self::assertTrue($subscription->isError());
self::assertFalse($subscription->isOutdated());
self::assertFalse($subscription->isDetached());
self::assertEquals(
new SubscriptionError(
'test',
Expand All @@ -95,14 +95,14 @@ public function testOutdated(): void
'test',
);

$subscription->outdated();
$subscription->detached();

self::assertEquals(Status::Outdated, $subscription->status());
self::assertEquals(Status::Detached, $subscription->status());
self::assertFalse($subscription->isNew());
self::assertFalse($subscription->isBooting());
self::assertFalse($subscription->isActive());
self::assertFalse($subscription->isError());
self::assertTrue($subscription->isOutdated());
self::assertTrue($subscription->isDetached());
}

public function testChangePosition(): void
Expand Down
Loading