Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support retry_intervals as a lambda #329

Merged
merged 8 commits into from
Mar 10, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions lib/shoryuken/middleware/server/exponential_backoff_retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ class ExponentialBackoffRetry

def call(worker, queue, sqs_msg, body)
if sqs_msg.is_a?(Array)
logger.warn { "Exponential backoff isn't supported for batch workers" }
logger.warn { "Exponential backoff isn't supported for batch workers" }
return yield
end

started_at = Time.now
yield
rescue
retry_intervals = Array(worker.class.get_shoryuken_options['retry_intervals'])
retry_intervals = worker.class.get_shoryuken_options['retry_intervals']

if retry_intervals.empty? || !handle_failure(sqs_msg, started_at, retry_intervals)
if retry_intervals.nil? || !handle_failure(sqs_msg, started_at, retry_intervals)
# Re-raise the exception if the job is not going to be exponential backoff retried.
# This allows custom middleware (like exception notifiers) to be aware of the unhandled failure.
raise
Expand All @@ -24,28 +24,35 @@ def call(worker, queue, sqs_msg, body)

private

def handle_failure(sqs_msg, started_at, retry_intervals)
return unless attempts = sqs_msg.attributes['ApproximateReceiveCount']

attempts = attempts.to_i - 1

interval = if attempts < retry_intervals.size
retry_intervals[attempts]
else
retry_intervals.last
end

# Visibility timeouts are limited to a total 12 hours, starting from the receipt of the message.
# We calculate the maximum timeout by subtracting the amount of time since the receipt of the message.
#
# From the docs: "Amazon SQS restarts the timeout period using the new value."
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html#AboutVT-extending-message-visibility-timeout
max_timeout = 43200 - (Time.now - started_at).ceil - 1
def get_interval(retry_intervals, attempts)
return retry_intervals.call(attempts) if retry_intervals.respond_to?(:call)

# Arrays start at 0
attempts -= 1

if attempts < (retry_intervals = Array(retry_intervals)).size
retry_intervals[attempts]
else
retry_intervals.last
end.to_i
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixbuenemann is that what you were asking for in #303 (comment)?

end

def next_visibility_timeout(interval, started_at)
max_timeout = 43_200 - (Time.now - started_at).ceil - 1
interval = max_timeout if interval > max_timeout
interval.to_i
end

def handle_failure(sqs_msg, started_at, retry_intervals)
return false if (receive_count = sqs_msg.attributes['ApproximateReceiveCount'].to_i).zero?

sqs_msg.change_visibility(visibility_timeout: interval.to_i)
return false unless (interval = get_interval(retry_intervals, receive_count))

sqs_msg.change_visibility(visibility_timeout: next_visibility_timeout(interval, started_at))

logger.info { "Message #{sqs_msg.message_id} failed, will be retried in #{interval} seconds." }

true
end
end
end
Expand Down
75 changes: 46 additions & 29 deletions spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'spec_helper'

# rubocop:disable Metrics/BlockLength, Metrics/BlockDelimiters
RSpec.describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do
let(:queue) { 'default' }
let(:sqs_queue) { double Shoryuken::Queue }
Expand All @@ -16,8 +17,8 @@
end
end

context 'when a job succeeds' do
it 'does not retry the job' do
context 'when no exception' do
it 'does not retry' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

expect(sqs_msg).not_to receive(:change_visibility)
Expand All @@ -26,60 +27,76 @@
end
end

context 'when a job throws an exception' do
context 'when an error' do
context "and retry_intervals isn't set" do
it 'does not retry' do
expect(sqs_msg).not_to receive(:change_visibility)

it 'does not retry the job by default' do
expect(sqs_msg).not_to receive(:change_visibility)
expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
end
end

context 'and retry_intervals is a lambda' do
it 'retries' do
TestWorker.get_shoryuken_options['retry_intervals'] = ->(_attempts) { 500 }

allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 500)

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end
end

it 'does not retry the job if :retry_intervals is empty' do
TestWorker.get_shoryuken_options['retry_intervals'] = []
context 'and retry_intervals is empty' do
it 'does not retry' do
TestWorker.get_shoryuken_options['retry_intervals'] = []

expect(sqs_msg).not_to receive(:change_visibility)
expect(sqs_msg).not_to receive(:change_visibility)

expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
expect {
subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' }
}.to raise_error(RuntimeError, 'Error')
end
end

it 'retries the job if :retry_intervals is non-empty' do
it 'uses first interval ' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:queue){ sqs_queue }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 300)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end

it 'retries the job with exponential backoff' do
it 'uses matching interval' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:attributes){ {'ApproximateReceiveCount' => 2 } }
allow(sqs_msg).to receive(:queue){ sqs_queue }
allow(sqs_msg).to receive(:attributes) { { 'ApproximateReceiveCount' => 2 } }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end

it 'uses the last retry interval when :receive_count exceeds the size of :retry_intervals' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]
context 'when attempts exceeds retry_intervals' do
it 'uses last interval' do
TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800]

allow(sqs_msg).to receive(:attributes){ {'ApproximateReceiveCount' => 3 } }
allow(sqs_msg).to receive(:queue){ sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)
allow(sqs_msg).to receive(:attributes) { { 'ApproximateReceiveCount' => 3 } }
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end
end

it 'limits the visibility timeout to 12 hours from receipt of message' do
TestWorker.get_shoryuken_options['retry_intervals'] = [86400]
it 'limits the visibility timeout to 12 hours' do
TestWorker.get_shoryuken_options['retry_intervals'] = [86_400]

allow(sqs_msg).to receive(:queue){ sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43198)
allow(sqs_msg).to receive(:queue) { sqs_queue }
expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43_198)

expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error
end
Expand Down