From 3204a586c127cd673f80100d150f737a710aeb04 Mon Sep 17 00:00:00 2001 From: Maxim Smakouz Date: Mon, 24 Jun 2024 15:58:57 +0300 Subject: [PATCH 1/5] Add support for ack, nack, and requeue methods --- README.md | 33 +++++++- src/Task/ReceivedTask.php | 40 ++++++++- src/Task/ReceivedTaskInterface.php | 5 ++ src/Task/Type.php | 17 ++++ tests/Unit/Task/ReceivedTaskTest.php | 121 +++++++++++++++++++++++++-- 5 files changed, 206 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 8b178b8..8613434 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,34 @@ var_dump($task->getId() . ' has been queued'); ### Consumer -The following code will allow you to read and process the task from the RoadRunner server. +The Consumer processes tasks from RoadRunner server and responds based on the processing outcome: + +- `ack` - is used for positive acknowledgements. +- `nack` - is used for negative acknowledgements. +- `requeue` - is used for requeuing the task. + +The behavior of the `nack` method depends on its implementation by the queue driver. It can accept an additional +parameter **redelivery**; if it is passed and set to **true**, the task will be requeued. However, not all drivers +support this functionality. If the redelivery parameter is not passed, set to **false**, or the queue driver's +implementation does not support it, the task will not be requeued. + +```php +$task->nack(message: $reason, redelivery: true); +``` + +The `requeue` method is implemented by RoadRunner and does not depend on the queue driver. It allows you to resend +the task to **the end of the queue** and add additional headers to the task. + +```php +$task->withHeader('attempts', (string) ($attempts + 1))->requeue($exception); +``` + +The `nack` and `requeue` methods have the ability to specify a **delay** for requeuing the task. To do this, call +the `withDelay` method and pass the desired value before invoking the `nack` or `requeue` methods. + +```php +$task->withDelay(10)->requeue($exception); +``` ```php waitTask()) { // Process task - $task->complete(); + $task->ack(); } catch (\Throwable $e) { - $task->fail($e, requeue: true); + $task->requeue($e); } } ``` diff --git a/src/Task/ReceivedTask.php b/src/Task/ReceivedTask.php index 2c28f4e..2eac12b 100644 --- a/src/Task/ReceivedTask.php +++ b/src/Task/ReceivedTask.php @@ -62,11 +62,17 @@ public function getQueue(): string return $this->queue; } + /** + * @deprecated Since v4.5.0, use {@see ack()} instead. + */ public function complete(): void { $this->respond(Type::SUCCESS); } + /** + * @deprecated Since v4.5.0, use {@see nack()} or {@see requeue()} instead. + */ public function fail(string|\Stringable|\Throwable $error, bool $requeue = false): void { $data = [ @@ -82,6 +88,34 @@ public function fail(string|\Stringable|\Throwable $error, bool $requeue = false $this->respond(Type::ERROR, $data); } + public function ack(): void + { + $this->respond(Type::ACK); + } + + public function nack(string|\Stringable|\Throwable $message, bool $redelivery = false): void + { + $this->respond(Type::NACK, [ + 'message' => (string) $message, + 'redelivery' => $redelivery, + 'delay_seconds' => $this->delay, + ]); + } + + public function requeue(string|\Stringable|\Throwable $message): void + { + $data = [ + 'message' => (string) $message, + 'delay_seconds' => $this->delay, + ]; + + if (!empty($this->headers)) { + $data['headers'] = $this->headers; + } + + $this->respond(Type::REQUEUE, $data); + } + public function isCompleted(): bool { return $this->completed !== null; @@ -89,12 +123,14 @@ public function isCompleted(): bool public function isSuccessful(): bool { - return $this->completed === Type::SUCCESS; + return $this->completed === Type::SUCCESS || $this->completed === Type::ACK; } public function isFails(): bool { - return $this->completed === Type::ERROR; + return $this->completed === Type::ERROR || + $this->completed === Type::NACK || + $this->completed === Type::REQUEUE; } /** diff --git a/src/Task/ReceivedTaskInterface.php b/src/Task/ReceivedTaskInterface.php index 5fc16cf..b21dbc0 100644 --- a/src/Task/ReceivedTaskInterface.php +++ b/src/Task/ReceivedTaskInterface.php @@ -9,6 +9,9 @@ /** * @psalm-suppress MissingImmutableAnnotation The implementation of this task is mutable. + * @method void ack() + * @method void nack(string|\Stringable|\Throwable $message, bool $redelivery = false) + * @method void requeue(string|\Stringable|\Throwable $message) */ interface ReceivedTaskInterface extends QueuedTaskInterface, @@ -19,6 +22,7 @@ interface ReceivedTaskInterface extends * Marks the current task as completed. * * @throws JobsException + * @deprecated Since v4.5.0, use {@see ack()} instead. */ public function complete(): void; @@ -26,6 +30,7 @@ public function complete(): void; * Marks the current task as failed. * * @throws JobsException + * @deprecated Since v4.5.0, use {@see nack()} or {@see requeue()} instead. */ public function fail(string|\Stringable|\Throwable $error, bool $requeue = false): void; diff --git a/src/Task/Type.php b/src/Task/Type.php index fe58dec..c391140 100644 --- a/src/Task/Type.php +++ b/src/Task/Type.php @@ -11,11 +11,28 @@ interface Type { /** * @var TypeEnum + * @deprecated Since v4.5.0, use {@see Type::ACK} instead. */ public const SUCCESS = 0; /** * @var TypeEnum + * @deprecated Since v4.5.0, use {@see Type::NACK} or {@see Type::REQUEUE} instead. */ public const ERROR = 1; + + /** + * @var TypeEnum + */ + public const ACK = 2; + + /** + * @var TypeEnum + */ + public const NACK = 3; + + /** + * @var TypeEnum + */ + public const REQUEUE = 4; } diff --git a/tests/Unit/Task/ReceivedTaskTest.php b/tests/Unit/Task/ReceivedTaskTest.php index 3f5136e..2ae13e8 100644 --- a/tests/Unit/Task/ReceivedTaskTest.php +++ b/tests/Unit/Task/ReceivedTaskTest.php @@ -5,6 +5,7 @@ namespace Spiral\RoadRunner\Jobs\Tests\Unit\Task; use Generator; +use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Spiral\RoadRunner\Jobs\Queue\Driver; @@ -38,8 +39,6 @@ public function createTask( ); } - - public function testGetsQueue(): void { $task = $this->createTask(queue: 'broker-queue-name'); @@ -81,6 +80,30 @@ public function testComplete(): void $this->assertFalse($task->isFails()); } + public function testAck(): void + { + $task = $this->createTask(); + + $this->assertFalse($task->isCompleted()); + $this->assertFalse($task->isFails()); + $this->assertFalse($task->isSuccessful()); + + $this->worker->expects($this->once()) + ->method('respond') + ->with( + $this->callback(function (Payload $payload) { + $this->assertEquals('{"type":2,"data":[]}', $payload->body); + + return true; + }), + ); + + $task->ack(); + + $this->assertTrue($task->isCompleted()); + $this->assertTrue($task->isSuccessful()); + $this->assertFalse($task->isFails()); + } public static function provideFailData(): Generator { @@ -90,9 +113,97 @@ public static function provideFailData(): Generator yield 'headers' => ['Some error message', false, null, ['foo' => 'bar']]; } - /** - * @dataProvider provideFailData - */ + #[DataProvider('provideFailData')] + public function testNack(string $error, bool $redelivery, int|null $delay): void + { + $task = $this->createTask(); + + if ($delay !== null) { + $task = $task->withDelay($delay); + } + + $this->assertFalse($task->isCompleted()); + $this->assertFalse($task->isFails()); + $this->assertFalse($task->isSuccessful()); + + $this->worker->expects($this->once()) + ->method('respond') + ->with( + $this->callback(function (Payload $payload) use ($delay, $redelivery, $error) { + $result = [ + 'type' => Type::NACK, + 'data' => [ + 'message' => $error, + 'redelivery' => $redelivery, + 'delay_seconds' => (int) $delay, + ], + ]; + + $this->assertEquals( + \json_encode($result), + $payload->body, + ); + + return true; + }), + ); + + $task->nack(message: $error, redelivery: $redelivery); + + $this->assertTrue($task->isFails()); + $this->assertFalse($task->isSuccessful()); + $this->assertTrue($task->isCompleted()); + } + + + #[DataProvider('provideFailData')] + public function testRequeue(string $error, bool $requeue, int|null $delay, array $headers): void + { + $task = $this->createTask(); + + if ($delay !== null) { + $task = $task->withDelay($delay); + } + + foreach ($headers as $key => $value) { + $task = $task->withHeader($key, $value); + $headers[$key] = [$value]; + } + + $this->assertFalse($task->isCompleted()); + $this->assertFalse($task->isFails()); + $this->assertFalse($task->isSuccessful()); + + $this->worker->expects($this->once()) + ->method('respond') + ->with( + $this->callback(function (Payload $payload) use ($delay, $error, $headers) { + $result = [ + 'type' => Type::REQUEUE, + 'data' => [ + 'message' => $error, + 'delay_seconds' => (int) $delay, + ], + ]; + + if (!empty($headers)) { + $result['data']['headers'] = $headers; + } + + $this->assertEquals(\json_encode($result), $payload->body,); + + return true; + }), + ); + + $task->requeue(message: $error); + + $this->assertTrue($task->isFails()); + $this->assertFalse($task->isSuccessful()); + $this->assertTrue($task->isCompleted()); + } + + #[DataProvider('provideFailData')] public function testFail($error, bool $requeue, int|null $delay, array $headers): void { $task = $this->createTask(); From 941d799b65a19524f1cb6e2cb27f5d8337698431 Mon Sep 17 00:00:00 2001 From: Maxim Smakouz Date: Mon, 24 Jun 2024 16:12:41 +0300 Subject: [PATCH 2/5] Add phpdoc for nack method --- src/Task/ReceivedTask.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Task/ReceivedTask.php b/src/Task/ReceivedTask.php index 2eac12b..8dee609 100644 --- a/src/Task/ReceivedTask.php +++ b/src/Task/ReceivedTask.php @@ -93,6 +93,9 @@ public function ack(): void $this->respond(Type::ACK); } + /** + * The behavior of this method depends on its implementation by the queue driver. + */ public function nack(string|\Stringable|\Throwable $message, bool $redelivery = false): void { $this->respond(Type::NACK, [ From 4a15c4037756934e717a35fd6ed46bec4c473a1a Mon Sep 17 00:00:00 2001 From: Maxim Smakouz Date: Mon, 24 Jun 2024 16:15:58 +0300 Subject: [PATCH 3/5] Add MissingClassConstType issue handler --- psalm.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/psalm.xml b/psalm.xml index 522307f..7229fa8 100644 --- a/psalm.xml +++ b/psalm.xml @@ -13,6 +13,7 @@ + From da6ab2664ca540b5946e60c591d91a1245a62b8b Mon Sep 17 00:00:00 2001 From: Maxim Smakouz Date: Mon, 24 Jun 2024 16:18:35 +0300 Subject: [PATCH 4/5] Add psalm suppress --- src/Task/ReceivedTask.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Task/ReceivedTask.php b/src/Task/ReceivedTask.php index 8dee609..1d26472 100644 --- a/src/Task/ReceivedTask.php +++ b/src/Task/ReceivedTask.php @@ -67,6 +67,7 @@ public function getQueue(): string */ public function complete(): void { + /** @psalm-suppress DeprecatedConstant */ $this->respond(Type::SUCCESS); } @@ -85,6 +86,7 @@ public function fail(string|\Stringable|\Throwable $error, bool $requeue = false $data['headers'] = $this->headers; } + /** @psalm-suppress DeprecatedConstant */ $this->respond(Type::ERROR, $data); } @@ -126,11 +128,13 @@ public function isCompleted(): bool public function isSuccessful(): bool { + /** @psalm-suppress DeprecatedConstant */ return $this->completed === Type::SUCCESS || $this->completed === Type::ACK; } public function isFails(): bool { + /** @psalm-suppress DeprecatedConstant */ return $this->completed === Type::ERROR || $this->completed === Type::NACK || $this->completed === Type::REQUEUE; From 5a4c0617c0fe8024f74640fd6f04563ca38bc5c1 Mon Sep 17 00:00:00 2001 From: Maxim Smakouz Date: Mon, 24 Jun 2024 16:50:12 +0300 Subject: [PATCH 5/5] Fix variable name in readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8613434..0958525 100644 --- a/README.md +++ b/README.md @@ -137,7 +137,7 @@ while ($task = $consumer->waitTask()) { try { $name = $task->getName(); // "ping" $queue = $task->getQueue(); // "local" - $driver = $queue->getDriver(); // "memory" + $driver = $task->getDriver(); // "memory" $payload = $task->getPayload(); // {"site": "https://example.com"} // Process task