Skip to content

Commit

Permalink
Merge pull request #66 from roadrunner-php/feature/ack-nack-requeue
Browse files Browse the repository at this point in the history
Add support for ack, nack, and requeue methods
  • Loading branch information
msmakouz authored Jun 24, 2024
2 parents 0ee67d7 + 5a4c061 commit ab1e4ff
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 11 deletions.
35 changes: 31 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,34 @@ var_dump($task->getId() . ' has been queued');

### Consumer

The following code will allow you to read and process the task from the RoadRunner server.
The Consumer processes tasks from RoadRunner server and responds based on the processing outcome:

- `ack` - is used for positive acknowledgements.
- `nack` - is used for negative acknowledgements.
- `requeue` - is used for requeuing the task.

The behavior of the `nack` method depends on its implementation by the queue driver. It can accept an additional
parameter **redelivery**; if it is passed and set to **true**, the task will be requeued. However, not all drivers
support this functionality. If the redelivery parameter is not passed, set to **false**, or the queue driver's
implementation does not support it, the task will not be requeued.

```php
$task->nack(message: $reason, redelivery: true);
```

The `requeue` method is implemented by RoadRunner and does not depend on the queue driver. It allows you to resend
the task to **the end of the queue** and add additional headers to the task.

```php
$task->withHeader('attempts', (string) ($attempts + 1))->requeue($exception);
```

The `nack` and `requeue` methods have the ability to specify a **delay** for requeuing the task. To do this, call
the `withDelay` method and pass the desired value before invoking the `nack` or `requeue` methods.

```php
$task->withDelay(10)->requeue($exception);
```

```php
<?php
Expand All @@ -110,14 +137,14 @@ while ($task = $consumer->waitTask()) {
try {
$name = $task->getName(); // "ping"
$queue = $task->getQueue(); // "local"
$driver = $queue->getDriver(); // "memory"
$driver = $task->getDriver(); // "memory"
$payload = $task->getPayload(); // {"site": "https://example.com"}
// Process task
$task->complete();
$task->ack();
} catch (\Throwable $e) {
$task->fail($e, requeue: true);
$task->requeue($e);
}
}
```
Expand Down
1 change: 1 addition & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<RedundantCastGivenDocblockType errorLevel="suppress" />
<RedundantCondition errorLevel="suppress" />
<DocblockTypeContradiction errorLevel="suppress" />
<MissingClassConstType errorLevel="suppress" />
</issueHandlers>
<projectFiles>
<directory name="src" />
Expand Down
47 changes: 45 additions & 2 deletions src/Task/ReceivedTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,18 @@ public function getQueue(): string
return $this->queue;
}

/**
* @deprecated Since v4.5.0, use {@see ack()} instead.
*/
public function complete(): void
{
/** @psalm-suppress DeprecatedConstant */
$this->respond(Type::SUCCESS);
}

