Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop queuing up heartbeat threads #345

Merged
merged 3 commits into from
Mar 24, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ module Shoryuken
class Manager
include Util

BATCH_LIMIT = 10
HEARTBEAT_INTERVAL = 0.1
BATCH_LIMIT = 10

def initialize(fetcher, polling_strategy)
@count = Shoryuken.options.fetch(:concurrency, 25)
Expand All @@ -13,22 +12,18 @@ def initialize(fetcher, polling_strategy)
@queues = Shoryuken.queues.dup.uniq

@done = Concurrent::AtomicBoolean.new(false)
@dispatching = Concurrent::AtomicBoolean.new(false)

@fetcher = fetcher
@polling_strategy = polling_strategy

@heartbeat = Concurrent::TimerTask.new(run_now: true,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@waynerobinson I'm doing the loop in a ensure block, and also calling dispatch_async when a processor done, just in case. So I don't think we need to the heartbeat anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if the dispatch_async in processor_done does anything given the normal dispatcher re-runs at the end anyway.

But the ensure and re-run should keep the dispatch loop running I think. 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@waynerobinson you are right, maybe I was too overcautious. I added it in there just in case. But I'm considering removing it, ensure should work.

execution_interval: HEARTBEAT_INTERVAL,
timeout_interval: 60) { dispatch }

@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count)
@dispatcher_executor = Concurrent::SingleThreadExecutor.new
end

def start
logger.info { 'Starting' }

@heartbeat.execute
dispatch_async
end

def stop(options = {})
Expand All @@ -43,7 +38,7 @@ def stop(options = {})

logger.info { 'Shutting down workers' }

@heartbeat.kill
@dispatcher_executor.kill

if options[:shutdown]
hard_shutdown_in(options[:timeout])
Expand All @@ -58,18 +53,23 @@ def processor_done(queue)

private

def dispatch
def dispatch_async
@dispatcher_executor.post(&method(:dispatch_now))
end

def dispatch_now
return if @done.true?
return unless @dispatching.make_true

return if ready.zero?
return unless (queue = @polling_strategy.next_queue)
begin
return if ready.zero?
return unless (queue = @polling_strategy.next_queue)

logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }
logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
ensure
@dispatching.make_false
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
ensure
dispatch_async
end
end

def busy
Expand Down