From a32fc842df55799cea67a2316e41258b28971f83 Mon Sep 17 00:00:00 2001 From: Simon CARRE Date: Tue, 6 Feb 2024 15:56:18 +0100 Subject: [PATCH] [#15717] allow processing multiple messages at once for sqs --- Command/SqsConsumeCommand.php | 55 ++++++++++------------------------- Helper/SqsHelper.php | 12 ++++++-- README.md | 4 +-- 3 files changed, 28 insertions(+), 43 deletions(-) diff --git a/Command/SqsConsumeCommand.php b/Command/SqsConsumeCommand.php index 0d661ca..097b8af 100644 --- a/Command/SqsConsumeCommand.php +++ b/Command/SqsConsumeCommand.php @@ -25,20 +25,6 @@ */ class SqsConsumeCommand extends Command { - /** - * Run pull command every 10 seconds - * - * @var int - */ - const DEFAULT_WAIT = 10; - - /** - * Default limit for pulls - * - * @var int - */ - const DEFAULT_LIMIT = -1; - /** * @var ContainerInterface */ @@ -98,9 +84,7 @@ protected function configure() { $this ->setName('autobus:sqs:consume') - ->setDescription('Consume AWS SQS messages for current register topics') - ->addOption('wait', 'w', InputOption::VALUE_OPTIONAL, 'Sleep time in seconds between each pull command', self::DEFAULT_WAIT) - ->addOption('limit', 'l', InputOption::VALUE_OPTIONAL, 'Number of pulls before stopping command. -1 to disable', self::DEFAULT_LIMIT); + ->setDescription('Consume AWS SQS messages for current register topics'); } /** @@ -111,35 +95,28 @@ protected function configure() */ protected function execute(InputInterface $input, OutputInterface $output) { - $pullIndex = 0; - $wait = $input->getOption('wait'); - $limit = intval($input->getOption('limit')); /** @var TopicJob[] $topicJobs */ $topicJobs = $this->entityManager->getRepository('AutobusBusBundle:TopicJob')->findAll(); - while ($pullIndex < $limit || $limit === -1) { - // Pull messages for each job - foreach ($topicJobs as $topicJob) { - $pullIndex++; - $topicName = $topicJob->getTopic(); - $realTopicName = $this->topicHelper->getRealTopicName($topicName); - $queueUrl = $this->sqsHelper->getQueueUrlByName($realTopicName); - if ($queueUrl === null) { - $this->logger->warning(sprintf('[%s] No queue with name %s', __METHOD__, $realTopicName)); - continue; - } + // Pull messages for each job + foreach ($topicJobs as $topicJob) { + $topicName = $topicJob->getTopic(); + $realTopicName = $this->topicHelper->getRealTopicName($topicName); + $queueUrl = $this->sqsHelper->getQueueUrlByName($realTopicName); + if ($queueUrl === null) { + $this->logger->warning(sprintf('[%s] No queue with name %s', __METHOD__, $realTopicName)); + continue; + } - $messages = $this->sqsHelper->getMessages($queueUrl); - foreach ($messages as $message) { - $this->sqsHelper->deleteMessage($queueUrl, $message['ReceiptHandle']); - if (!$this->processMessage($topicJob, $message)) { - $this->logger->error(sprintf("[%s] Error with message processing for message : \n%s", __METHOD__, print_r($message, true))); + $messages = $this->sqsHelper->getMessages($queueUrl); + foreach ($messages as $message) { + $this->sqsHelper->deleteMessage($queueUrl, $message['ReceiptHandle']); + if (!$this->processMessage($topicJob, $message)) { + $this->logger->error(sprintf("[%s] Error with message processing for message : \n%s", __METHOD__, print_r($message, true))); - return 1; - } + return 1; } } - sleep($wait); } } diff --git a/Helper/SqsHelper.php b/Helper/SqsHelper.php index c62f4b9..5cb9e33 100644 --- a/Helper/SqsHelper.php +++ b/Helper/SqsHelper.php @@ -42,6 +42,13 @@ class SqsHelper */ const SQS_MESSAGE_GROUP_ID = 'autobus'; + /** + * Default value for max number of messages to pull + * + * @var int + */ + const SQS_DEFAULT_MAX_NUMBER_OF_MESSAGES = 10; + /** * @var SqsClient */ @@ -131,11 +138,12 @@ public function writeMessage($queueUrl, $message) * * @return array */ - public function getMessages($queueUrl) + public function getMessages($queueUrl, $limit = self::SQS_DEFAULT_MAX_NUMBER_OF_MESSAGES) { try { $result = $this->sqsClient->receiveMessage([ - 'QueueUrl' => $queueUrl + 'QueueUrl' => $queueUrl, + 'MaxNumberOfMessages' => $limit ]); if ($result->hasKey('Messages')) { return $result->get('Messages'); diff --git a/README.md b/README.md index e323162..1888922 100644 --- a/README.md +++ b/README.md @@ -56,9 +56,9 @@ php bin/console autobus:pubsub:consume * With **AWS SQS**: -Execute the following command with **Supervisor** tool: + Add the following line to your crontab: ``` -php bin/console autobus:sqs:consume +*/5 * * * * php bin/console autobus:sqs:consume ``` ### Cron jobs