/**
* @deprecated Since v4.5.0, use {@see nack()} or {@see requeue()} instead.
*/
public function fail(string|\Stringable|\Throwable $error, bool $requeue = false): void
{
$data = [
Expand All @@ -79,22 +86,58 @@ public function fail(string|\Stringable|\Throwable $error, bool $requeue = false
$data['headers'] = $this->headers;
}

/** @psalm-suppress DeprecatedConstant */
$this->respond(Type::ERROR, $data);
}

public function ack(): void
{
$this->respond(Type::ACK);
}

/**
* The behavior of this method depends on its implementation by the queue driver.
*/
public function nack(string|\Stringable|\Throwable $message, bool $redelivery = false): void
{
$this->respond(Type::NACK, [
'message' => (string) $message,
'redelivery' => $redelivery,
'delay_seconds' => $this->delay,
]);
}

public function requeue(string|\Stringable|\Throwable $message): void
{
$data = [
'message' => (string) $message,
'delay_seconds' => $this->delay,
];

if (!empty($this->headers)) {
$data['headers'] = $this->headers;
}

$this->respond(Type::REQUEUE, $data);
}

public function isCompleted(): bool
{
return $this->completed !== null;
}

public function isSuccessful(): bool
{
return $this->completed === Type::SUCCESS;
/** @psalm-suppress DeprecatedConstant */
return $this->completed === Type::SUCCESS || $this->completed === Type::ACK;
}

public function isFails(): bool
{
return $this->completed === Type::ERROR;
/** @psalm-suppress DeprecatedConstant */
return $this->completed === Type::ERROR ||
$this->completed === Type::NACK ||
$this->completed === Type::REQUEUE;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/Task/ReceivedTaskInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

/**
* @psalm-suppress MissingImmutableAnnotation The implementation of this task is mutable.
* @method void ack()
* @method void nack(string|\Stringable|\Throwable $message, bool $redelivery = false)
* @method void requeue(string|\Stringable|\Throwable $message)
*/
interface ReceivedTaskInterface extends
QueuedTaskInterface,
Expand All @@ -19,13 +22,15 @@ interface ReceivedTaskInterface extends
* Marks the current task as completed.
*
* @throws JobsException
* @deprecated Since v4.5.0, use {@see ack()} instead.
*/
public function complete(): void;

/**
* Marks the current task as failed.
*
* @throws JobsException
* @deprecated Since v4.5.0, use {@see nack()} or {@see requeue()} instead.
*/
public function fail(string|\Stringable|\Throwable $error, bool $requeue = false): void;

Expand Down
17 changes: 17 additions & 0 deletions src/Task/Type.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,28 @@ interface Type
{
/**
* @var TypeEnum
* @deprecated Since v4.5.0, use {@see Type::ACK} instead.
*/
public const SUCCESS = 0;

/**
* @var TypeEnum
* @deprecated Since v4.5.0, use {@see Type::NACK} or {@see Type::REQUEUE} instead.
*/
public const ERROR = 1;

/**
* @var TypeEnum
*/
public const ACK = 2;

/**
* @var TypeEnum
*/
public const NACK = 3;

/**
* @var TypeEnum
*/
public const REQUEUE = 4;
}
121 changes: 116 additions & 5 deletions tests/Unit/Task/ReceivedTaskTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Spiral\RoadRunner\Jobs\Tests\Unit\Task;

use Generator;
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Spiral\RoadRunner\Jobs\Queue\Driver;
Expand Down Expand Up @@ -38,8 +39,6 @@ public function createTask(
);
}



public function testGetsQueue(): void
{
$task = $this->createTask(queue: 'broker-queue-name');
Expand Down Expand Up @@ -81,6 +80,30 @@ public function testComplete(): void
$this->assertFalse($task->isFails());
}

public function testAck(): void
{
$task = $this->createTask();

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) {
$this->assertEquals('{"type":2,"data":[]}', $payload->body);

return true;
}),
);

$task->ack();

$this->assertTrue($task->isCompleted());
$this->assertTrue($task->isSuccessful());
$this->assertFalse($task->isFails());
}

public static function provideFailData(): Generator
{
Expand All @@ -90,9 +113,97 @@ public static function provideFailData(): Generator
yield 'headers' => ['Some error message', false, null, ['foo' => 'bar']];
}

/**
* @dataProvider provideFailData
*/
#[DataProvider('provideFailData')]
public function testNack(string $error, bool $redelivery, int|null $delay): void
{
$task = $this->createTask();

if ($delay !== null) {
$task = $task->withDelay($delay);
}

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) use ($delay, $redelivery, $error) {
$result = [
'type' => Type::NACK,
'data' => [
'message' => $error,
'redelivery' => $redelivery,
'delay_seconds' => (int) $delay,
],
];

$this->assertEquals(
\json_encode($result),
$payload->body,
);

return true;
}),
);

$task->nack(message: $error, redelivery: $redelivery);

$this->assertTrue($task->isFails());
$this->assertFalse($task->isSuccessful());
$this->assertTrue($task->isCompleted());
}


#[DataProvider('provideFailData')]
public function testRequeue(string $error, bool $requeue, int|null $delay, array $headers): void
{
$task = $this->createTask();

if ($delay !== null) {
$task = $task->withDelay($delay);
}

foreach ($headers as $key => $value) {
$task = $task->withHeader($key, $value);
$headers[$key] = [$value];
}

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) use ($delay, $error, $headers) {
$result = [
'type' => Type::REQUEUE,
'data' => [
'message' => $error,
'delay_seconds' => (int) $delay,
],
];

if (!empty($headers)) {
$result['data']['headers'] = $headers;
}

$this->assertEquals(\json_encode($result), $payload->body,);

return true;
}),
);

$task->requeue(message: $error);

$this->assertTrue($task->isFails());
$this->assertFalse($task->isSuccessful());
$this->assertTrue($task->isCompleted());
}

#[DataProvider('provideFailData')]
public function testFail($error, bool $requeue, int|null $delay, array $headers): void
{
$task = $this->createTask();
Expand Down

0 comments on commit ab1e4ff

Please sign in to comment.