From efe1c270c87b2fedca9ce2a89a5362eb30c0bd19 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Thu, 9 Mar 2017 17:24:08 -0500 Subject: [PATCH] :lipstick: --- .../server/exponential_backoff_retry.rb | 18 ++--- .../server/exponential_backoff_retry_spec.rb | 67 +++++++++++-------- 2 files changed, 43 insertions(+), 42 deletions(-) diff --git a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb index fd63ef31..d664072a 100755 --- a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb +++ b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb @@ -27,6 +27,9 @@ def call(worker, queue, sqs_msg, body) def get_interval(retry_intervals, attempts) return retry_intervals.call(attempts) if retry_intervals.respond_to?(:call) + # Array start at 0 + attempts -= 1 + if attempts < (retry_intervals = Array(retry_intervals)).size retry_intervals[attempts] else @@ -34,10 +37,6 @@ def get_interval(retry_intervals, attempts) end end - def get_attempts(sqs_msg) - sqs_msg.attributes['ApproximateReceiveCount'].to_i - 1 - end - def next_visibility_timeout(interval, started_at) max_timeout = 43_200 - (Time.now - started_at).ceil - 1 interval = max_timeout if interval > max_timeout @@ -45,17 +44,10 @@ def next_visibility_timeout(interval, started_at) end def handle_failure(sqs_msg, started_at, retry_intervals) - return false unless (attempts = get_attempts(sqs_msg)) - - attempts = attempts.to_i - 1 + return false unless sqs_msg.attributes['ApproximateReceiveCount'] - return false unless (interval = get_interval(retry_intervals, attempts)) + return false unless (interval = get_interval(retry_intervals, sqs_msg.attributes['ApproximateReceiveCount'].to_i)) - # Visibility timeouts are limited to a total 12 hours, starting from the receipt of the message. - # We calculate the maximum timeout by subtracting the amount of time since the receipt of the message. - # - # From the docs: "Amazon SQS restarts the timeout period using the new value." - # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html#AboutVT-extending-message-visibility-timeout sqs_msg.change_visibility(visibility_timeout: next_visibility_timeout(interval, started_at)) logger.info { "Message #{sqs_msg.message_id} failed, will be retried in #{interval} seconds." } diff --git a/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb b/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb index 3dd9caf6..d7bcc708 100755 --- a/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb +++ b/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb @@ -1,5 +1,6 @@ require 'spec_helper' +# rubocop:disable Metrics/BlockLength, Metrics/BlockDelimiters RSpec.describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do let(:queue) { 'default' } let(:sqs_queue) { double Shoryuken::Queue } @@ -16,8 +17,8 @@ end end - context 'when a job succeeds' do - it 'does not retry the job' do + context 'when no exception' do + it 'does not retry' do TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800] expect(sqs_msg).not_to receive(:change_visibility) @@ -26,60 +27,68 @@ end end - context 'when a job throws an exception' do + context 'when an error' do + context "and retry_intervals isn't set" do + it 'does not retry' do + expect(sqs_msg).not_to receive(:change_visibility) - it 'does not retry the job by default' do - expect(sqs_msg).not_to receive(:change_visibility) + expect { + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' } + }.to raise_error(RuntimeError, 'Error') + end + end - expect { - subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' } - }.to raise_error(RuntimeError, 'Error') + context 'and retry_intervals is a lambda' do end - it 'does not retry the job if :retry_intervals is empty' do - TestWorker.get_shoryuken_options['retry_intervals'] = [] + context 'and retry_intervals is empty' do + it 'does not retry' do + TestWorker.get_shoryuken_options['retry_intervals'] = [] - expect(sqs_msg).not_to receive(:change_visibility) + expect(sqs_msg).not_to receive(:change_visibility) - expect { - subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' } - }.to raise_error(RuntimeError, 'Error') + expect { + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' } + }.to raise_error(RuntimeError, 'Error') + end end - it 'retries the job if :retry_intervals is non-empty' do + it 'uses first interval ' do TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800] - allow(sqs_msg).to receive(:queue){ sqs_queue } + allow(sqs_msg).to receive(:queue) { sqs_queue } expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 300) expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error end - it 'retries the job with exponential backoff' do + it 'uses matching interval' do TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800] - allow(sqs_msg).to receive(:attributes){ {'ApproximateReceiveCount' => 2 } } - allow(sqs_msg).to receive(:queue){ sqs_queue } + allow(sqs_msg).to receive(:attributes) { { 'ApproximateReceiveCount' => 2 } } + allow(sqs_msg).to receive(:queue) { sqs_queue } expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800) expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error end - it 'uses the last retry interval when :receive_count exceeds the size of :retry_intervals' do - TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800] + context 'when attempts exceeds retry_intervals' do + it 'uses last interval' do + TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800] - allow(sqs_msg).to receive(:attributes){ {'ApproximateReceiveCount' => 3 } } - allow(sqs_msg).to receive(:queue){ sqs_queue } - expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800) + allow(sqs_msg).to receive(:attributes) { { 'ApproximateReceiveCount' => 3 } } + allow(sqs_msg).to receive(:queue) { sqs_queue } + expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800) - expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error + expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error + end end - it 'limits the visibility timeout to 12 hours from receipt of message' do - TestWorker.get_shoryuken_options['retry_intervals'] = [86400] + it 'limits the visibility timeout to 12 hours' do + TestWorker.get_shoryuken_options['retry_intervals'] = [86_400] - allow(sqs_msg).to receive(:queue){ sqs_queue } - expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43198) + allow(sqs_msg).to receive(:queue) { sqs_queue } + expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43_198) expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error end