Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ack, nack, and requeue methods #66

Merged
merged 5 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,34 @@ var_dump($task->getId() . ' has been queued');

### Consumer

The following code will allow you to read and process the task from the RoadRunner server.
The Consumer processes tasks from RoadRunner server and responds based on the processing outcome:

- `ack` - is used for positive acknowledgements.
- `nack` - is used for negative acknowledgements.
- `requeue` - is used for requeuing the task.

The behavior of the `nack` method depends on its implementation by the queue driver. It can accept an additional
parameter **redelivery**; if it is passed and set to **true**, the task will be requeued. However, not all drivers
support this functionality. If the redelivery parameter is not passed, set to **false**, or the queue driver's
implementation does not support it, the task will not be requeued.

```php
$task->nack(message: $reason, redelivery: true);
```

The `requeue` method is implemented by RoadRunner and does not depend on the queue driver. It allows you to resend
the task to **the end of the queue** and add additional headers to the task.

```php
$task->withHeader('attempts', (string) ($attempts + 1))->requeue($exception);
```

The `nack` and `requeue` methods have the ability to specify a **delay** for requeuing the task. To do this, call
the `withDelay` method and pass the desired value before invoking the `nack` or `requeue` methods.
Comment on lines +118 to +119
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplify language for setting delays in requeuing.

The current phrasing could be simplified for better readability and understanding.

- The `nack` and `requeue` methods have the ability to specify a **delay** for requeuing the task. To do this, call 
- the `withDelay` method and pass the desired value before invoking the `nack` or `requeue` methods.
+ The `nack` and `requeue` methods can set a **delay** for requeuing tasks by calling `withDelay` with the desired value before invocation.
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
The `nack` and `requeue` methods have the ability to specify a **delay** for requeuing the task. To do this, call
the `withDelay` method and pass the desired value before invoking the `nack` or `requeue` methods.
The `nack` and `requeue` methods can set a **delay** for requeuing tasks by calling `withDelay` with the desired value before invocation.
Tools
LanguageTool

