Skip to content

Commit

Permalink
Merge pull request #420 from phstc/feature/allow-receive-options-per-…
Browse files Browse the repository at this point in the history
…queue

Allow receive options per queue
  • Loading branch information
phstc authored Jul 31, 2017
2 parents 9546c16 + 1215f62 commit 1a6bea6
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 16 deletions.
20 changes: 13 additions & 7 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ class Fetcher

FETCH_LIMIT = 10

attr_reader :group

def initialize(group)
@group = group
end
Expand All @@ -26,18 +24,26 @@ def fetch(queue, limit)
private

def receive_messages(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit

options = Shoryuken.sqs_client_receive_message_opts[group].to_h.dup
options = receive_options(queue)

options[:max_number_of_messages] = limit
options[:max_number_of_messages] = max_number_of_messages(limit, options)
options[:message_attribute_names] = %w(All)
options[:attribute_names] = %w(All)

options.merge!(queue.options)

Shoryuken::Client.queues(queue.name).receive_messages(options)
end

def max_number_of_messages(limit, options)
[limit, FETCH_LIMIT, options[:max_number_of_messages]].compact.min
end

def receive_options(queue)
options = Shoryuken.sqs_client_receive_message_opts[queue.name]
options ||= Shoryuken.sqs_client_receive_message_opts[@group]

options.to_h.dup
end
end
end
54 changes: 45 additions & 9 deletions spec/shoryuken/fetcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'shoryuken/manager'
require 'shoryuken/fetcher'

# rubocop:disable Metrics/BlockLength
RSpec.describe Shoryuken::Fetcher do
let(:queue) { instance_double('Shoryuken::Queue') }
let(:queue_name) { 'default' }
Expand All @@ -13,7 +14,7 @@
Shoryuken::Message,
queue_url: queue_name,
body: 'test',
message_id: 'fc754df79cc24c4196ca5996a44b771e',
message_id: 'fc754df79cc24c4196ca5996a44b771e'
)
end

Expand All @@ -27,23 +28,58 @@

Shoryuken.sqs_client_receive_message_opts[group] = { wait_time_seconds: 10 }

expect(queue).to receive(:receive_messages).
with(wait_time_seconds: 10, max_number_of_messages: limit, message_attribute_names: ['All'], attribute_names: ['All']).
and_return([])
expect(queue).to receive(:receive_messages).with(
wait_time_seconds: 10,
max_number_of_messages: limit,
message_attribute_names: ['All'],
attribute_names: ['All']
).and_return([])

subject.fetch(queue_config, limit)
end

context 'when receive options per queue' do
let(:limit) { 5 }

specify do
expect(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue)

Shoryuken.sqs_client_receive_message_opts[queue_name] = { max_number_of_messages: 1 }

expect(queue).to receive(:receive_messages).with(
max_number_of_messages: 1,
message_attribute_names: ['All'],
attribute_names: ['All']
).and_return([])

subject.fetch(queue_config, limit)
end
end

context 'when max_number_of_messages opt is great than limit' do
it 'uses limit' do
expect(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue)

Shoryuken.sqs_client_receive_message_opts[queue_name] = { max_number_of_messages: 20 }

expect(queue).to receive(:receive_messages).with(
max_number_of_messages: limit,
message_attribute_names: ['All'],
attribute_names: ['All']
).and_return([])

subject.fetch(queue_config, limit)
end
end

context 'when limit is greater than FETCH_LIMIT' do
let(:limit) { 20 }

specify do
Shoryuken.sqs_client_receive_message_opts[group] = {}

allow(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue)
expect(queue).to receive(:receive_messages).
with(max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']).
and_return([])
expect(queue).to receive(:receive_messages).with(
max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']
).and_return([])

subject.fetch(queue_config, limit)
end
Expand Down
2 changes: 2 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def perform(sqs_msg, body); end

Aws.config[:stub_responses] = true

Shoryuken.sqs_client_receive_message_opts.clear

allow(Concurrent).to receive(:global_io_executor).and_return(Concurrent::ImmediateExecutor.new)
allow(Shoryuken).to receive(:active_job?).and_return(false)
end
Expand Down

0 comments on commit 1a6bea6

Please sign in to comment.