diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index 7da3c9e5..d8c2ac06 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -13,19 +13,18 @@ def initialize(fetcher, polling_strategy) @queues = Shoryuken.queues.dup.uniq @done = Concurrent::AtomicBoolean.new(false) - @dispatching = Concurrent::AtomicBoolean.new(false) @fetcher = fetcher @polling_strategy = polling_strategy - @heartbeat = Concurrent::TimerTask.new(run_now: true, - execution_interval: HEARTBEAT_INTERVAL) { @pool.post { dispatch } if @dispatching.false? } - Concurrent::TimerTask.new(execution_interval: 1) do Shoryuken.logger.info "Threads: #{Thread.list.size}" end.execute @pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count) + @dispatcher_pool = Concurrent::SingleThreadExecutor.new + + @heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: HEARTBEAT_INTERVAL) { @dispatcher_pool.post { dispatch } } end def start @@ -47,6 +46,7 @@ def stop(options = {}) logger.info { 'Shutting down workers' } @heartbeat.kill + @dispatcher_pool.kill if options[:shutdown] hard_shutdown_in(options[:timeout]) @@ -63,18 +63,13 @@ def processor_done(queue) def dispatch return if @done.true? - return unless @dispatching.make_true - begin - return if ready.zero? - return unless (queue = @polling_strategy.next_queue) + return if ready.zero? + return unless (queue = @polling_strategy.next_queue) - logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" } + logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" } - batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) - ensure - @dispatching.make_false - end + batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) end def busy