From 84ce97bf78e71626183759236bfcd45fcfa44a69 Mon Sep 17 00:00:00 2001 From: David Richey Date: Mon, 18 Jan 2021 11:35:01 -0500 Subject: [PATCH 1/4] Unpause FIFO queues on worker completion * Add in UnpauseQueue middleware * Added specs * Added unpause method on strategies * TODO: Add specs around #unpause * Updates Processor, Manager & Options to allow new variables * TODO: Add specs --- lib/shoryuken.rb | 1 + lib/shoryuken/manager.rb | 2 +- .../middleware/server/auto_delete.rb | 2 +- .../server/auto_extend_visibility.rb | 6 +- .../server/exponential_backoff_retry.rb | 8 +-- lib/shoryuken/middleware/server/timing.rb | 2 +- .../middleware/server/unpause_queue.rb | 15 +++++ lib/shoryuken/options.rb | 1 + lib/shoryuken/polling/strict_priority.rb | 5 ++ lib/shoryuken/polling/weighted_round_robin.rb | 11 ++++ lib/shoryuken/processor.rb | 17 +++--- .../middleware/server/unpause_queue_spec.rb | 58 +++++++++++++++++++ 12 files changed, 110 insertions(+), 18 deletions(-) create mode 100644 lib/shoryuken/middleware/server/unpause_queue.rb create mode 100644 spec/shoryuken/middleware/server/unpause_queue_spec.rb diff --git a/lib/shoryuken.rb b/lib/shoryuken.rb index f4e772c9..977c71d0 100644 --- a/lib/shoryuken.rb +++ b/lib/shoryuken.rb @@ -28,6 +28,7 @@ Shoryuken::Middleware::Server.autoload :AutoExtendVisibility, 'shoryuken/middleware/server/auto_extend_visibility' require 'shoryuken/middleware/server/exponential_backoff_retry' require 'shoryuken/middleware/server/timing' +require 'shoryuken/middleware/server/unpause_queue' require 'shoryuken/polling/base' require 'shoryuken/polling/weighted_round_robin' require 'shoryuken/polling/strict_priority' diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 8ad9f5a0..454997ed 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -70,7 +70,7 @@ def assign(queue_name, sqs_msg) Concurrent::Promise.execute( executor: @executor - ) { Processor.process(queue_name, sqs_msg) }.then { processor_done }.rescue { processor_done } + ) { Processor.process(queue_name, sqs_msg, @polling_strategy) }.then { processor_done }.rescue { processor_done } end def dispatch_batch(queue) diff --git a/lib/shoryuken/middleware/server/auto_delete.rb b/lib/shoryuken/middleware/server/auto_delete.rb index 2f47bf4e..6ea602be 100644 --- a/lib/shoryuken/middleware/server/auto_delete.rb +++ b/lib/shoryuken/middleware/server/auto_delete.rb @@ -2,7 +2,7 @@ module Shoryuken module Middleware module Server class AutoDelete - def call(worker, queue, sqs_msg, _body) + def call(worker, queue, sqs_msg, _body, _strategy) yield return unless worker.class.auto_delete? diff --git a/lib/shoryuken/middleware/server/auto_extend_visibility.rb b/lib/shoryuken/middleware/server/auto_extend_visibility.rb index aa3350bb..f402ec36 100644 --- a/lib/shoryuken/middleware/server/auto_extend_visibility.rb +++ b/lib/shoryuken/middleware/server/auto_extend_visibility.rb @@ -6,7 +6,7 @@ class AutoExtendVisibility EXTEND_UPFRONT_SECONDS = 5 - def call(worker, queue, sqs_msg, body) + def call(worker, queue, sqs_msg, body, _strategy = nil) return yield unless worker.class.auto_visibility_timeout? if sqs_msg.is_a?(Array) @@ -35,9 +35,9 @@ def auto_extend(_worker, queue, sqs_msg, _body) end sqs_msg.change_visibility(visibility_timeout: queue_visibility_timeout) - rescue => ex + rescue => e logger.error do - "Could not auto extend the message #{queue}/#{sqs_msg.message_id} visibility timeout. Error: #{ex.message}" + "Could not auto extend the message #{queue}/#{sqs_msg.message_id} visibility timeout. Error: #{e.message}" end end end diff --git a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb index 1ac6ac5a..1314396d 100755 --- a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb +++ b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb @@ -4,7 +4,7 @@ module Server class ExponentialBackoffRetry include Util - def call(worker, _queue, sqs_msg, _body) + def call(worker, _queue, sqs_msg, _body, _strategy = nil) return yield unless worker.class.exponential_backoff? if sqs_msg.is_a?(Array) @@ -14,7 +14,7 @@ def call(worker, _queue, sqs_msg, _body) started_at = Time.now yield - rescue => ex + rescue => e retry_intervals = worker.class.get_shoryuken_options['retry_intervals'] if retry_intervals.nil? || !handle_failure(sqs_msg, started_at, retry_intervals) @@ -23,9 +23,9 @@ def call(worker, _queue, sqs_msg, _body) raise end - logger.warn { "Message #{sqs_msg.message_id} will attempt retry due to error: #{ex.message}" } + logger.warn { "Message #{sqs_msg.message_id} will attempt retry due to error: #{e.message}" } # since we didn't raise, lets log the backtrace for debugging purposes. - logger.debug { ex.backtrace.join("\n") } unless ex.backtrace.nil? + logger.debug { e.backtrace.join("\n") } unless e.backtrace.nil? end private diff --git a/lib/shoryuken/middleware/server/timing.rb b/lib/shoryuken/middleware/server/timing.rb index 69dadcfa..df963ead 100644 --- a/lib/shoryuken/middleware/server/timing.rb +++ b/lib/shoryuken/middleware/server/timing.rb @@ -4,7 +4,7 @@ module Server class Timing include Util - def call(_worker, queue, _sqs_msg, _body) + def call(_worker, queue, _sqs_msg, _body, _strategy = nil) started_at = Time.now logger.info { "started at #{started_at}" } diff --git a/lib/shoryuken/middleware/server/unpause_queue.rb b/lib/shoryuken/middleware/server/unpause_queue.rb new file mode 100644 index 00000000..5a63a14a --- /dev/null +++ b/lib/shoryuken/middleware/server/unpause_queue.rb @@ -0,0 +1,15 @@ +module Shoryuken + module Middleware + module Server + class UnpauseQueue + def call(_worker, queue, _sqs_msg, _body, strategy) + yield + client_queue = Shoryuken::Client.queues(queue) + return unless client_queue.fifo? + + strategy.unpause_queue(queue) + end + end + end + end +end diff --git a/lib/shoryuken/options.rb b/lib/shoryuken/options.rb index b304fd2f..402ce84a 100644 --- a/lib/shoryuken/options.rb +++ b/lib/shoryuken/options.rb @@ -167,6 +167,7 @@ def default_server_middleware m.add Middleware::Server::ExponentialBackoffRetry m.add Middleware::Server::AutoDelete m.add Middleware::Server::AutoExtendVisibility + m.add Middleware::Server::UnpauseQueue if defined?(::ActiveRecord::Base) require 'shoryuken/middleware/server/active_record' m.add Middleware::Server::ActiveRecord diff --git a/lib/shoryuken/polling/strict_priority.rb b/lib/shoryuken/polling/strict_priority.rb index 85f3ffab..8fb925e2 100644 --- a/lib/shoryuken/polling/strict_priority.rb +++ b/lib/shoryuken/polling/strict_priority.rb @@ -38,6 +38,10 @@ def active_queues .reverse end + def unpause_queue(queue) + @paused_until[queue] = Time.now + end + private def next_active_queue @@ -70,6 +74,7 @@ def queue_paused?(queue) def pause(queue) return unless delay > 0 + @paused_until[queue] = Time.now + delay logger.debug "Paused #{queue}" end diff --git a/lib/shoryuken/polling/weighted_round_robin.rb b/lib/shoryuken/polling/weighted_round_robin.rb index db40f96a..764a015d 100644 --- a/lib/shoryuken/polling/weighted_round_robin.rb +++ b/lib/shoryuken/polling/weighted_round_robin.rb @@ -35,10 +35,20 @@ def active_queues unparse_queues(@queues) end + def unpause_queue(queue) + return if @paused_queues.empty? + + logger.debug "Unapusing #{queue}" + @paused_queues.reject! { |_time, name| name == queue } + @queues << queue + @queues.uniq! + end + private def pause(queue) return unless @queues.delete(queue) + @paused_queues << [Time.now + delay, queue] logger.debug "Paused #{queue}" end @@ -46,6 +56,7 @@ def pause(queue) def unpause_queues return if @paused_queues.empty? return if Time.now < @paused_queues.first[0] + pause = @paused_queues.shift @queues << pause[1] logger.debug "Unpaused #{pause[1]}" diff --git a/lib/shoryuken/processor.rb b/lib/shoryuken/processor.rb index 05208e4e..39701a33 100644 --- a/lib/shoryuken/processor.rb +++ b/lib/shoryuken/processor.rb @@ -2,28 +2,29 @@ module Shoryuken class Processor include Util - attr_reader :queue, :sqs_msg + attr_reader :queue, :sqs_msg, :polling_strategy - def self.process(queue, sqs_msg) - new(queue, sqs_msg).process + def self.process(queue, sqs_msg, polling_strategy) + new(queue, sqs_msg, polling_strategy).process end - def initialize(queue, sqs_msg) + def initialize(queue, sqs_msg, polling_strategy) @queue = queue @sqs_msg = sqs_msg + @polling_strategy = polling_strategy end def process return logger.error { "No worker found for #{queue}" } unless worker Shoryuken::Logging.with_context("#{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id}") do - worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do + worker.class.server_middleware.invoke(worker, queue, sqs_msg, body, polling_strategy) do worker.perform(sqs_msg, body) end end - rescue Exception => ex - logger.error { "Processor failed: #{ex.message}" } - logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil? + rescue Exception => e + logger.error { "Processor failed: #{e.message}" } + logger.error { e.backtrace.join("\n") } unless e.backtrace.nil? raise end diff --git a/spec/shoryuken/middleware/server/unpause_queue_spec.rb b/spec/shoryuken/middleware/server/unpause_queue_spec.rb new file mode 100644 index 00000000..f896a852 --- /dev/null +++ b/spec/shoryuken/middleware/server/unpause_queue_spec.rb @@ -0,0 +1,58 @@ +require 'spec_helper' + +describe Shoryuken::Middleware::Server::UnpauseQueue do + let(:queue) { 'default' } + let(:queues) { [queue] } + let(:weighted_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } + let(:strict_strategy) { Shoryuken::Polling::StrictPriority.new(queues) } + let(:sqs_queue) { double Shoryuken::Queue } + + def build_message + double Shoryuken::Message, + queue_url: queue, + body: 'test', + receipt_handle: SecureRandom.uuid + end + + let(:sqs_msg) { build_message } + + before do + allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue) + end + + context 'when strict strategy' do + it 'unpauses fifo queue' do + expect(sqs_queue).to receive(:fifo?).and_return(true) + strict_strategy.send(:pause, queue) + expect(strict_strategy.active_queues).to eq([]) + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body, strict_strategy) {} + expect(strict_strategy.active_queues).to eq([[queue, 1]]) + end + + it 'will not unpause non fifo queue' do + expect(sqs_queue).to receive(:fifo?).and_return(false) + strict_strategy.send(:pause, queue) + expect(strict_strategy.active_queues).to eq([]) + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body, strict_strategy) {} + expect(strict_strategy.active_queues).to eq([]) + end + end + + context 'when weighted strategy' do + it 'unpauses fifo queue' do + expect(sqs_queue).to receive(:fifo?).and_return(true) + weighted_strategy.send(:pause, queue) + expect(weighted_strategy.active_queues).to eq([]) + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body, weighted_strategy) {} + expect(weighted_strategy.active_queues).to eq([[queue, 1]]) + end + + it 'will not unpause non fifo queue' do + expect(sqs_queue).to receive(:fifo?).and_return(false) + weighted_strategy.send(:pause, queue) + expect(weighted_strategy.active_queues).to eq([]) + subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body, weighted_strategy) {} + expect(weighted_strategy.active_queues).to eq([]) + end + end +end From 3d12e163613074a096cf8a69755b4564069d4038 Mon Sep 17 00:00:00 2001 From: David Richey Date: Tue, 19 Jan 2021 21:05:30 -0500 Subject: [PATCH 2/4] Move unpause logic into Manager#processor_done --- lib/shoryuken.rb | 1 - lib/shoryuken/manager.rb | 13 +++-- .../middleware/server/auto_delete.rb | 2 +- .../server/auto_extend_visibility.rb | 6 +- .../server/exponential_backoff_retry.rb | 8 +-- lib/shoryuken/middleware/server/timing.rb | 2 +- .../middleware/server/unpause_queue.rb | 15 ----- lib/shoryuken/options.rb | 1 - lib/shoryuken/processor.rb | 17 +++--- spec/shoryuken/manager_spec.rb | 24 ++++++++ .../middleware/server/unpause_queue_spec.rb | 58 ------------------- .../shoryuken/polling/strict_priority_spec.rb | 10 ++++ .../polling/weighted_round_robin_spec.rb | 10 ++++ 13 files changed, 70 insertions(+), 97 deletions(-) delete mode 100644 lib/shoryuken/middleware/server/unpause_queue.rb delete mode 100644 spec/shoryuken/middleware/server/unpause_queue_spec.rb diff --git a/lib/shoryuken.rb b/lib/shoryuken.rb index 977c71d0..f4e772c9 100644 --- a/lib/shoryuken.rb +++ b/lib/shoryuken.rb @@ -28,7 +28,6 @@ Shoryuken::Middleware::Server.autoload :AutoExtendVisibility, 'shoryuken/middleware/server/auto_extend_visibility' require 'shoryuken/middleware/server/exponential_backoff_retry' require 'shoryuken/middleware/server/timing' -require 'shoryuken/middleware/server/unpause_queue' require 'shoryuken/polling/base' require 'shoryuken/polling/weighted_round_robin' require 'shoryuken/polling/strict_priority' diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 454997ed..5f7b343b 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -57,8 +57,12 @@ def ready @max_processors - busy end - def processor_done + def processor_done(queue) @busy_processors.decrement + client_queue = Shoryuken::Client.queues(queue) + return unless client_queue.fifo? + + @polling_strategy.unpause_queue(queue) end def assign(queue_name, sqs_msg) @@ -68,9 +72,10 @@ def assign(queue_name, sqs_msg) @busy_processors.increment - Concurrent::Promise.execute( - executor: @executor - ) { Processor.process(queue_name, sqs_msg, @polling_strategy) }.then { processor_done }.rescue { processor_done } + Concurrent::Promise + .execute(executor: @executor) { Processor.process(queue_name, sqs_msg) } + .then { processor_done(queue_name) } + .rescue { processor_done(queue_name) } end def dispatch_batch(queue) diff --git a/lib/shoryuken/middleware/server/auto_delete.rb b/lib/shoryuken/middleware/server/auto_delete.rb index 6ea602be..2f47bf4e 100644 --- a/lib/shoryuken/middleware/server/auto_delete.rb +++ b/lib/shoryuken/middleware/server/auto_delete.rb @@ -2,7 +2,7 @@ module Shoryuken module Middleware module Server class AutoDelete - def call(worker, queue, sqs_msg, _body, _strategy) + def call(worker, queue, sqs_msg, _body) yield return unless worker.class.auto_delete? diff --git a/lib/shoryuken/middleware/server/auto_extend_visibility.rb b/lib/shoryuken/middleware/server/auto_extend_visibility.rb index f402ec36..aa3350bb 100644 --- a/lib/shoryuken/middleware/server/auto_extend_visibility.rb +++ b/lib/shoryuken/middleware/server/auto_extend_visibility.rb @@ -6,7 +6,7 @@ class AutoExtendVisibility EXTEND_UPFRONT_SECONDS = 5 - def call(worker, queue, sqs_msg, body, _strategy = nil) + def call(worker, queue, sqs_msg, body) return yield unless worker.class.auto_visibility_timeout? if sqs_msg.is_a?(Array) @@ -35,9 +35,9 @@ def auto_extend(_worker, queue, sqs_msg, _body) end sqs_msg.change_visibility(visibility_timeout: queue_visibility_timeout) - rescue => e + rescue => ex logger.error do - "Could not auto extend the message #{queue}/#{sqs_msg.message_id} visibility timeout. Error: #{e.message}" + "Could not auto extend the message #{queue}/#{sqs_msg.message_id} visibility timeout. Error: #{ex.message}" end end end diff --git a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb index 1314396d..1ac6ac5a 100755 --- a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb +++ b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb @@ -4,7 +4,7 @@ module Server class ExponentialBackoffRetry include Util - def call(worker, _queue, sqs_msg, _body, _strategy = nil) + def call(worker, _queue, sqs_msg, _body) return yield unless worker.class.exponential_backoff? if sqs_msg.is_a?(Array) @@ -14,7 +14,7 @@ def call(worker, _queue, sqs_msg, _body, _strategy = nil) started_at = Time.now yield - rescue => e + rescue => ex retry_intervals = worker.class.get_shoryuken_options['retry_intervals'] if retry_intervals.nil? || !handle_failure(sqs_msg, started_at, retry_intervals) @@ -23,9 +23,9 @@ def call(worker, _queue, sqs_msg, _body, _strategy = nil) raise end - logger.warn { "Message #{sqs_msg.message_id} will attempt retry due to error: #{e.message}" } + logger.warn { "Message #{sqs_msg.message_id} will attempt retry due to error: #{ex.message}" } # since we didn't raise, lets log the backtrace for debugging purposes. - logger.debug { e.backtrace.join("\n") } unless e.backtrace.nil? + logger.debug { ex.backtrace.join("\n") } unless ex.backtrace.nil? end private diff --git a/lib/shoryuken/middleware/server/timing.rb b/lib/shoryuken/middleware/server/timing.rb index df963ead..69dadcfa 100644 --- a/lib/shoryuken/middleware/server/timing.rb +++ b/lib/shoryuken/middleware/server/timing.rb @@ -4,7 +4,7 @@ module Server class Timing include Util - def call(_worker, queue, _sqs_msg, _body, _strategy = nil) + def call(_worker, queue, _sqs_msg, _body) started_at = Time.now logger.info { "started at #{started_at}" } diff --git a/lib/shoryuken/middleware/server/unpause_queue.rb b/lib/shoryuken/middleware/server/unpause_queue.rb deleted file mode 100644 index 5a63a14a..00000000 --- a/lib/shoryuken/middleware/server/unpause_queue.rb +++ /dev/null @@ -1,15 +0,0 @@ -module Shoryuken - module Middleware - module Server - class UnpauseQueue - def call(_worker, queue, _sqs_msg, _body, strategy) - yield - client_queue = Shoryuken::Client.queues(queue) - return unless client_queue.fifo? - - strategy.unpause_queue(queue) - end - end - end - end -end diff --git a/lib/shoryuken/options.rb b/lib/shoryuken/options.rb index 402ce84a..b304fd2f 100644 --- a/lib/shoryuken/options.rb +++ b/lib/shoryuken/options.rb @@ -167,7 +167,6 @@ def default_server_middleware m.add Middleware::Server::ExponentialBackoffRetry m.add Middleware::Server::AutoDelete m.add Middleware::Server::AutoExtendVisibility - m.add Middleware::Server::UnpauseQueue if defined?(::ActiveRecord::Base) require 'shoryuken/middleware/server/active_record' m.add Middleware::Server::ActiveRecord diff --git a/lib/shoryuken/processor.rb b/lib/shoryuken/processor.rb index 39701a33..05208e4e 100644 --- a/lib/shoryuken/processor.rb +++ b/lib/shoryuken/processor.rb @@ -2,29 +2,28 @@ module Shoryuken class Processor include Util - attr_reader :queue, :sqs_msg, :polling_strategy + attr_reader :queue, :sqs_msg - def self.process(queue, sqs_msg, polling_strategy) - new(queue, sqs_msg, polling_strategy).process + def self.process(queue, sqs_msg) + new(queue, sqs_msg).process end - def initialize(queue, sqs_msg, polling_strategy) + def initialize(queue, sqs_msg) @queue = queue @sqs_msg = sqs_msg - @polling_strategy = polling_strategy end def process return logger.error { "No worker found for #{queue}" } unless worker Shoryuken::Logging.with_context("#{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id}") do - worker.class.server_middleware.invoke(worker, queue, sqs_msg, body, polling_strategy) do + worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do worker.perform(sqs_msg, body) end end - rescue Exception => e - logger.error { "Processor failed: #{e.message}" } - logger.error { e.backtrace.join("\n") } unless e.backtrace.nil? + rescue Exception => ex + logger.error { "Processor failed: #{ex.message}" } + logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil? raise end diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index a702b429..355dd0c4 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -139,4 +139,28 @@ subject.send(:dispatch_single_messages, q) end end + + describe '#processor_done' do + let(:sqs_queue) { double Shoryuken::Queue } + + before do + allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue) + end + + context 'when queue.fifo? is true' do + it 'calls unpause_queue on strategy' do + expect(sqs_queue).to receive(:fifo?).and_return(true) + expect(polling_strategy).to receive(:unpause_queue).with(queue) + subject.send(:processor_done, queue) + end + end + + context 'when queue.fifo? is false' do + it 'does not call unpause_queue on strategy' do + expect(sqs_queue).to receive(:fifo?).and_return(false) + expect(polling_strategy).to_not receive(:unpause_queue) + subject.send(:processor_done, queue) + end + end + end end diff --git a/spec/shoryuken/middleware/server/unpause_queue_spec.rb b/spec/shoryuken/middleware/server/unpause_queue_spec.rb deleted file mode 100644 index f896a852..00000000 --- a/spec/shoryuken/middleware/server/unpause_queue_spec.rb +++ /dev/null @@ -1,58 +0,0 @@ -require 'spec_helper' - -describe Shoryuken::Middleware::Server::UnpauseQueue do - let(:queue) { 'default' } - let(:queues) { [queue] } - let(:weighted_strategy) { Shoryuken::Polling::WeightedRoundRobin.new(queues) } - let(:strict_strategy) { Shoryuken::Polling::StrictPriority.new(queues) } - let(:sqs_queue) { double Shoryuken::Queue } - - def build_message - double Shoryuken::Message, - queue_url: queue, - body: 'test', - receipt_handle: SecureRandom.uuid - end - - let(:sqs_msg) { build_message } - - before do - allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue) - end - - context 'when strict strategy' do - it 'unpauses fifo queue' do - expect(sqs_queue).to receive(:fifo?).and_return(true) - strict_strategy.send(:pause, queue) - expect(strict_strategy.active_queues).to eq([]) - subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body, strict_strategy) {} - expect(strict_strategy.active_queues).to eq([[queue, 1]]) - end - - it 'will not unpause non fifo queue' do - expect(sqs_queue).to receive(:fifo?).and_return(false) - strict_strategy.send(:pause, queue) - expect(strict_strategy.active_queues).to eq([]) - subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body, strict_strategy) {} - expect(strict_strategy.active_queues).to eq([]) - end - end - - context 'when weighted strategy' do - it 'unpauses fifo queue' do - expect(sqs_queue).to receive(:fifo?).and_return(true) - weighted_strategy.send(:pause, queue) - expect(weighted_strategy.active_queues).to eq([]) - subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body, weighted_strategy) {} - expect(weighted_strategy.active_queues).to eq([[queue, 1]]) - end - - it 'will not unpause non fifo queue' do - expect(sqs_queue).to receive(:fifo?).and_return(false) - weighted_strategy.send(:pause, queue) - expect(weighted_strategy.active_queues).to eq([]) - subject.call(TestWorker.new, queue, sqs_msg, sqs_msg.body, weighted_strategy) {} - expect(weighted_strategy.active_queues).to eq([]) - end - end -end diff --git a/spec/shoryuken/polling/strict_priority_spec.rb b/spec/shoryuken/polling/strict_priority_spec.rb index 1bcbf134..64f021dc 100644 --- a/spec/shoryuken/polling/strict_priority_spec.rb +++ b/spec/shoryuken/polling/strict_priority_spec.rb @@ -145,4 +145,14 @@ expect(subject.next_queue).to eq(queue3) end end + + describe '#unpause_queue' do + it 'removes paused queue, adds to active queues' do + strategy = Shoryuken::Polling::StrictPriority.new([queue1, queue2]) + strategy.send(:pause, queue1) + expect(strategy.active_queues).to eq([[queue2, 1]]) + strategy.unpause_queue(queue1) + expect(strategy.active_queues).to eq([[queue1, 2], [queue2, 1]]) + end + end end diff --git a/spec/shoryuken/polling/weighted_round_robin_spec.rb b/spec/shoryuken/polling/weighted_round_robin_spec.rb index 98416498..183a8fe0 100644 --- a/spec/shoryuken/polling/weighted_round_robin_spec.rb +++ b/spec/shoryuken/polling/weighted_round_robin_spec.rb @@ -104,4 +104,14 @@ expect(subject.delay).to eq(1.0) end end + + describe '#unpause_queue' do + it 'removes paused queue, adds to active queues' do + strategy = Shoryuken::Polling::WeightedRoundRobin.new([queue1, queue2]) + strategy.send(:pause, queue1) + expect(strategy.active_queues).to eq([[queue2, 1]]) + strategy.unpause_queue(queue1) + expect(strategy.active_queues).to eq([[queue2, 1], [queue1, 1]]) + end + end end From c54cc4ee1f1c71a1fa71b263dfe4e620baa523f3 Mon Sep 17 00:00:00 2001 From: David Richey Date: Thu, 21 Jan 2021 15:47:57 -0500 Subject: [PATCH 3/4] Unpause logging & fixes --- lib/shoryuken/polling/strict_priority.rb | 1 + lib/shoryuken/polling/weighted_round_robin.rb | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/shoryuken/polling/strict_priority.rb b/lib/shoryuken/polling/strict_priority.rb index 8fb925e2..930db604 100644 --- a/lib/shoryuken/polling/strict_priority.rb +++ b/lib/shoryuken/polling/strict_priority.rb @@ -39,6 +39,7 @@ def active_queues end def unpause_queue(queue) + logger.debug "Unpausing #{queue}" @paused_until[queue] = Time.now end diff --git a/lib/shoryuken/polling/weighted_round_robin.rb b/lib/shoryuken/polling/weighted_round_robin.rb index 764a015d..39027aba 100644 --- a/lib/shoryuken/polling/weighted_round_robin.rb +++ b/lib/shoryuken/polling/weighted_round_robin.rb @@ -38,7 +38,7 @@ def active_queues def unpause_queue(queue) return if @paused_queues.empty? - logger.debug "Unapusing #{queue}" + logger.debug "Unpausing #{queue}" @paused_queues.reject! { |_time, name| name == queue } @queues << queue @queues.uniq! From f2990d96030d84dc9e2f88ec585921123ef5c65d Mon Sep 17 00:00:00 2001 From: David Richey Date: Sat, 23 Jan 2021 07:48:22 -0500 Subject: [PATCH 4/4] Update method naming -> message_processed --- lib/shoryuken/manager.rb | 3 ++- lib/shoryuken/polling/base.rb | 2 ++ lib/shoryuken/polling/strict_priority.rb | 2 +- lib/shoryuken/polling/weighted_round_robin.rb | 2 +- spec/shoryuken/manager_spec.rb | 8 ++++---- spec/shoryuken/polling/strict_priority_spec.rb | 4 ++-- spec/shoryuken/polling/weighted_round_robin_spec.rb | 4 ++-- 7 files changed, 14 insertions(+), 11 deletions(-) diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 5f7b343b..c260acfa 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -61,8 +61,9 @@ def processor_done(queue) @busy_processors.decrement client_queue = Shoryuken::Client.queues(queue) return unless client_queue.fifo? + return unless @polling_strategy.respond_to?(:message_processed) - @polling_strategy.unpause_queue(queue) + @polling_strategy.message_processed(queue) end def assign(queue_name, sqs_msg) diff --git a/lib/shoryuken/polling/base.rb b/lib/shoryuken/polling/base.rb index dce5b511..14e1eb5c 100644 --- a/lib/shoryuken/polling/base.rb +++ b/lib/shoryuken/polling/base.rb @@ -40,6 +40,8 @@ def messages_found(_queue, _messages_found) fail NotImplementedError end + def message_processed(_queue); end + def active_queues fail NotImplementedError end diff --git a/lib/shoryuken/polling/strict_priority.rb b/lib/shoryuken/polling/strict_priority.rb index 930db604..6303da5b 100644 --- a/lib/shoryuken/polling/strict_priority.rb +++ b/lib/shoryuken/polling/strict_priority.rb @@ -38,7 +38,7 @@ def active_queues .reverse end - def unpause_queue(queue) + def message_processed(queue) logger.debug "Unpausing #{queue}" @paused_until[queue] = Time.now end diff --git a/lib/shoryuken/polling/weighted_round_robin.rb b/lib/shoryuken/polling/weighted_round_robin.rb index 39027aba..7b40955f 100644 --- a/lib/shoryuken/polling/weighted_round_robin.rb +++ b/lib/shoryuken/polling/weighted_round_robin.rb @@ -35,7 +35,7 @@ def active_queues unparse_queues(@queues) end - def unpause_queue(queue) + def message_processed(queue) return if @paused_queues.empty? logger.debug "Unpausing #{queue}" diff --git a/spec/shoryuken/manager_spec.rb b/spec/shoryuken/manager_spec.rb index 355dd0c4..dca67e58 100644 --- a/spec/shoryuken/manager_spec.rb +++ b/spec/shoryuken/manager_spec.rb @@ -148,17 +148,17 @@ end context 'when queue.fifo? is true' do - it 'calls unpause_queue on strategy' do + it 'calls message_processed on strategy' do expect(sqs_queue).to receive(:fifo?).and_return(true) - expect(polling_strategy).to receive(:unpause_queue).with(queue) + expect(polling_strategy).to receive(:message_processed).with(queue) subject.send(:processor_done, queue) end end context 'when queue.fifo? is false' do - it 'does not call unpause_queue on strategy' do + it 'does not call message_processed on strategy' do expect(sqs_queue).to receive(:fifo?).and_return(false) - expect(polling_strategy).to_not receive(:unpause_queue) + expect(polling_strategy).to_not receive(:message_processed) subject.send(:processor_done, queue) end end diff --git a/spec/shoryuken/polling/strict_priority_spec.rb b/spec/shoryuken/polling/strict_priority_spec.rb index 64f021dc..868ebc61 100644 --- a/spec/shoryuken/polling/strict_priority_spec.rb +++ b/spec/shoryuken/polling/strict_priority_spec.rb @@ -146,12 +146,12 @@ end end - describe '#unpause_queue' do + describe '#message_processed' do it 'removes paused queue, adds to active queues' do strategy = Shoryuken::Polling::StrictPriority.new([queue1, queue2]) strategy.send(:pause, queue1) expect(strategy.active_queues).to eq([[queue2, 1]]) - strategy.unpause_queue(queue1) + strategy.message_processed(queue1) expect(strategy.active_queues).to eq([[queue1, 2], [queue2, 1]]) end end diff --git a/spec/shoryuken/polling/weighted_round_robin_spec.rb b/spec/shoryuken/polling/weighted_round_robin_spec.rb index 183a8fe0..ab7e146d 100644 --- a/spec/shoryuken/polling/weighted_round_robin_spec.rb +++ b/spec/shoryuken/polling/weighted_round_robin_spec.rb @@ -105,12 +105,12 @@ end end - describe '#unpause_queue' do + describe '#message_processed' do it 'removes paused queue, adds to active queues' do strategy = Shoryuken::Polling::WeightedRoundRobin.new([queue1, queue2]) strategy.send(:pause, queue1) expect(strategy.active_queues).to eq([[queue2, 1]]) - strategy.unpause_queue(queue1) + strategy.message_processed(queue1) expect(strategy.active_queues).to eq([[queue2, 1], [queue1, 1]]) end end