[style] ~118-~118: The phrase ‘have the ability to’ might be wordy. Consider using “can”. (HAS_THE_ABILITY_TO)
Context: ...; ``` The nack and `requeue` methods have the ability to specify a delay for requeuing the t...


[style] ~118-~118: Consider a more expressive alternative. (DO_ACHIEVE)
Context: ... a delay for requeuing the task. To do this, call the withDelay method and ...

Markdownlint

118-118: Expected: 0 or 2; Actual: 1 (MD009, no-trailing-spaces)
Trailing spaces


```php
$task->withDelay(10)->requeue($exception);
```

```php
<?php
Expand All @@ -115,9 +142,9 @@ while ($task = $consumer->waitTask()) {

// Process task

$task->complete();
$task->ack();
} catch (\Throwable $e) {
$task->fail($e, requeue: true);
$task->requeue($e);
}
}
```
Expand Down
40 changes: 38 additions & 2 deletions src/Task/ReceivedTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ public function getQueue(): string
return $this->queue;
}

/**
* @deprecated Since v4.5.0, use {@see ack()} instead.
*/
public function complete(): void
{
$this->respond(Type::SUCCESS);
}

/**
* @deprecated Since v4.5.0, use {@see nack()} or {@see requeue()} instead.
*/
public function fail(string|\Stringable|\Throwable $error, bool $requeue = false): void
{
$data = [
Expand All @@ -82,19 +88,49 @@ public function fail(string|\Stringable|\Throwable $error, bool $requeue = false
$this->respond(Type::ERROR, $data);
}

public function ack(): void
{
$this->respond(Type::ACK);
}

public function nack(string|\Stringable|\Throwable $message, bool $redelivery = false): void
{
$this->respond(Type::NACK, [
'message' => (string) $message,
'redelivery' => $redelivery,
'delay_seconds' => $this->delay,
]);
}

public function requeue(string|\Stringable|\Throwable $message): void
{
$data = [
'message' => (string) $message,
'delay_seconds' => $this->delay,
];

if (!empty($this->headers)) {
$data['headers'] = $this->headers;
}

$this->respond(Type::REQUEUE, $data);
}

public function isCompleted(): bool
{
return $this->completed !== null;
}

public function isSuccessful(): bool
{
return $this->completed === Type::SUCCESS;
return $this->completed === Type::SUCCESS || $this->completed === Type::ACK;
}

public function isFails(): bool
{
return $this->completed === Type::ERROR;
return $this->completed === Type::ERROR ||
$this->completed === Type::NACK ||
$this->completed === Type::REQUEUE;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/Task/ReceivedTaskInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

/**
* @psalm-suppress MissingImmutableAnnotation The implementation of this task is mutable.
* @method void ack()
* @method void nack(string|\Stringable|\Throwable $message, bool $redelivery = false)
* @method void requeue(string|\Stringable|\Throwable $message)
*/
interface ReceivedTaskInterface extends
QueuedTaskInterface,
Expand All @@ -19,13 +22,15 @@ interface ReceivedTaskInterface extends
* Marks the current task as completed.
*
* @throws JobsException
* @deprecated Since v4.5.0, use {@see ack()} instead.
*/
public function complete(): void;

/**
* Marks the current task as failed.
*
* @throws JobsException
* @deprecated Since v4.5.0, use {@see nack()} or {@see requeue()} instead.
*/
public function fail(string|\Stringable|\Throwable $error, bool $requeue = false): void;

Expand Down
17 changes: 17 additions & 0 deletions src/Task/Type.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,28 @@ interface Type
{
/**
* @var TypeEnum
* @deprecated Since v4.5.0, use {@see Type::ACK} instead.
*/
public const SUCCESS = 0;

/**
* @var TypeEnum
* @deprecated Since v4.5.0, use {@see Type::NACK} or {@see Type::REQUEUE} instead.
*/
public const ERROR = 1;

/**
* @var TypeEnum
*/
public const ACK = 2;

/**
* @var TypeEnum
*/
public const NACK = 3;

/**
* @var TypeEnum
*/
public const REQUEUE = 4;
}
121 changes: 116 additions & 5 deletions tests/Unit/Task/ReceivedTaskTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Spiral\RoadRunner\Jobs\Tests\Unit\Task;

use Generator;
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Spiral\RoadRunner\Jobs\Queue\Driver;
Expand Down Expand Up @@ -38,8 +39,6 @@ public function createTask(
);
}



public function testGetsQueue(): void
{
$task = $this->createTask(queue: 'broker-queue-name');
Expand Down Expand Up @@ -81,6 +80,30 @@ public function testComplete(): void
$this->assertFalse($task->isFails());
}

public function testAck(): void
{
$task = $this->createTask();

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) {
$this->assertEquals('{"type":2,"data":[]}', $payload->body);

return true;
}),
);

$task->ack();

$this->assertTrue($task->isCompleted());
$this->assertTrue($task->isSuccessful());
$this->assertFalse($task->isFails());
}

public static function provideFailData(): Generator
{
Expand All @@ -90,9 +113,97 @@ public static function provideFailData(): Generator
yield 'headers' => ['Some error message', false, null, ['foo' => 'bar']];
}

/**
* @dataProvider provideFailData
*/
#[DataProvider('provideFailData')]
public function testNack(string $error, bool $redelivery, int|null $delay): void
{
$task = $this->createTask();

if ($delay !== null) {
$task = $task->withDelay($delay);
}

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) use ($delay, $redelivery, $error) {
$result = [
'type' => Type::NACK,
'data' => [
'message' => $error,
'redelivery' => $redelivery,
'delay_seconds' => (int) $delay,
],
];

$this->assertEquals(
\json_encode($result),
$payload->body,
);

return true;
}),
);

$task->nack(message: $error, redelivery: $redelivery);

$this->assertTrue($task->isFails());
$this->assertFalse($task->isSuccessful());
$this->assertTrue($task->isCompleted());
}


#[DataProvider('provideFailData')]
public function testRequeue(string $error, bool $requeue, int|null $delay, array $headers): void
{
$task = $this->createTask();

if ($delay !== null) {
$task = $task->withDelay($delay);
}

foreach ($headers as $key => $value) {
$task = $task->withHeader($key, $value);
$headers[$key] = [$value];
}

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) use ($delay, $error, $headers) {
$result = [
'type' => Type::REQUEUE,
'data' => [
'message' => $error,
'delay_seconds' => (int) $delay,
],
];

if (!empty($headers)) {
$result['data']['headers'] = $headers;
}

$this->assertEquals(\json_encode($result), $payload->body,);

return true;
}),
);

$task->requeue(message: $error);

$this->assertTrue($task->isFails());
$this->assertFalse($task->isSuccessful());
$this->assertTrue($task->isCompleted());
}

#[DataProvider('provideFailData')]
public function testFail($error, bool $requeue, int|null $delay, array $headers): void
{
$task = $this->createTask();
Expand Down
Loading