Skip to content

Commit

Permalink
Use SingleThreadExecutor for the heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Mar 24, 2017
1 parent 9395fe6 commit 1cd46bf
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ class Manager
include Util

BATCH_LIMIT = 10
HEARTBEAT_INTERVAL = 0.1
HEARTBEAT_INTERVAL = 1

def initialize(fetcher, polling_strategy)
@count = Shoryuken.options.fetch(:concurrency, 25)
Expand All @@ -13,19 +13,18 @@ def initialize(fetcher, polling_strategy)
@queues = Shoryuken.queues.dup.uniq

@done = Concurrent::AtomicBoolean.new(false)
@dispatching = Concurrent::AtomicBoolean.new(false)

@fetcher = fetcher
@polling_strategy = polling_strategy

@heartbeat = Concurrent::TimerTask.new(run_now: true,
execution_interval: HEARTBEAT_INTERVAL) { @pool.post { dispatch } if @dispatching.false? }

Concurrent::TimerTask.new(execution_interval: 1) do
Shoryuken.logger.info "Threads: #{Thread.list.size}"
end.execute

@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count)
@dispatcher_pool = Concurrent::SingleThreadExecutor.new

@heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: HEARTBEAT_INTERVAL) { dispatch }
end

def start
Expand All @@ -47,6 +46,7 @@ def stop(options = {})
logger.info { 'Shutting down workers' }

@heartbeat.kill
@dispatcher_pool.kill

if options[:shutdown]
hard_shutdown_in(options[:timeout])
Expand All @@ -57,24 +57,24 @@ def stop(options = {})

def processor_done(queue)
logger.debug { "Process done for '#{queue}'" }

dispatch
end

private

def dispatch
return if @done.true?
return unless @dispatching.make_true
@dispatcher_pool.post(&method(:dispatch_now))
end

begin
return if ready.zero?
return unless (queue = @polling_strategy.next_queue)
def dispatch_now
return if ready.zero?
return unless (queue = @polling_strategy.next_queue)

logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }
logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
ensure
@dispatching.make_false
end
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
end

def busy
Expand Down

0 comments on commit 1cd46bf

Please sign in to comment.