Skip to content

Commit

Permalink
💄
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Mar 9, 2017
1 parent 07cd7ce commit efe1c27
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 42 deletions.
18 changes: 5 additions & 13 deletions lib/shoryuken/middleware/server/exponential_backoff_retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,27 @@ def call(worker, queue, sqs_msg, body)
def get_interval(retry_intervals, attempts)
return retry_intervals.call(attempts) if retry_intervals.respond_to?(:call)

# Array start at 0
attempts -= 1

if attempts < (retry_intervals = Array(retry_intervals)).size
retry_intervals[attempts]
else
retry_intervals.last
end
end

def get_attempts(sqs_msg)
sqs_msg.attributes['ApproximateReceiveCount'].to_i - 1
end

def next_visibility_timeout(interval, started_at)
max_timeout = 43_200 - (Time.now - started_at).ceil - 1
interval = max_timeout if interval > max_timeout
interval.to_i
end

def handle_failure(sqs_msg, started_at, retry_intervals)
return false unless (attempts = get_attempts(sqs_msg))

attempts = attempts.to_i - 1
return false unless sqs_msg.attributes['ApproximateReceiveCount']

return false unless (interval = get_interval(retry_intervals, attempts))
return false unless (interval = get_interval(retry_intervals, sqs_msg.attributes['ApproximateReceiveCount'].to_i))

# Visibility timeouts are limited to a total 12 hours, starting from the receipt of the message.
# We calculate the maximum timeout by subtracting the amount of time since the receipt of the message.
#
# From the docs: "Amazon SQS restarts the timeout period using the new value."
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html#AboutVT-extending-message-visibility-timeout
sqs_msg.change_visibility(visibility_timeout: next_visibility_timeout(interval, started_at))

logger.info { "Message #{sqs_msg.message_id} failed, will be retried in #{interval} seconds." }
Expand Down
67 changes: 38 additions & 29 deletions spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'spec_helper'

# rubocop:disable Metrics/BlockLength, Metrics/BlockDelimiters
RSpec.describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do
let(:queue) { 'default' }
let(:sqs_queue) { double Shoryuken::Queue }
Expand All @@ -16,8 +17,8 @@
end
end

context 'when a job succeeds' do
it 'does not retry the job' do
context 'when no exception' do
it 'does not retry' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

expect(sqs_msg).not_to receive(:change_visibility)
Expand All @@ -26,60 +27,68 @@
end
end

context 'when a job throws an exception' do
context 'when an error' do
context "and retry_intervals isn't set" do
it 'does not retry' do
expect(sqs_msg).not_to receive(:change_visibility)

it 'does not retry the job by default' do
expect(sqs_msg).not_to receive(:change_visibility)
expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
end
end

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
context 'and retry_intervals is a lambda' do
end

it 'does not retry the job if :retry_intervals is empty' do
TestWorker.get_shoryuken_options['retry_intervals'] = []
context 'and retry_intervals is empty' do
it 'does not retry' do
TestWorker.get_shoryuken_options['retry_intervals'] = []

expect(sqs_msg).not_to receive(:change_visibility)
expect(sqs_msg).not_to receive(:change_visibility)

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
end
end

it 'retries the job if :retry_intervals is non-empty' do
it 'uses first interval ' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:queue){ sqs_queue }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 300)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end

it 'retries the job with exponential backoff' do
it 'uses matching interval' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:attributes){ {'ApproximateReceiveCount' => 2 } }
allow(sqs_msg).to receive(:queue){ sqs_queue }
allow(sqs_msg).to receive(:attributes) { { 'ApproximateReceiveCount' => 2 } }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end

it 'uses the last retry interval when :receive_count exceeds the size of :retry_intervals' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]
context 'when attempts exceeds retry_intervals' do
it 'uses last interval' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:attributes){ {'ApproximateReceiveCount' => 3 } }
allow(sqs_msg).to receive(:queue){ sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)
allow(sqs_msg).to receive(:attributes) { { 'ApproximateReceiveCount' => 3 } }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end
end

it 'limits the visibility timeout to 12 hours from receipt of message' do
TestWorker.get_shoryuken_options['retry_intervals'] = [86400]
it 'limits the visibility timeout to 12 hours' do
TestWorker.get_shoryuken_options['retry_intervals'] = [86_400]

allow(sqs_msg).to receive(:queue){ sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43198)
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43_198)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end
Expand Down

0 comments on commit efe1c27

Please sign in to comment.