diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 6add6425..14a8ccb2 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -12,8 +12,6 @@ def initialize(fetcher, polling_strategy) @done = Concurrent::AtomicBoolean.new(false) - @dispatch_later = Concurrent::AtomicBoolean.new(false) - @fetcher = fetcher @polling_strategy = polling_strategy @@ -51,10 +49,10 @@ def processor_done(queue) logger.debug { "Process done for '#{queue}'" } @ready.increment - - dispatch_later unless @done.true? end + private + def dispatch return if @done.true? @@ -62,34 +60,25 @@ def dispatch if @ready.value == 0 logger.debug { 'Pausing fetcher, because all processors are busy' } - dispatch_later - return + return dispatch_later end unless queue = @polling_strategy.next_queue logger.debug { 'Pausing fetcher, because all queues are paused' } - dispatch_later - return + return dispatch_later end batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) - dispatch_later + return dispatch_later end - private - def busy @count - @ready.value end def dispatch_later - return unless @dispatch_later.make_true - - after(1) do - @dispatch_later.make_false - dispatch - end + after(1) { dispatch } end def assign(queue, sqs_msg)