This package allows to use RabbitMQ queues for queued Laravel (prioritized) jobs. Fully configurable.
Installed php extension ext-amqp
is required. Installation steps can be found in Dockerfile.
For jobs delaying you also should install rabbitmq-delayed-message-exchange
plugin for RabbitMQ. Delaying is optional feature.
Important: Before using this package you should install
avto-dev/amqp-rabbit-manager
into your application. Installation steps can be found here.
Require this package with composer using the following command:
$ composer require avto-dev/amqp-rabbit-laravel-queue "^2.0"
Installed
composer
is required (how to install composer). Also you need to fix the major version of package.
You need to fix the major version of package.
After that you should modify your configuration files:
RabbitMQ queues and exchanges configuration:
<?php
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;
return [
// ...
'queues' => [
'jobs' => [
'name' => env('JOBS_QUEUE_NAME', 'jobs'),
'flags' => AmqpQueue::FLAG_DURABLE, // Remain queue active when a server restarts
'arguments' => [
'x-max-priority' => 255, // @link <https://www.rabbitmq.com/priority.html>
],
'consumer_tag' => null,
],
'failed' => [
'name' => env('FAILED_JOBS_QUEUE_NAME', 'failed-jobs'),
'flags' => AmqpQueue::FLAG_DURABLE,
'arguments' => [
'x-message-ttl' => 604800000, // 7 days, @link <https://www.rabbitmq.com/ttl.html>
'x-queue-mode' => 'lazy', // @link <https://www.rabbitmq.com/lazy-queues.html>
],
'consumer_tag' => null,
],
],
// ...
'exchanges' => [
// RabbitMQ Delayed Message Plugin is required (@link: <https://git.io/fj4SE>)
'delayed-jobs' => [
'name' => env('DELAYED_JOBS_EXCHANGE_NAME', 'jobs.delayed'),
'type' => 'x-delayed-message',
'flags' => AmqpTopic::FLAG_DURABLE, // Remain active when a server restarts
'arguments' => [
'x-delayed-type' => AmqpTopic::TYPE_DIRECT,
],
],
],
// ...
'setup' => [
'rabbit-default' => [
'queues' => [
'jobs',
'failed',
],
'exchanges' => [
'delayed-jobs'
],
],
],
];
Laravel queue settings:
<?php
use AvtoDev\AmqpRabbitLaravelQueue\Connector;
return [
// ...
'default' => env('QUEUE_DRIVER', 'rabbitmq'),
// ...
'connections' => [
// ...
'rabbitmq' => [
'driver' => Connector::NAME,
'connection' => 'rabbit-default',
'queue_id' => 'jobs',
'delayed_exchange_id' => 'delayed-jobs',
'timeout' => (int) env('QUEUE_TIMEOUT', 0), // The timeout is in milliseconds
'resume' => (bool) env('QUEUE_RESUME', false), // Resume consuming when timeout is over
],
],
// ...
'failed' => [
'connection' => 'rabbit-default',
'queue_id' => 'failed',
],
];
resume
can be used with non-zerotimeout
value for periodic connection reloading (for example, if you set'timeout' => 30000
and'resume' => true
, queue worker will unsubscribe and subscribe back to the queue every 30 seconds without process exiting).
You can remove delayed_exchange_id
for disabling delayed jobs feature.
At the end, don't forget to execute command php ./artisan rabbit:setup
.
Very simple:
You can dispatch your jobs as usual (dispatch(new Job)
or dispatch(new Job)->delay(10)
), commands like queue:work
, queue:failed
, queue:retry
and others works fine.
- Jobs delaying (plugin
rabbitmq_delayed_message_exchange
for RabbitMQ server is required); - Jobs priority (job should implements
PrioritizedJobInterface
interface); - Automatically delayed messages exchanges bindings (only if you use command
rabbit:setup
for queues and exchanges creation); - The ability to store the state of
job
Using this package you can store any variables (except resources and callable entities) between job restarts (just use trait WithJobStateTrait
in your job class). But you should remember - state is available only inside job handle
method.
Every consumer has an identifier that is used by client libraries to determine what handler to invoke for a given delivery. Their names vary from protocol to protocol. Consumer tags and subscription IDs are two most commonly used terms.
If you want to add custom prefix to the consumer tag, you can specify it with an additional argument in the AvtoDev\AmqpRabbitLaravelQueue\Worker::__construct
method.
Be careful with commands queue:failed
and queue:retry
. If during command execution something happens (lost connection, etc) you may loose all failed jobs!
You should avoid to use next method (broker does not guarantee operations order, so calling results may be wrong):
\AvtoDev\AmqpRabbitLaravelQueue\Queue::size()
\AvtoDev\AmqpRabbitLaravelQueue\Failed\RabbitQueueFailedJobProvider::count()
For package testing we use phpunit
framework and docker-ce
+ docker-compose
as develop environment. So, just write into your terminal after repository cloning:
$ make build
$ make latest # or 'make lowest'
$ make test
Changes log can be found here.
If you will find any package errors, please, make an issue in current repository.
This is open-sourced software licensed under the MIT License.