diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 8ad9f5a0..c260acfa 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -57,8 +57,13 @@ def ready @max_processors - busy end - def processor_done + def processor_done(queue) @busy_processors.decrement + client_queue = Shoryuken::Client.queues(queue) + return unless client_queue.fifo? + return unless @polling_strategy.respond_to?(:message_processed) + + @polling_strategy.message_processed(queue) end def assign(queue_name, sqs_msg) @@ -68,9 +73,10 @@ def assign(queue_name, sqs_msg) @busy_processors.increment - Concurrent::Promise.execute( - executor: @executor - ) { Processor.process(queue_name, sqs_msg) }.then { processor_done }.rescue { processor_done } + Concurrent::Promise + .execute(executor: @executor) { Processor.process(queue_name, sqs_msg) } + .then { processor_done(queue_name) } + .rescue { processor_done(queue_name) } end def dispatch_batch(queue) diff --git a/lib/shoryuken/polling/base.rb b/lib/shoryuken/polling/base.rb index dce5b511..14e1eb5c 100644 --- a/lib/shoryuken/polling/base.rb +++ b/lib/shoryuken/polling/base.rb @@ -40,6 +40,8 @@ def messages_found(_queue, _messages_found) fail NotImplementedError end + def message_processed(_queue); end + def active_queues fail NotImplementedError end diff --git a/lib/shoryuken/polling/strict_priority.rb b/lib/shoryuken/polling/strict_priority.rb index 85f3ffab..6303da5b 100644 --- a/lib/shoryuken/polling/strict_priority.rb +++ b/lib/shoryuken/polling/strict_priority.rb @@ -38,6 +38,11 @@ def active_queues .reverse end + def message_processed(queue) + logger.debug "Unpausing #{queue}" + @paused_until[queue] = Time.now + end + private def next_active_queue @@ -70,6 +75,7 @@ def queue_paused?(queue) def pause(queue) return unless delay > 0 + @paused_until[queue] = Time.now + delay logger.debug "Paused #{queue}" end diff --git a/lib/shoryuken/polling/weighted_round_robin.rb b/lib/shoryuken/polling/weighted_round_robin.rb index db40f96a..7b40955f 100644 --- a/lib/shoryuken/polling/weighted_round_robin.rb +++ b/lib/shoryuken/polling/weighted_round_robin.rb @@ -35,10 +35,20 @@ def active_queues unparse_queues(@queues) end + def message_processed(queue) + return if @paused_queues.empty? + + logger.debug "Unpausing #{queue}" + @paused_queues.reject! { |_time, name| name == queue } + @queues << queue + @queues.uniq! + end + private def pause(queue) return unless @queues.delete(queue) + @paused_queues << [Time.now + delay, queue] logger.debug "Paused #{queue}" end @@ -46,6 +56,7 @@ def pause(queue) def unpause_queues return if @paused_queues.empty? return if Time.now < @paused_queues.first[0] + pause = @paused_queues.shift @queues << pause[1] logger.debug "Unpaused #{pause[1]}" diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index a702b429..dca67e58 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -139,4 +139,28 @@ subject.send(:dispatch_single_messages, q) end end + + describe '#processor_done' do + let(:sqs_queue) { double Shoryuken::Queue } + + before do + allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue) + end + + context 'when queue.fifo? is true' do + it 'calls message_processed on strategy' do + expect(sqs_queue).to receive(:fifo?).and_return(true) + expect(polling_strategy).to receive(:message_processed).with(queue) + subject.send(:processor_done, queue) + end + end + + context 'when queue.fifo? is false' do + it 'does not call message_processed on strategy' do + expect(sqs_queue).to receive(:fifo?).and_return(false) + expect(polling_strategy).to_not receive(:message_processed) + subject.send(:processor_done, queue) + end + end + end end diff --git a/spec/shoryuken/polling/strict_priority_spec.rb b/spec/shoryuken/polling/strict_priority_spec.rb index 1bcbf134..868ebc61 100644 --- a/spec/shoryuken/polling/strict_priority_spec.rb +++ b/spec/shoryuken/polling/strict_priority_spec.rb @@ -145,4 +145,14 @@ expect(subject.next_queue).to eq(queue3) end end + + describe '#message_processed' do + it 'removes paused queue, adds to active queues' do + strategy = Shoryuken::Polling::StrictPriority.new([queue1, queue2]) + strategy.send(:pause, queue1) + expect(strategy.active_queues).to eq([[queue2, 1]]) + strategy.message_processed(queue1) + expect(strategy.active_queues).to eq([[queue1, 2], [queue2, 1]]) + end + end end diff --git a/spec/shoryuken/polling/weighted_round_robin_spec.rb b/spec/shoryuken/polling/weighted_round_robin_spec.rb index 98416498..ab7e146d 100644 --- a/spec/shoryuken/polling/weighted_round_robin_spec.rb +++ b/spec/shoryuken/polling/weighted_round_robin_spec.rb @@ -104,4 +104,14 @@ expect(subject.delay).to eq(1.0) end end + + describe '#message_processed' do + it 'removes paused queue, adds to active queues' do + strategy = Shoryuken::Polling::WeightedRoundRobin.new([queue1, queue2]) + strategy.send(:pause, queue1) + expect(strategy.active_queues).to eq([[queue2, 1]]) + strategy.message_processed(queue1) + expect(strategy.active_queues).to eq([[queue2, 1], [queue1, 1]]) + end + end end