Skip to content

Commit

Permalink
[#15717] allow processing multiple messages at once for sqs
Browse files Browse the repository at this point in the history
  • Loading branch information
simoncarre committed Feb 6, 2024
1 parent 1183e14 commit a32fc84
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 43 deletions.
55 changes: 16 additions & 39 deletions Command/SqsConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,6 @@
*/
class SqsConsumeCommand extends Command
{
/**
* Run pull command every 10 seconds
*
* @var int
*/
const DEFAULT_WAIT = 10;

/**
* Default limit for pulls
*
* @var int
*/
const DEFAULT_LIMIT = -1;

/**
* @var ContainerInterface
*/
Expand Down Expand Up @@ -98,9 +84,7 @@ protected function configure()
{
$this
->setName('autobus:sqs:consume')
->setDescription('Consume AWS SQS messages for current register topics')
->addOption('wait', 'w', InputOption::VALUE_OPTIONAL, 'Sleep time in seconds between each pull command', self::DEFAULT_WAIT)
->addOption('limit', 'l', InputOption::VALUE_OPTIONAL, 'Number of pulls before stopping command. -1 to disable', self::DEFAULT_LIMIT);
->setDescription('Consume AWS SQS messages for current register topics');
}

/**
Expand All @@ -111,35 +95,28 @@ protected function configure()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$pullIndex = 0;
$wait = $input->getOption('wait');
$limit = intval($input->getOption('limit'));
/** @var TopicJob[] $topicJobs */
$topicJobs = $this->entityManager->getRepository('AutobusBusBundle:TopicJob')->findAll();

while ($pullIndex < $limit || $limit === -1) {
// Pull messages for each job
foreach ($topicJobs as $topicJob) {
$pullIndex++;
$topicName = $topicJob->getTopic();
$realTopicName = $this->topicHelper->getRealTopicName($topicName);
$queueUrl = $this->sqsHelper->getQueueUrlByName($realTopicName);
if ($queueUrl === null) {
$this->logger->warning(sprintf('[%s] No queue with name %s', __METHOD__, $realTopicName));
continue;
}
// Pull messages for each job
foreach ($topicJobs as $topicJob) {
$topicName = $topicJob->getTopic();
$realTopicName = $this->topicHelper->getRealTopicName($topicName);
$queueUrl = $this->sqsHelper->getQueueUrlByName($realTopicName);
if ($queueUrl === null) {
$this->logger->warning(sprintf('[%s] No queue with name %s', __METHOD__, $realTopicName));
continue;
}

$messages = $this->sqsHelper->getMessages($queueUrl);
foreach ($messages as $message) {
$this->sqsHelper->deleteMessage($queueUrl, $message['ReceiptHandle']);
if (!$this->processMessage($topicJob, $message)) {
$this->logger->error(sprintf("[%s] Error with message processing for message : \n%s", __METHOD__, print_r($message, true)));
$messages = $this->sqsHelper->getMessages($queueUrl);
foreach ($messages as $message) {
$this->sqsHelper->deleteMessage($queueUrl, $message['ReceiptHandle']);
if (!$this->processMessage($topicJob, $message)) {
$this->logger->error(sprintf("[%s] Error with message processing for message : \n%s", __METHOD__, print_r($message, true)));

return 1;
}
return 1;
}
}
sleep($wait);
}
}

Expand Down
12 changes: 10 additions & 2 deletions Helper/SqsHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class SqsHelper
*/
const SQS_MESSAGE_GROUP_ID = 'autobus';

/**
* Default value for max number of messages to pull
*
* @var int
*/
const SQS_DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;

/**
* @var SqsClient
*/
Expand Down Expand Up @@ -131,11 +138,12 @@ public function writeMessage($queueUrl, $message)
*
* @return array
*/
public function getMessages($queueUrl)
public function getMessages($queueUrl, $limit = self::SQS_DEFAULT_MAX_NUMBER_OF_MESSAGES)
{
try {
$result = $this->sqsClient->receiveMessage([
'QueueUrl' => $queueUrl
'QueueUrl' => $queueUrl,
'MaxNumberOfMessages' => $limit
]);
if ($result->hasKey('Messages')) {
return $result->get('Messages');
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ php bin/console autobus:pubsub:consume

* With **AWS SQS**:

Execute the following command with **Supervisor** tool:
Add the following line to your crontab:
```
php bin/console autobus:sqs:consume
*/5 * * * * php bin/console autobus:sqs:consume
```

### Cron jobs
Expand Down

0 comments on commit a32fc84

Please sign in to comment.