From d5a38fa79530614360cee8f44c9b4aaa579aeb1f Mon Sep 17 00:00:00 2001 From: Garry Shutler Date: Thu, 4 Jun 2015 16:15:57 +0100 Subject: [PATCH 01/11] Extracted queue polling strategy In order to be able to customize the order and manner in which Shoryuken polls SQS queues, the previous hard-coded strategy was extracted into a separate class. This meant that some of the method names were altered to be more generic (for example rebalance_queue_weight! became messages_received), otherwise this was a direct extraction. --- lib/shoryuken.rb | 4 +- lib/shoryuken/fetcher.rb | 37 +++++++-------- lib/shoryuken/manager.rb | 61 +++++++++---------------- lib/shoryuken/polling.rb | 82 ++++++++++++++++++++++++++++++++++ spec/shoryuken/fetcher_spec.rb | 17 +++---- spec/shoryuken/manager_spec.rb | 30 ++++++------- spec/shoryuken/polling_spec.rb | 63 ++++++++++++++++++++++++++ 7 files changed, 212 insertions(+), 82 deletions(-) create mode 100644 lib/shoryuken/polling.rb create mode 100644 spec/shoryuken/polling_spec.rb diff --git a/lib/shoryuken.rb b/lib/shoryuken.rb index 9b898585..d777b154 100644 --- a/lib/shoryuken.rb +++ b/lib/shoryuken.rb @@ -21,6 +21,7 @@ require 'shoryuken/middleware/server/timing' require 'shoryuken/sns_arn' require 'shoryuken/topic' +require 'shoryuken/polling' module Shoryuken DEFAULTS = { @@ -33,7 +34,8 @@ module Shoryuken startup: [], quiet: [], shutdown: [], - } + }, + polling_strategy: Polling::WeightedRoundRobin, } @@queues = [] diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index d7e4cddb..6d732545 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -9,18 +9,6 @@ def initialize(manager) @manager = manager end - def receive_messages(queue, limit) - # AWS limits the batch size by 10 - limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit - - options = (Shoryuken::AwsConfig.options[:receive_message] || {}).dup - options[:max_number_of_messages] = limit - options[:message_attribute_names] = %w(All) - options[:attribute_names] = %w(All) - - Shoryuken::Client.queues(queue).receive_messages options - end - def fetch(queue, available_processors) watchdog('Fetcher#fetch died') do started_at = Time.now @@ -28,23 +16,23 @@ def fetch(queue, available_processors) logger.debug { "Looking for new messages in '#{queue}'" } begin - batch = Shoryuken.worker_registry.batch_receive_messages?(queue) + batch = Shoryuken.worker_registry.batch_receive_messages?(queue.name) limit = batch ? FETCH_LIMIT : available_processors if (sqs_msgs = Array(receive_messages(queue, limit))).any? logger.debug { "Found #{sqs_msgs.size} messages for '#{queue}'" } if batch - @manager.async.assign(queue, patch_sqs_msgs!(sqs_msgs)) + @manager.async.assign(queue.name, patch_sqs_msgs!(sqs_msgs)) else - sqs_msgs.each { |sqs_msg| @manager.async.assign(queue, sqs_msg) } + sqs_msgs.each { |sqs_msg| @manager.async.assign(queue.name, sqs_msg) } end - @manager.async.rebalance_queue_weight!(queue) + @manager.async.messages_present(queue) else logger.debug { "No message found for '#{queue}'" } - @manager.async.pause_queue!(queue) + @manager.async.queue_empty(queue) end logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } @@ -55,11 +43,24 @@ def fetch(queue, available_processors) @manager.async.dispatch end - end private + def receive_messages(queue, limit) + # AWS limits the batch size by 10 + limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit + + options = (Shoryuken.options[:aws][:receive_message] || {}).dup + options[:max_number_of_messages] = limit + options[:message_attribute_names] = %w(All) + options[:attribute_names] = %w(All) + + options.merge!(queue.options) + + Shoryuken::Client.queues(queue.name).receive_messages options + end + def patch_sqs_msgs!(sqs_msgs) sqs_msgs.instance_eval do def message_id diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index a26abc2e..085f29ed 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -1,4 +1,5 @@ require 'shoryuken/processor' +require 'shoryuken/polling' require 'shoryuken/fetcher' module Shoryuken @@ -14,6 +15,7 @@ def initialize(condvar) @count = Shoryuken.options[:concurrency] || 25 raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0 @queues = Shoryuken.queues.dup.uniq + @polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues) @finished = condvar @done = false @@ -105,22 +107,18 @@ def assign(queue, sqs_msg) end end - def rebalance_queue_weight!(queue) - watchdog('Manager#rebalance_queue_weight! died') do - if (original = original_queue_weight(queue)) > (current = current_queue_weight(queue)) - logger.info { "Increasing '#{queue}' weight to #{current + 1}, max: #{original}" } - - @queues << queue - end + def messages_present(queue) + watchdog('Manager#messages_present died') do + @polling_strategy.messages_present(queue) end end - def pause_queue!(queue) - return if !@queues.include?(queue) || Shoryuken.options[:delay].to_f <= 0 + def queue_empty(queue) + return if Shoryuken.options[:delay].to_f <= 0 logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because it's empty" } - @queues.delete(queue) + @polling_strategy.pause(queue) after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) } end @@ -129,7 +127,7 @@ def pause_queue!(queue) def dispatch return if stopped? - logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{unparse_queues(@queues)}" } + logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{@polling_strategy.active_queues}" } if @ready.empty? logger.debug { 'Pausing fetcher, because all processors are busy' } @@ -169,51 +167,34 @@ def build_processor def restart_queue!(queue) return if stopped? - unless @queues.include? queue - logger.debug { "Restarting '#{queue}'" } + @polling_strategy.restart(queue) - @queues << queue + if @fetcher_paused + logger.debug { 'Restarting fetcher' } - if @fetcher_paused - logger.debug { 'Restarting fetcher' } + @fetcher_paused = false - @fetcher_paused = false - - dispatch - end + dispatch end end - def current_queue_weight(queue) - queue_weight(@queues, queue) - end - - def original_queue_weight(queue) - queue_weight(Shoryuken.queues, queue) - end - - def queue_weight(queues, queue) - queues.count { |q| q == queue } - end - def next_queue - return nil if @queues.empty? - # get/remove the first queue in the list - queue = @queues.shift + queue = @polling_strategy.next_queue - unless defined?(::ActiveJob) || !Shoryuken.worker_registry.workers(queue).empty? + return nil unless queue + + if queue && (not defined?(::ActiveJob) and Shoryuken.worker_registry.workers(queue.name).empty?) # when no worker registered pause the queue to avoid endless recursion logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because no workers registered" } + @polling_strategy.pause(queue) + after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) } - return next_queue + queue = next_queue end - # add queue back to the end of the list - @queues << queue - queue end diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb new file mode 100644 index 00000000..e96bb045 --- /dev/null +++ b/lib/shoryuken/polling.rb @@ -0,0 +1,82 @@ +module Shoryuken + module Polling + QueueConfiguration = Struct.new(:name, :options) do + def ==(other) + case other + when String + if options.empty? + name == other + else + false + end + else + super + end + end + end + + class WeightedRoundRobin + include Util + + def initialize(queues) + @initial_queues = queues + @queues = queues.dup.uniq + end + + def active_queues + unparse_queues(@queues) + end + + def next_queue + queue = @queues.shift + @queues << queue + QueueConfiguration.new(queue, {}) + end + + def messages_present(queue) + return unless (original = original_queue_weight(queue)) > (current = current_queue_weight(queue)) + + logger.info "Increasing '#{queue}' weight to #{current + 1}, max: #{original}" + @queues << queue + end + + def pause(queue) + return unless @queues.delete(queue) + logger.debug "Paused '#{queue}'" + end + + def restart(queue) + return if @queues.include?(queue) + logger.debug "Restarting '#{queue}'" + @queues << queue + end + + def ==(other) + case other + when Array + @queues == other + else + if other.respond_to?(:active_queues) + self.active_queues == other.active_queues + else + false + end + end + end + + private + + def current_queue_weight(queue) + queue_weight(@queues, queue) + end + + def original_queue_weight(queue) + queue_weight(@initial_queues, queue) + end + + def queue_weight(queues, queue) + queues.count { |q| q == queue } + end + end + end +end diff --git a/spec/shoryuken/fetcher_spec.rb b/spec/shoryuken/fetcher_spec.rb index 550a233e..1af1ee07 100644 --- a/spec/shoryuken/fetcher_spec.rb +++ b/spec/shoryuken/fetcher_spec.rb @@ -6,6 +6,7 @@ let(:manager) { double Shoryuken::Manager } let(:queue) { double Shoryuken::Queue } let(:queue_name) { 'default' } + let(:queue_config) { Shoryuken::Polling::QueueConfiguration.new(queue_name, {}) } let(:sqs_msg) do double Shoryuken::Message, @@ -26,20 +27,20 @@ it 'calls pause when no message' do allow(queue).to receive(:receive_messages).with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']).and_return([]) - expect(manager).to receive(:pause_queue!).with(queue_name) + expect(manager).to receive(:queue_empty).with(queue_name) expect(manager).to receive(:dispatch) - subject.fetch(queue_name, 1) + subject.fetch(queue_config, 1) end it 'assigns messages' do allow(queue).to receive(:receive_messages).with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) - expect(manager).to receive(:rebalance_queue_weight!).with(queue_name) + expect(manager).to receive(:messages_present).with(queue_name) expect(manager).to receive(:assign).with(queue_name, sqs_msg) expect(manager).to receive(:dispatch) - subject.fetch(queue_name, 5) + subject.fetch(queue_config, 5) end it 'assigns messages in batch' do @@ -47,11 +48,11 @@ allow(queue).to receive(:receive_messages).with(max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) - expect(manager).to receive(:rebalance_queue_weight!).with(queue_name) + expect(manager).to receive(:messages_present).with(queue_name) expect(manager).to receive(:assign).with(queue_name, [sqs_msg]) expect(manager).to receive(:dispatch) - subject.fetch(queue_name, 5) + subject.fetch(queue_config, 5) end context 'when worker not found' do @@ -60,11 +61,11 @@ it 'ignores batch' do allow(queue).to receive(:receive_messages).with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) - expect(manager).to receive(:rebalance_queue_weight!).with(queue_name) + expect(manager).to receive(:messages_present).with(queue_name) expect(manager).to receive(:assign).with(queue_name, sqs_msg) expect(manager).to receive(:dispatch) - subject.fetch(queue_name, 5) + subject.fetch(queue_config, 5) end end end diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index 0ef0b6ba..42d136b0 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -29,11 +29,11 @@ Shoryuken.queues << queue1 Shoryuken.queues << queue2 - expect(subject.instance_variable_get('@queues')).to eq [queue1, queue2] + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue1, queue2] - subject.pause_queue!(queue1) + subject.queue_empty(queue1) - expect(subject.instance_variable_get('@queues')).to eq [queue2] + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] end it 'increases weight' do @@ -48,18 +48,18 @@ Shoryuken.queues << queue1 Shoryuken.queues << queue2 - expect(subject.instance_variable_get('@queues')).to eq [queue1, queue2] - subject.pause_queue!(queue1) - expect(subject.instance_variable_get('@queues')).to eq [queue2] + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue1, queue2] + subject.queue_empty(queue1) + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] - subject.rebalance_queue_weight!(queue1) - expect(subject.instance_variable_get('@queues')).to eq [queue2, queue1] + subject.messages_present(queue1) + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1] - subject.rebalance_queue_weight!(queue1) - expect(subject.instance_variable_get('@queues')).to eq [queue2, queue1, queue1] + subject.messages_present(queue1) + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1, queue1] - subject.rebalance_queue_weight!(queue1) - expect(subject.instance_variable_get('@queues')).to eq [queue2, queue1, queue1, queue1] + subject.messages_present(queue1) + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1, queue1, queue1] end it 'adds queue back' do @@ -78,12 +78,12 @@ fetcher = double('Fetcher').as_null_object subject.fetcher = fetcher - subject.pause_queue!(queue1) - expect(subject.instance_variable_get('@queues')).to eq [queue2] + subject.queue_empty(queue1) + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] sleep 0.5 - expect(subject.instance_variable_get('@queues')).to eq [queue2, queue1] + expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1] end end diff --git a/spec/shoryuken/polling_spec.rb b/spec/shoryuken/polling_spec.rb new file mode 100644 index 00000000..a86d5855 --- /dev/null +++ b/spec/shoryuken/polling_spec.rb @@ -0,0 +1,63 @@ +require 'spec_helper' +require 'shoryuken/polling' + +describe Shoryuken::Polling do + let(:queue1) { "shoryuken" } + let(:queue2) { "uppercut" } + let(:queues) { Array.new } + + describe Shoryuken::Polling::WeightedRoundRobin do + subject { Shoryuken::Polling::WeightedRoundRobin.new(queues) } + + it "decreases weight" do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 + + expect(subject).to eq [queue1, queue2] + + subject.pause(queue1) + + expect(subject).to eq [queue2] + end + + it "increases weight" do + # [shoryuken, 3] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue1 + queues << queue2 + + expect(subject).to eq [queue1, queue2] + subject.pause(queue1) + expect(subject).to eq [queue2] + + subject.messages_present(queue1) + expect(subject).to eq [queue2, queue1] + + subject.messages_present(queue1) + expect(subject).to eq [queue2, queue1, queue1] + + subject.messages_present(queue1) + expect(subject).to eq [queue2, queue1, queue1, queue1] + end + + it "cycles" do + # [shoryuken, 1] + # [uppercut, 1] + queues << queue1 + queues << queue2 + + popped = [] + + (queues.size * 3).times do + popped << subject.next_queue + end + + expect(popped).to eq(queues * 3) + end + end +end From 1fd45b0f6739f873a1dc02e5389c202ad509a4c8 Mon Sep 17 00:00:00 2001 From: Garry Shutler Date: Fri, 5 Jun 2015 16:16:18 +0100 Subject: [PATCH 02/11] Fixed warnings from houndci --- lib/shoryuken/manager.rb | 16 ++++++++++------ lib/shoryuken/polling.rb | 2 +- spec/shoryuken/polling_spec.rb | 10 +++++----- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 085f29ed..e8cc6a8d 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -114,13 +114,13 @@ def messages_present(queue) end def queue_empty(queue) - return if Shoryuken.options[:delay].to_f <= 0 + return if delay <= 0 - logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because it's empty" } + logger.debug { "Pausing '#{queue}' for #{delay} seconds, because it's empty" } @polling_strategy.pause(queue) - after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) } + after(delay) { async.restart_queue!(queue) } end @@ -158,6 +158,10 @@ def dispatch_later end end + def delay + Shoryuken.options[:delay].to_f + end + def build_processor processor = Processor.new_link(current_actor) processor.proxy_id = processor.object_id @@ -184,13 +188,13 @@ def next_queue return nil unless queue - if queue && (not defined?(::ActiveJob) and Shoryuken.worker_registry.workers(queue.name).empty?) + if queue && (!defined?(::ActiveJob) && Shoryuken.worker_registry.workers(queue.name).empty?) # when no worker registered pause the queue to avoid endless recursion - logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because no workers registered" } + logger.debug { "Pausing '#{queue}' for #{delay} seconds, because no workers registered" } @polling_strategy.pause(queue) - after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) } + after(delay) { async.restart_queue!(queue) } queue = next_queue end diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index e96bb045..fff2eb58 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -57,7 +57,7 @@ def ==(other) @queues == other else if other.respond_to?(:active_queues) - self.active_queues == other.active_queues + active_queues == other.active_queues else false end diff --git a/spec/shoryuken/polling_spec.rb b/spec/shoryuken/polling_spec.rb index a86d5855..47f33bb6 100644 --- a/spec/shoryuken/polling_spec.rb +++ b/spec/shoryuken/polling_spec.rb @@ -2,14 +2,14 @@ require 'shoryuken/polling' describe Shoryuken::Polling do - let(:queue1) { "shoryuken" } - let(:queue2) { "uppercut" } + let(:queue1) { 'shoryuken' } + let(:queue2) { 'uppercut' } let(:queues) { Array.new } describe Shoryuken::Polling::WeightedRoundRobin do subject { Shoryuken::Polling::WeightedRoundRobin.new(queues) } - it "decreases weight" do + it 'decreases weight' do # [shoryuken, 2] # [uppercut, 1] queues << queue1 @@ -23,7 +23,7 @@ expect(subject).to eq [queue2] end - it "increases weight" do + it 'increases weight' do # [shoryuken, 3] # [uppercut, 1] queues << queue1 @@ -45,7 +45,7 @@ expect(subject).to eq [queue2, queue1, queue1, queue1] end - it "cycles" do + it 'cycles' do # [shoryuken, 1] # [uppercut, 1] queues << queue1 From 314038df34b1f97dd39ef528bfccbdec21668751 Mon Sep 17 00:00:00 2001 From: Garry Shutler Date: Wed, 17 Jun 2015 17:57:08 +0100 Subject: [PATCH 03/11] RSpec 3.3.0 alterations Mock "receive" calls appear to use a stronger interpretation of equality. Also altered assertions around raised errors to suppress new warnings. --- lib/shoryuken/polling.rb | 6 ++++++ spec/shoryuken/fetcher_spec.rb | 8 ++++---- spec/shoryuken/middleware/server/auto_delete_spec.rb | 4 ++-- .../middleware/server/exponential_backoff_retry_spec.rb | 8 ++++---- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index fff2eb58..508156aa 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -1,6 +1,10 @@ module Shoryuken module Polling QueueConfiguration = Struct.new(:name, :options) do + def hash + name.hash + end + def ==(other) case other when String @@ -13,6 +17,8 @@ def ==(other) super end end + + alias_method :eql?, :== end class WeightedRoundRobin diff --git a/spec/shoryuken/fetcher_spec.rb b/spec/shoryuken/fetcher_spec.rb index 1af1ee07..a5b43bdb 100644 --- a/spec/shoryuken/fetcher_spec.rb +++ b/spec/shoryuken/fetcher_spec.rb @@ -27,7 +27,7 @@ it 'calls pause when no message' do allow(queue).to receive(:receive_messages).with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']).and_return([]) - expect(manager).to receive(:queue_empty).with(queue_name) + expect(manager).to receive(:queue_empty).with(queue_config) expect(manager).to receive(:dispatch) subject.fetch(queue_config, 1) @@ -36,7 +36,7 @@ it 'assigns messages' do allow(queue).to receive(:receive_messages).with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_name) + expect(manager).to receive(:messages_present).with(queue_config) expect(manager).to receive(:assign).with(queue_name, sqs_msg) expect(manager).to receive(:dispatch) @@ -48,7 +48,7 @@ allow(queue).to receive(:receive_messages).with(max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_name) + expect(manager).to receive(:messages_present).with(queue_config) expect(manager).to receive(:assign).with(queue_name, [sqs_msg]) expect(manager).to receive(:dispatch) @@ -61,7 +61,7 @@ it 'ignores batch' do allow(queue).to receive(:receive_messages).with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_name) + expect(manager).to receive(:messages_present).with(queue_config) expect(manager).to receive(:assign).with(queue_name, sqs_msg) expect(manager).to receive(:dispatch) diff --git a/spec/shoryuken/middleware/server/auto_delete_spec.rb b/spec/shoryuken/middleware/server/auto_delete_spec.rb index 8e695f37..7374bda0 100644 --- a/spec/shoryuken/middleware/server/auto_delete_spec.rb +++ b/spec/shoryuken/middleware/server/auto_delete_spec.rb @@ -55,8 +55,8 @@ def build_message expect(sqs_queue).to_not receive(:delete_messages) expect { - subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'Error' } - }.to raise_error(RuntimeError, 'Error') + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } + }.to raise_error('failed') end end end diff --git a/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb b/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb index a0ec94e8..c7abc3d0 100755 --- a/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb +++ b/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb @@ -46,7 +46,7 @@ allow(sqs_msg).to receive(:queue){ sqs_queue } expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 300) - expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise } }.not_to raise_error + expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error end it 'retries the job with exponential backoff' do @@ -56,7 +56,7 @@ allow(sqs_msg).to receive(:queue){ sqs_queue } expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800) - expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise } }.not_to raise_error + expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error end it 'uses the last retry interval when :receive_count exceeds the size of :retry_intervals' do @@ -66,7 +66,7 @@ allow(sqs_msg).to receive(:queue){ sqs_queue } expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 1800) - expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise } }.not_to raise_error + expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error end it 'limits the visibility timeout to 12 hours from receipt of message' do @@ -75,7 +75,7 @@ allow(sqs_msg).to receive(:queue){ sqs_queue } expect(sqs_msg).to receive(:change_visibility).with(visibility_timeout: 43198) - expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise } }.not_to raise_error + expect { subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body) { raise 'failed' } }.not_to raise_error end end end From ef73e173c3c4624d278baf5088a330cffcc5a5b2 Mon Sep 17 00:00:00 2001 From: Mario Kostelac Date: Sun, 31 Jul 2016 10:58:46 +0100 Subject: [PATCH 04/11] Move queue pausing responsibility to PollingStrategy --- lib/shoryuken/fetcher.rb | 95 +++++++++++++------------- lib/shoryuken/launcher.rb | 3 +- lib/shoryuken/manager.rb | 71 ++++--------------- lib/shoryuken/polling.rb | 54 ++++++++++----- spec/shoryuken/fetcher_spec.rb | 32 ++++++--- spec/shoryuken/manager_spec.rb | 120 ++++----------------------------- spec/shoryuken/polling_spec.rb | 93 +++++++++++++++++-------- 7 files changed, 200 insertions(+), 268 deletions(-) diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index 6d732545..5e7b2004 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -1,26 +1,28 @@ -module Shoryuken - class Fetcher - include Celluloid - include Util + module Shoryuken + class Fetcher + include Celluloid + include Util - FETCH_LIMIT = 10 + FETCH_LIMIT = 10 - def initialize(manager) - @manager = manager - end + def initialize(manager, polling_strategy) + @manager = manager + @polling_strategy = polling_strategy + @delay = Shoryuken.options[:delay].to_f + end - def fetch(queue, available_processors) - watchdog('Fetcher#fetch died') do - started_at = Time.now + def fetch(queue, available_processors) + watchdog('Fetcher#fetch died') do + started_at = Time.now - logger.debug { "Looking for new messages in '#{queue}'" } + logger.debug { "Looking for new messages in '#{queue}'" } - begin - batch = Shoryuken.worker_registry.batch_receive_messages?(queue.name) - limit = batch ? FETCH_LIMIT : available_processors + begin + batch = Shoryuken.worker_registry.batch_receive_messages?(queue.name) + limit = batch ? FETCH_LIMIT : available_processors - if (sqs_msgs = Array(receive_messages(queue, limit))).any? - logger.debug { "Found #{sqs_msgs.size} messages for '#{queue}'" } + sqs_msgs = Array(receive_messages(queue, limit)) + logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" } if batch @manager.async.assign(queue.name, patch_sqs_msgs!(sqs_msgs)) @@ -28,47 +30,50 @@ def fetch(queue, available_processors) sqs_msgs.each { |sqs_msg| @manager.async.assign(queue.name, sqs_msg) } end - @manager.async.messages_present(queue) - else - logger.debug { "No message found for '#{queue}'" } + @polling_strategy.messages_found(queue, sqs_msgs.size) - @manager.async.queue_empty(queue) + logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } + rescue => ex + logger.error { "Error fetching message: #{ex}" } + logger.error { ex.backtrace.first } end - logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } - rescue => ex - logger.error { "Error fetching message: #{ex}" } - logger.error { ex.backtrace.first } + @manager.async.dispatch end + end - @manager.async.dispatch + def next_queue(*args) + @polling_strategy.next_queue(*args) end - end - private + def active_queues(*args) + @polling_strategy.active_queues(*args) + end - def receive_messages(queue, limit) - # AWS limits the batch size by 10 - limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit + private - options = (Shoryuken.options[:aws][:receive_message] || {}).dup - options[:max_number_of_messages] = limit - options[:message_attribute_names] = %w(All) - options[:attribute_names] = %w(All) + def receive_messages(queue, limit) + # AWS limits the batch size by 10 + limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit - options.merge!(queue.options) + options = (Shoryuken.options[:aws][:receive_message] || {}).dup + options[:max_number_of_messages] = limit + options[:message_attribute_names] = %w(All) + options[:attribute_names] = %w(All) - Shoryuken::Client.queues(queue.name).receive_messages options - end + options.merge!(queue.options) - def patch_sqs_msgs!(sqs_msgs) - sqs_msgs.instance_eval do - def message_id - "batch-with-#{size}-messages" - end + Shoryuken::Client.queues(queue.name).receive_messages options end - sqs_msgs + def patch_sqs_msgs!(sqs_msgs) + sqs_msgs.instance_eval do + def message_id + "batch-with-#{size}-messages" + end + end + + sqs_msgs + end end end -end diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index 4d125a1d..6b692b35 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -10,7 +10,8 @@ class Launcher def initialize @condvar = Celluloid::Condition.new @manager = Shoryuken::Manager.new_link(@condvar) - @fetcher = Shoryuken::Fetcher.new_link(manager) + polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues) + @fetcher = Shoryuken::Fetcher.new_link(manager, polling_strategy) @done = false diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index e8cc6a8d..de2ba55f 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -1,5 +1,4 @@ require 'shoryuken/processor' -require 'shoryuken/polling' require 'shoryuken/fetcher' module Shoryuken @@ -15,7 +14,6 @@ def initialize(condvar) @count = Shoryuken.options[:concurrency] || 25 raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0 @queues = Shoryuken.queues.dup.uniq - @polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues) @finished = condvar @done = false @@ -107,42 +105,31 @@ def assign(queue, sqs_msg) end end - def messages_present(queue) - watchdog('Manager#messages_present died') do - @polling_strategy.messages_present(queue) - end - end - - def queue_empty(queue) - return if delay <= 0 - - logger.debug { "Pausing '#{queue}' for #{delay} seconds, because it's empty" } - - @polling_strategy.pause(queue) - - after(delay) { async.restart_queue!(queue) } - end - - def dispatch return if stopped? - logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{@polling_strategy.active_queues}" } + logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{@fetcher.active_queues}" } if @ready.empty? logger.debug { 'Pausing fetcher, because all processors are busy' } - dispatch_later return end - if (queue = next_queue) - @fetcher.async.fetch(queue, @ready.size) - else + queue = @fetcher.next_queue + if queue == nil logger.debug { 'Pausing fetcher, because all queues are paused' } + after(1) { dispatch } + return + end - @fetcher_paused = true + unless defined?(::ActiveJob) || Shoryuken.worker_registry.workers(queue.name).any? + logger.debug { "Pausing fetcher, because of no registered workers for queue #{queue}" } + after(1) { dispatch } + return end + + @fetcher.async.fetch(queue, @ready.size) end def real_thread(proxy_id, thr) @@ -168,40 +155,6 @@ def build_processor processor end - def restart_queue!(queue) - return if stopped? - - @polling_strategy.restart(queue) - - if @fetcher_paused - logger.debug { 'Restarting fetcher' } - - @fetcher_paused = false - - dispatch - end - end - - def next_queue - # get/remove the first queue in the list - queue = @polling_strategy.next_queue - - return nil unless queue - - if queue && (!defined?(::ActiveJob) && Shoryuken.worker_registry.workers(queue.name).empty?) - # when no worker registered pause the queue to avoid endless recursion - logger.debug { "Pausing '#{queue}' for #{delay} seconds, because no workers registered" } - - @polling_strategy.pause(queue) - - after(delay) { async.restart_queue!(queue) } - - queue = next_queue - end - - queue - end - def soft_shutdown(delay) logger.info { "Waiting for #{@busy.size} busy workers" } diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index 508156aa..a656ae57 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -27,34 +27,34 @@ class WeightedRoundRobin def initialize(queues) @initial_queues = queues @queues = queues.dup.uniq - end - - def active_queues - unparse_queues(@queues) + @paused_queues = [] end def next_queue + unpause_queues queue = @queues.shift + return nil if queue == nil + @queues << queue QueueConfiguration.new(queue, {}) end - def messages_present(queue) - return unless (original = original_queue_weight(queue)) > (current = current_queue_weight(queue)) - - logger.info "Increasing '#{queue}' weight to #{current + 1}, max: #{original}" - @queues << queue - end + def messages_found(queue, messages_found) + if messages_found == 0 + pause(queue) + return + end - def pause(queue) - return unless @queues.delete(queue) - logger.debug "Paused '#{queue}'" + maximum_weight = maximum_queue_weight(queue) + current_weight = current_queue_weight(queue) + if maximum_weight > current_weight + logger.info { "Increasing '#{queue}' weight to #{current_weight + 1}, max: #{maximum_weight}" } + @queues << queue + end end - def restart(queue) - return if @queues.include?(queue) - logger.debug "Restarting '#{queue}'" - @queues << queue + def active_queues + unparse_queues(@queues) end def ==(other) @@ -72,11 +72,29 @@ def ==(other) private + def delay + Shoryuken.options[:delay].to_f + end + + def pause(queue) + return unless @queues.delete(queue) + @paused_queues << [Time.now + delay, queue] + logger.debug "Paused '#{queue}'" + end + + def unpause_queues + return if @paused_queues.empty? + return if Time.now < @paused_queues.first[0] + pause = @paused_queues.shift + @queues << pause[1] + logger.debug "Unpaused '#{pause[1]}'" + end + def current_queue_weight(queue) queue_weight(@queues, queue) end - def original_queue_weight(queue) + def maximum_queue_weight(queue) queue_weight(@initial_queues, queue) end diff --git a/spec/shoryuken/fetcher_spec.rb b/spec/shoryuken/fetcher_spec.rb index a5b43bdb..6b9be0c6 100644 --- a/spec/shoryuken/fetcher_spec.rb +++ b/spec/shoryuken/fetcher_spec.rb @@ -2,41 +2,47 @@ require 'shoryuken/manager' require 'shoryuken/fetcher' + describe Shoryuken::Fetcher do let(:manager) { double Shoryuken::Manager } let(:queue) { double Shoryuken::Queue } let(:queue_name) { 'default' } + let(:queues) { [queue_name] } let(:queue_config) { Shoryuken::Polling::QueueConfiguration.new(queue_name, {}) } + let(:polling_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } let(:sqs_msg) do double Shoryuken::Message, queue_url: queue_name, body: 'test', - message_id: 'fc754df7-9cc2-4c41-96ca-5996a44b771e' + message_id: 'fc754df79cc24c4196ca5996a44b771e' end - subject { described_class.new(manager) } + subject { described_class.new(manager, polling_strategy) } before do allow(manager).to receive(:async).and_return(manager) allow(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) end - describe '#fetch' do it 'calls pause when no message' do - allow(queue).to receive(:receive_messages).with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']).and_return([]) + allow(queue).to receive(:receive_messages) + .with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return([]) - expect(manager).to receive(:queue_empty).with(queue_config) + expect(polling_strategy).to receive(:messages_found).with(queue_config, 0) expect(manager).to receive(:dispatch) subject.fetch(queue_config, 1) end it 'assigns messages' do - allow(queue).to receive(:receive_messages).with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) + allow(queue).to receive(:receive_messages) + .with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_config) + expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) expect(manager).to receive(:assign).with(queue_name, sqs_msg) expect(manager).to receive(:dispatch) @@ -46,9 +52,11 @@ it 'assigns messages in batch' do TestWorker.get_shoryuken_options['batch'] = true - allow(queue).to receive(:receive_messages).with(max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) + allow(queue).to receive(:receive_messages) + .with(max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_config) + expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) expect(manager).to receive(:assign).with(queue_name, [sqs_msg]) expect(manager).to receive(:dispatch) @@ -59,9 +67,11 @@ let(:queue_name) { 'notfound' } it 'ignores batch' do - allow(queue).to receive(:receive_messages).with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']).and_return(sqs_msg) + allow(queue).to receive(:receive_messages) + .with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return(sqs_msg) - expect(manager).to receive(:messages_present).with(queue_config) + expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) expect(manager).to receive(:assign).with(queue_name, sqs_msg) expect(manager).to receive(:dispatch) diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index 42d136b0..32b1c37a 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -2,10 +2,22 @@ require 'shoryuken/manager' RSpec.describe Shoryuken::Manager do - subject do + let(:queue1) { 'shoryuken' } + let(:queue2) { 'uppercut'} + let(:queues) { [] } + let(:polling_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } + let(:fetcher) { Shoryuken::Fetcher.new(manager, polling_strategy) } + let(:condvar) do condvar = double(:condvar) allow(condvar).to receive(:signal).and_return(nil) - Shoryuken::Manager.new(condvar) + condvar + end + let(:manager) { Shoryuken::Manager.new(condvar) } + + subject { manager } + + before(:each) do + manager.fetcher = fetcher end describe 'Invalid concurrency setting' do @@ -14,109 +26,5 @@ expect { Shoryuken::Manager.new(nil) } .to raise_error(ArgumentError, 'Concurrency value -1 is invalid, it needs to be a positive number') end - - end - - describe 'Auto Scaling' do - it 'decreases weight' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - # [shoryuken, 2] - # [uppercut, 1] - Shoryuken.queues << queue1 - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue1, queue2] - - subject.queue_empty(queue1) - - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] - end - - it 'increases weight' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - # [shoryuken, 3] - # [uppercut, 1] - Shoryuken.queues << queue1 - Shoryuken.queues << queue1 - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue1, queue2] - subject.queue_empty(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] - - subject.messages_present(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1] - - subject.messages_present(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1, queue1] - - subject.messages_present(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1, queue1, queue1] - end - - it 'adds queue back' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - # [shoryuken, 2] - # [uppercut, 1] - Shoryuken.queues << queue1 - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - Shoryuken.options[:delay] = 0.1 - - fetcher = double('Fetcher').as_null_object - subject.fetcher = fetcher - - subject.queue_empty(queue1) - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2] - - sleep 0.5 - - expect(subject.instance_variable_get('@polling_strategy')).to eq [queue2, queue1] - end - end - - describe '#next_queue' do - it 'returns queues' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - - Shoryuken.register_worker queue1, TestWorker - Shoryuken.register_worker queue2, TestWorker - - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - expect(subject.send :next_queue).to eq queue1 - expect(subject.send :next_queue).to eq queue2 - end - - it 'skips when no worker' do - queue1 = 'shoryuken' - queue2 = 'uppercut' - - Shoryuken.queues.clear - - Shoryuken.register_worker queue2, TestWorker - - Shoryuken.queues << queue1 - Shoryuken.queues << queue2 - - expect(subject.send :next_queue).to eq queue2 - expect(subject.send :next_queue).to eq queue2 - end end end diff --git a/spec/shoryuken/polling_spec.rb b/spec/shoryuken/polling_spec.rb index 47f33bb6..5a50b3bb 100644 --- a/spec/shoryuken/polling_spec.rb +++ b/spec/shoryuken/polling_spec.rb @@ -1,63 +1,100 @@ require 'spec_helper' require 'shoryuken/polling' -describe Shoryuken::Polling do +describe Shoryuken::Polling::WeightedRoundRobin do let(:queue1) { 'shoryuken' } let(:queue2) { 'uppercut' } let(:queues) { Array.new } + subject { Shoryuken::Polling::WeightedRoundRobin.new(queues) } - describe Shoryuken::Polling::WeightedRoundRobin do - subject { Shoryuken::Polling::WeightedRoundRobin.new(queues) } - - it 'decreases weight' do + describe '#next_queue' do + it 'cycles' do # [shoryuken, 2] # [uppercut, 1] queues << queue1 queues << queue1 queues << queue2 - expect(subject).to eq [queue1, queue2] - - subject.pause(queue1) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue1) + end - expect(subject).to eq [queue2] + it 'returns nil if there are no active queues' do + expect(subject.next_queue).to eq(nil) end - it 'increases weight' do - # [shoryuken, 3] + it 'unpauses queues whose pause is expired' do + # [shoryuken, 2] # [uppercut, 1] queues << queue1 queues << queue1 - queues << queue1 queues << queue2 - expect(subject).to eq [queue1, queue2] - subject.pause(queue1) - expect(subject).to eq [queue2] + allow(subject).to receive(:delay).and_return(10) + + now = Time.now + allow(Time).to receive(:now).and_return(now) - subject.messages_present(queue1) - expect(subject).to eq [queue2, queue1] + # pause the first queue + subject.messages_found(queue1, 0) + expect(subject.next_queue).to eq(queue2) - subject.messages_present(queue1) - expect(subject).to eq [queue2, queue1, queue1] + now += 5 + allow(Time).to receive(:now).and_return(now) - subject.messages_present(queue1) - expect(subject).to eq [queue2, queue1, queue1, queue1] + # pause the second queue + subject.messages_found(queue2, 0) + expect(subject.next_queue).to eq(nil) + + # queue1 should be unpaused now + now += 6 + allow(Time).to receive(:now).and_return(now) + expect(subject.next_queue).to eq(queue1) + + # queue1 should be unpaused and added to the end of queues now + now += 6 + allow(Time).to receive(:now).and_return(now) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) end + end - it 'cycles' do - # [shoryuken, 1] + describe '#messages_found' do + it 'pauses a queue if there are no messages found' do + # [shoryuken, 2] # [uppercut, 1] queues << queue1 + queues << queue1 queues << queue2 - popped = [] + expect(subject).to receive(:pause).with(queue1).and_call_original + subject.messages_found(queue1, 0) + expect(subject.instance_variable_get(:@queues)).to eq([queue2]) + end - (queues.size * 3).times do - popped << subject.next_queue - end + it 'increased the weight if message is found' do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 + + expect(subject.instance_variable_get(:@queues)).to eq([queue1, queue2]) + subject.messages_found(queue1, 1) + expect(subject.instance_variable_get(:@queues)).to eq([queue1, queue2, queue1]) + end + + it 'respects the maximum queue weight' do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 - expect(popped).to eq(queues * 3) + subject.messages_found(queue1, 1) + subject.messages_found(queue1, 1) + expect(subject.instance_variable_get(:@queues)).to eq([queue1, queue2, queue1]) end end end From 308dc5b7ba1573e9d6212deb0406e2e1512b422b Mon Sep 17 00:00:00 2001 From: Mario Kostelac Date: Sun, 31 Jul 2016 15:23:38 +0100 Subject: [PATCH 05/11] Make Fetcher superslim --- lib/shoryuken/fetcher.rb | 40 ++----------------- lib/shoryuken/launcher.rb | 6 +-- lib/shoryuken/manager.rb | 61 ++++++++++++++++++++-------- spec/shoryuken/fetcher_spec.rb | 72 +++++++--------------------------- spec/shoryuken/manager_spec.rb | 10 ++--- 5 files changed, 69 insertions(+), 120 deletions(-) diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index 5e7b2004..a4c7458e 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -1,16 +1,9 @@ module Shoryuken class Fetcher - include Celluloid include Util FETCH_LIMIT = 10 - def initialize(manager, polling_strategy) - @manager = manager - @polling_strategy = polling_strategy - @delay = Shoryuken.options[:delay].to_f - end - def fetch(queue, available_processors) watchdog('Fetcher#fetch died') do started_at = Time.now @@ -18,38 +11,21 @@ def fetch(queue, available_processors) logger.debug { "Looking for new messages in '#{queue}'" } begin - batch = Shoryuken.worker_registry.batch_receive_messages?(queue.name) - limit = batch ? FETCH_LIMIT : available_processors + limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors sqs_msgs = Array(receive_messages(queue, limit)) logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" } - if batch - @manager.async.assign(queue.name, patch_sqs_msgs!(sqs_msgs)) - else - sqs_msgs.each { |sqs_msg| @manager.async.assign(queue.name, sqs_msg) } - end - - @polling_strategy.messages_found(queue, sqs_msgs.size) - logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } + + sqs_msgs rescue => ex logger.error { "Error fetching message: #{ex}" } logger.error { ex.backtrace.first } end - - @manager.async.dispatch end end - def next_queue(*args) - @polling_strategy.next_queue(*args) - end - - def active_queues(*args) - @polling_strategy.active_queues(*args) - end - private def receive_messages(queue, limit) @@ -65,15 +41,5 @@ def receive_messages(queue, limit) Shoryuken::Client.queues(queue.name).receive_messages options end - - def patch_sqs_msgs!(sqs_msgs) - sqs_msgs.instance_eval do - def message_id - "batch-with-#{size}-messages" - end - end - - sqs_msgs - end end end diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index 6b692b35..c6e3506a 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -10,18 +10,16 @@ class Launcher def initialize @condvar = Celluloid::Condition.new @manager = Shoryuken::Manager.new_link(@condvar) - polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues) - @fetcher = Shoryuken::Fetcher.new_link(manager, polling_strategy) @done = false - manager.fetcher = @fetcher + manager.fetcher = Shoryuken::Fetcher.new + manager.polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues) end def stop(options = {}) watchdog('Launcher#stop') do @done = true - @fetcher.terminate if @fetcher.alive? manager.async.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout]) @condvar.wait diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index de2ba55f..790068fd 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -7,9 +7,12 @@ class Manager include Util attr_accessor :fetcher + attr_accessor :polling_strategy trap_exit :processor_died + BATCH_LIMIT = 10 + def initialize(condvar) @count = Shoryuken.options[:concurrency] || 25 raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0 @@ -40,8 +43,6 @@ def stop(options = {}) fire_event(:shutdown, true) - @fetcher.terminate if @fetcher.alive? - logger.info { "Shutting down #{@ready.size} quiet workers" } @ready.each do |processor| @@ -94,21 +95,10 @@ def stopped? @done end - def assign(queue, sqs_msg) - watchdog('Manager#assign died') do - logger.debug { "Assigning #{sqs_msg.message_id}" } - - processor = @ready.pop - @busy << processor - - processor.async.process(queue, sqs_msg) - end - end - def dispatch return if stopped? - logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{@fetcher.active_queues}" } + logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{polling_strategy.active_queues}" } if @ready.empty? logger.debug { 'Pausing fetcher, because all processors are busy' } @@ -116,7 +106,7 @@ def dispatch return end - queue = @fetcher.next_queue + queue = polling_strategy.next_queue if queue == nil logger.debug { 'Pausing fetcher, because all queues are paused' } after(1) { dispatch } @@ -129,7 +119,9 @@ def dispatch return end - @fetcher.async.fetch(queue, @ready.size) + batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) + + self.async.dispatch end def real_thread(proxy_id, thr) @@ -145,6 +137,33 @@ def dispatch_later end end + def assign(queue, sqs_msg) + watchdog('Manager#assign died') do + logger.debug { "Assigning #{sqs_msg.message_id}" } + + processor = @ready.pop + @busy << processor + + processor.async.process(queue, sqs_msg) + end + end + + def dispatch_batch(queue) + batch = fetcher.fetch(queue, BATCH_LIMIT) + self.async.assign(queue.name, patch_batch!(batch)) + polling_strategy.messages_found(queue.name, batch.size) + end + + def dispatch_single_messages(queue) + messages = fetcher.fetch(queue, @ready.size) + messages.each { |message| self.async.assign(queue.name, message) } + polling_strategy.messages_found(queue.name, messages.size) + end + + def batched_queue?(queue) + Shoryuken.worker_registry.batch_receive_messages?(queue.name) + end + def delay Shoryuken.options[:delay].to_f end @@ -185,5 +204,15 @@ def hard_shutdown_in(delay) end end end + + def patch_batch!(sqs_msgs) + sqs_msgs.instance_eval do + def message_id + "batch-with-#{size}-messages" + end + end + + sqs_msgs + end end end diff --git a/spec/shoryuken/fetcher_spec.rb b/spec/shoryuken/fetcher_spec.rb index 6b9be0c6..c947a701 100644 --- a/spec/shoryuken/fetcher_spec.rb +++ b/spec/shoryuken/fetcher_spec.rb @@ -4,79 +4,35 @@ describe Shoryuken::Fetcher do - let(:manager) { double Shoryuken::Manager } - let(:queue) { double Shoryuken::Queue } + let(:queue) { instance_double('Shoryuken::Queue') } let(:queue_name) { 'default' } - let(:queues) { [queue_name] } let(:queue_config) { Shoryuken::Polling::QueueConfiguration.new(queue_name, {}) } - let(:polling_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } let(:sqs_msg) do - double Shoryuken::Message, + double(Shoryuken::Message, queue_url: queue_name, body: 'test', - message_id: 'fc754df79cc24c4196ca5996a44b771e' + message_id: 'fc754df79cc24c4196ca5996a44b771e', + ) end - subject { described_class.new(manager, polling_strategy) } - - before do - allow(manager).to receive(:async).and_return(manager) - allow(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) - end + subject { described_class.new } describe '#fetch' do - it 'calls pause when no message' do - allow(queue).to receive(:receive_messages) + it 'calls Shoryuken::Client to receive messages' do + expect(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) + expect(queue).to receive(:receive_messages) .with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']) .and_return([]) - - expect(polling_strategy).to receive(:messages_found).with(queue_config, 0) - expect(manager).to receive(:dispatch) - subject.fetch(queue_config, 1) end - it 'assigns messages' do - allow(queue).to receive(:receive_messages) - .with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']) - .and_return(sqs_msg) - - expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) - expect(manager).to receive(:assign).with(queue_name, sqs_msg) - expect(manager).to receive(:dispatch) - - subject.fetch(queue_config, 5) - end - - it 'assigns messages in batch' do - TestWorker.get_shoryuken_options['batch'] = true - - allow(queue).to receive(:receive_messages) - .with(max_number_of_messages: described_class::FETCH_LIMIT, attribute_names: ['All'], message_attribute_names: ['All']) - .and_return(sqs_msg) - - expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) - expect(manager).to receive(:assign).with(queue_name, [sqs_msg]) - expect(manager).to receive(:dispatch) - - subject.fetch(queue_config, 5) - end - - context 'when worker not found' do - let(:queue_name) { 'notfound' } - - it 'ignores batch' do - allow(queue).to receive(:receive_messages) - .with(max_number_of_messages: 5, attribute_names: ['All'], message_attribute_names: ['All']) - .and_return(sqs_msg) - - expect(polling_strategy).to receive(:messages_found).with(queue_config, 1) - expect(manager).to receive(:assign).with(queue_name, sqs_msg) - expect(manager).to receive(:dispatch) - - subject.fetch(queue_config, 5) - end + it 'maxes messages to receive to 10 (SQS limit)' do + allow(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) + expect(queue).to receive(:receive_messages) + .with(max_number_of_messages: 10, attribute_names: ['All'], message_attribute_names: ['All']) + .and_return([]) + subject.fetch(queue_config, 20) end end end diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index 32b1c37a..393d0a87 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -4,20 +4,20 @@ RSpec.describe Shoryuken::Manager do let(:queue1) { 'shoryuken' } let(:queue2) { 'uppercut'} - let(:queues) { [] } + let(:queues) { [queue1, queue2] } let(:polling_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } - let(:fetcher) { Shoryuken::Fetcher.new(manager, polling_strategy) } + let(:fetcher) { Shoryuken::Fetcher.new } let(:condvar) do condvar = double(:condvar) allow(condvar).to receive(:signal).and_return(nil) condvar end - let(:manager) { Shoryuken::Manager.new(condvar) } - subject { manager } + subject { Shoryuken::Manager.new(condvar) } before(:each) do - manager.fetcher = fetcher + subject.fetcher = fetcher + subject.polling_strategy = polling_strategy end describe 'Invalid concurrency setting' do From 2cc523f06a1fc263e285065ddb111d986daa9d61 Mon Sep 17 00:00:00 2001 From: Mario Kostelac Date: Sun, 31 Jul 2016 22:26:15 +0100 Subject: [PATCH 06/11] Replace unnecessary async calls with synchronous versions --- lib/shoryuken/manager.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 790068fd..3b81a416 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -150,13 +150,13 @@ def assign(queue, sqs_msg) def dispatch_batch(queue) batch = fetcher.fetch(queue, BATCH_LIMIT) - self.async.assign(queue.name, patch_batch!(batch)) + assign(queue.name, patch_batch!(batch)) polling_strategy.messages_found(queue.name, batch.size) end def dispatch_single_messages(queue) messages = fetcher.fetch(queue, @ready.size) - messages.each { |message| self.async.assign(queue.name, message) } + messages.each { |message| assign(queue.name, message) } polling_strategy.messages_found(queue.name, messages.size) end From 11988ecf8f0d125c042c42bd32071647c9de1496 Mon Sep 17 00:00:00 2001 From: Mario Kostelac Date: Sat, 13 Aug 2016 20:48:16 +0100 Subject: [PATCH 07/11] Add dispatch tests for manager --- lib/shoryuken/fetcher.rb | 5 +-- lib/shoryuken/manager.rb | 14 ++----- spec/shoryuken/manager_spec.rb | 67 ++++++++++++++++++++++++++++++++-- 3 files changed, 70 insertions(+), 16 deletions(-) diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index a4c7458e..54e4aff0 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -15,13 +15,12 @@ def fetch(queue, available_processors) sqs_msgs = Array(receive_messages(queue, limit)) logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" } - logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } - sqs_msgs rescue => ex logger.error { "Error fetching message: #{ex}" } logger.error { ex.backtrace.first } + [] end end end @@ -39,7 +38,7 @@ def receive_messages(queue, limit) options.merge!(queue.options) - Shoryuken::Client.queues(queue.name).receive_messages options + Shoryuken::Client.queues(queue.name).receive_messages(options) end end end diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 3b81a416..8ab920b4 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -107,18 +107,12 @@ def dispatch end queue = polling_strategy.next_queue - if queue == nil + if queue == nil logger.debug { 'Pausing fetcher, because all queues are paused' } after(1) { dispatch } return end - unless defined?(::ActiveJob) || Shoryuken.worker_registry.workers(queue.name).any? - logger.debug { "Pausing fetcher, because of no registered workers for queue #{queue}" } - after(1) { dispatch } - return - end - batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) self.async.dispatch @@ -150,14 +144,14 @@ def assign(queue, sqs_msg) def dispatch_batch(queue) batch = fetcher.fetch(queue, BATCH_LIMIT) - assign(queue.name, patch_batch!(batch)) polling_strategy.messages_found(queue.name, batch.size) + assign(queue.name, patch_batch!(batch)) end def dispatch_single_messages(queue) messages = fetcher.fetch(queue, @ready.size) - messages.each { |message| assign(queue.name, message) } polling_strategy.messages_found(queue.name, messages.size) + messages.each { |message| assign(queue.name, message) } end def batched_queue?(queue) @@ -215,4 +209,4 @@ def message_id sqs_msgs end end -end +end \ No newline at end of file diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index 393d0a87..a4ce408c 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -1,10 +1,15 @@ require 'spec_helper' require 'shoryuken/manager' +RSpec::Matchers.define :queue_config_of do |expected| + match do |actual| + actual.name == expected + end +end + RSpec.describe Shoryuken::Manager do - let(:queue1) { 'shoryuken' } - let(:queue2) { 'uppercut'} - let(:queues) { [queue1, queue2] } + let(:queue) { 'default' } + let(:queues) { [queue] } let(:polling_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } let(:fetcher) { Shoryuken::Fetcher.new } let(:condvar) do @@ -12,12 +17,21 @@ allow(condvar).to receive(:signal).and_return(nil) condvar end + let(:async_manager) { instance_double(described_class.name) } + let(:concurrency) { 1 } subject { Shoryuken::Manager.new(condvar) } before(:each) do + Shoryuken.options[:concurrency] = concurrency subject.fetcher = fetcher subject.polling_strategy = polling_strategy + allow_any_instance_of(described_class).to receive(:async).and_return(async_manager) + end + + after(:each) do + Shoryuken.options[:concurrency] = 1 + TestWorker.get_shoryuken_options['batch'] = false end describe 'Invalid concurrency setting' do @@ -27,4 +41,51 @@ .to raise_error(ArgumentError, 'Concurrency value -1 is invalid, it needs to be a positive number') end end + + describe '#dispatch' do + it 'pauses when there are no active queues' do + expect(polling_strategy).to receive(:next_queue).and_return(nil) + expect_any_instance_of(described_class).to receive(:after) + subject.dispatch + end + + it 'calls dispatch_batch if worker wants batches' do + TestWorker.get_shoryuken_options['batch'] = true + expect_any_instance_of(described_class).to receive(:dispatch_batch).with(queue_config_of(queue)) + expect_any_instance_of(described_class).to receive(:async).and_return(async_manager) + expect(async_manager).to receive(:dispatch) + subject.dispatch + end + + it 'calls dispatch_single_messages if worker wants single messages' do + expect_any_instance_of(described_class).to receive(:dispatch_single_messages) + .with(queue_config_of(queue)) + expect(async_manager).to receive(:dispatch) + subject.dispatch + end + end + + describe '#dispatch_batch' do + it 'assings batch as a single message' do + q = polling_strategy.next_queue + messages = [1, 2, 3] + expect(fetcher).to receive(:fetch).with(q, described_class::BATCH_LIMIT).and_return(messages) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, messages) + subject.send(:dispatch_batch, q) + end + end + + describe '#dispatch_single_messages' do + let(:concurrency) { 3 } + + it 'assings messages from batch one by one' do + q = polling_strategy.next_queue + messages = [1, 2, 3] + expect(fetcher).to receive(:fetch).with(q, concurrency).and_return(messages) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, 1) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, 2) + expect_any_instance_of(described_class).to receive(:assign).with(q.name, 3) + subject.send(:dispatch_single_messages, q) + end + end end From 20030396b24d3f0e7937ba368b019c80381653b6 Mon Sep 17 00:00:00 2001 From: Mario Kostelac Date: Sat, 13 Aug 2016 20:58:23 +0100 Subject: [PATCH 08/11] Fix style errors --- lib/shoryuken/fetcher.rb | 70 +++++++++++++++++----------------- lib/shoryuken/manager.rb | 6 +-- lib/shoryuken/polling.rb | 4 +- spec/shoryuken/fetcher_spec.rb | 15 ++++---- spec/shoryuken/manager_spec.rb | 4 +- 5 files changed, 49 insertions(+), 50 deletions(-) diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index 54e4aff0..fc9a3fc3 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -1,44 +1,44 @@ - module Shoryuken - class Fetcher - include Util - - FETCH_LIMIT = 10 - - def fetch(queue, available_processors) - watchdog('Fetcher#fetch died') do - started_at = Time.now - - logger.debug { "Looking for new messages in '#{queue}'" } - - begin - limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors - - sqs_msgs = Array(receive_messages(queue, limit)) - logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" } - logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } - sqs_msgs - rescue => ex - logger.error { "Error fetching message: #{ex}" } - logger.error { ex.backtrace.first } - [] - end +module Shoryuken + class Fetcher + include Util + + FETCH_LIMIT = 10 + + def fetch(queue, available_processors) + watchdog('Fetcher#fetch died') do + started_at = Time.now + + logger.debug { "Looking for new messages in '#{queue}'" } + + begin + limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors + + sqs_msgs = Array(receive_messages(queue, limit)) + logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" } + logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } + sqs_msgs + rescue => ex + logger.error { "Error fetching message: #{ex}" } + logger.error { ex.backtrace.first } + [] end end + end - private + private - def receive_messages(queue, limit) - # AWS limits the batch size by 10 - limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit + def receive_messages(queue, limit) + # AWS limits the batch size by 10 + limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit - options = (Shoryuken.options[:aws][:receive_message] || {}).dup - options[:max_number_of_messages] = limit - options[:message_attribute_names] = %w(All) - options[:attribute_names] = %w(All) + options = (Shoryuken.options[:aws][:receive_message] || {}).dup + options[:max_number_of_messages] = limit + options[:message_attribute_names] = %w(All) + options[:attribute_names] = %w(All) - options.merge!(queue.options) + options.merge!(queue.options) - Shoryuken::Client.queues(queue.name).receive_messages(options) - end + Shoryuken::Client.queues(queue.name).receive_messages(options) end end +end diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 8ab920b4..2ecb4cfb 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -107,7 +107,7 @@ def dispatch end queue = polling_strategy.next_queue - if queue == nil + if queue.nil? logger.debug { 'Pausing fetcher, because all queues are paused' } after(1) { dispatch } return @@ -115,7 +115,7 @@ def dispatch batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) - self.async.dispatch + async.dispatch end def real_thread(proxy_id, thr) @@ -209,4 +209,4 @@ def message_id sqs_msgs end end -end \ No newline at end of file +end diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index a656ae57..f6d6d24e 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -33,7 +33,7 @@ def initialize(queues) def next_queue unpause_queues queue = @queues.shift - return nil if queue == nil + return nil if queue.nil? @queues << queue QueueConfiguration.new(queue, {}) @@ -81,7 +81,7 @@ def pause(queue) @paused_queues << [Time.now + delay, queue] logger.debug "Paused '#{queue}'" end - + def unpause_queues return if @paused_queues.empty? return if Time.now < @paused_queues.first[0] diff --git a/spec/shoryuken/fetcher_spec.rb b/spec/shoryuken/fetcher_spec.rb index c947a701..07ed1134 100644 --- a/spec/shoryuken/fetcher_spec.rb +++ b/spec/shoryuken/fetcher_spec.rb @@ -2,7 +2,6 @@ require 'shoryuken/manager' require 'shoryuken/fetcher' - describe Shoryuken::Fetcher do let(:queue) { instance_double('Shoryuken::Queue') } let(:queue_name) { 'default' } @@ -13,7 +12,7 @@ queue_url: queue_name, body: 'test', message_id: 'fc754df79cc24c4196ca5996a44b771e', - ) + ) end subject { described_class.new } @@ -21,17 +20,17 @@ describe '#fetch' do it 'calls Shoryuken::Client to receive messages' do expect(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) - expect(queue).to receive(:receive_messages) - .with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']) - .and_return([]) + expect(queue).to receive(:receive_messages). + with(max_number_of_messages: 1, attribute_names: ['All'], message_attribute_names: ['All']). + and_return([]) subject.fetch(queue_config, 1) end it 'maxes messages to receive to 10 (SQS limit)' do allow(Shoryuken::Client).to receive(:queues).with(queue_name).and_return(queue) - expect(queue).to receive(:receive_messages) - .with(max_number_of_messages: 10, attribute_names: ['All'], message_attribute_names: ['All']) - .and_return([]) + expect(queue).to receive(:receive_messages). + with(max_number_of_messages: 10, attribute_names: ['All'], message_attribute_names: ['All']). + and_return([]) subject.fetch(queue_config, 20) end end diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index a4ce408c..1b9973e0 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -58,8 +58,8 @@ end it 'calls dispatch_single_messages if worker wants single messages' do - expect_any_instance_of(described_class).to receive(:dispatch_single_messages) - .with(queue_config_of(queue)) + expect_any_instance_of(described_class).to receive(:dispatch_single_messages). + with(queue_config_of(queue)) expect(async_manager).to receive(:dispatch) subject.dispatch end From c582f8976d3bf43d5de020a0afde5a290f1f1bbb Mon Sep 17 00:00:00 2001 From: Mario Kostelac Date: Sat, 13 Aug 2016 21:01:54 +0100 Subject: [PATCH 09/11] Call dispatch after processor is done/dead --- lib/shoryuken/manager.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 2ecb4cfb..bd1e4c7f 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -9,6 +9,8 @@ class Manager attr_accessor :fetcher attr_accessor :polling_strategy + exclusive :dispatch + trap_exit :processor_died BATCH_LIMIT = 10 @@ -72,6 +74,7 @@ def processor_done(queue, processor) return after(0) { @finished.signal } if @busy.empty? else @ready << processor + async.dispatch end end end @@ -87,6 +90,7 @@ def processor_died(processor, reason) return after(0) { @finished.signal } if @busy.empty? else @ready << build_processor + async.dispatch end end end From 37062ab501b851dac9899e52f269c4f3e3018744 Mon Sep 17 00:00:00 2001 From: Mario Kostelac Date: Tue, 13 Dec 2016 19:09:16 +0000 Subject: [PATCH 10/11] Use dispatch_later instead of after(1) { dispatch } --- lib/shoryuken/manager.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index bd1e4c7f..bb2b3181 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -113,7 +113,7 @@ def dispatch queue = polling_strategy.next_queue if queue.nil? logger.debug { 'Pausing fetcher, because all queues are paused' } - after(1) { dispatch } + dispatch_later return end From 7fc8393980aef3c0713a37d600b24b327b03cd25 Mon Sep 17 00:00:00 2001 From: Mario Kostelac Date: Tue, 13 Dec 2016 19:50:44 +0000 Subject: [PATCH 11/11] Fix log message --- lib/shoryuken/fetcher.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index fc9a3fc3..48b7d227 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -14,7 +14,7 @@ def fetch(queue, available_processors) limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors sqs_msgs = Array(receive_messages(queue, limit)) - logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" } + logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } sqs_msgs rescue => ex