diff --git a/django_q/brokers/aws_sqs.py b/django_q/brokers/aws_sqs.py index 4004af41..256813b5 100644 --- a/django_q/brokers/aws_sqs.py +++ b/django_q/brokers/aws_sqs.py @@ -4,7 +4,6 @@ from django_q.brokers import Broker from django_q.conf import Conf - QUEUE_DOES_NOT_EXIST = "AWS.SimpleQueueService.NonExistentQueue" @@ -27,9 +26,22 @@ def dequeue(self): # sqs supports max 10 messages in bulk if Conf.BULK > 10: Conf.BULK = 10 - tasks = self.queue.receive_messages( - MaxNumberOfMessages=Conf.BULK, VisibilityTimeout=Conf.RETRY - ) + + params = {"MaxNumberOfMessages": Conf.BULK, "VisibilityTimeout": Conf.RETRY} + + # sqs long polling + sqs_config = Conf.SQS + if "receive_message_wait_time_seconds" in sqs_config: + wait_time_second = sqs_config.get("receive_message_wait_time_seconds", 20) + + # validation of parameter + if not isinstance(wait_time_second, int): + raise ValueError("receive_message_wait_time_seconds should be int") + if wait_time_second > 20: + raise ValueError("receive_message_wait_time_seconds is invalid. Reason: Must be >= 0 and <= 20") + params.update({"WaitTimeSeconds": wait_time_second}) + + tasks = self.queue.receive_messages(**params) if tasks: return [(t.receipt_handle, t.body) for t in tasks] @@ -67,6 +79,10 @@ def get_connection(list_key: str = Conf.PREFIX) -> Session: if "aws_region" in config: config["region_name"] = config["aws_region"] del config["aws_region"] + + if 'receive_message_wait_time_seconds' in config: + del config["receive_message_wait_time_seconds"] + return Session(**config) def get_queue(self): diff --git a/django_q/tests/test_brokers.py b/django_q/tests/test_brokers.py index 064ecf19..ee9bd8ad 100644 --- a/django_q/tests/test_brokers.py +++ b/django_q/tests/test_brokers.py @@ -194,6 +194,7 @@ def canceled_sqs(monkeypatch): "aws_region": os.getenv("AWS_REGION"), "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"), "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"), + "receive_message_wait_time_seconds": 20 }, ) # check broker