-
-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stop queuing up heartbeat threads #345
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ class Manager | |
include Util | ||
|
||
BATCH_LIMIT = 10 | ||
HEARTBEAT_INTERVAL = 0.1 | ||
HEARTBEAT_INTERVAL = 1 | ||
|
||
def initialize(fetcher, polling_strategy) | ||
@count = Shoryuken.options.fetch(:concurrency, 25) | ||
|
@@ -13,16 +13,18 @@ def initialize(fetcher, polling_strategy) | |
@queues = Shoryuken.queues.dup.uniq | ||
|
||
@done = Concurrent::AtomicBoolean.new(false) | ||
@dispatching = Concurrent::AtomicBoolean.new(false) | ||
|
||
@fetcher = fetcher | ||
@polling_strategy = polling_strategy | ||
|
||
@heartbeat = Concurrent::TimerTask.new(run_now: true, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @waynerobinson I'm doing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if the But the ensure and re-run should keep the dispatch loop running I think. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @waynerobinson you are right, maybe I was too overcautious. I added it in there just in case. But I'm considering removing it, |
||
execution_interval: HEARTBEAT_INTERVAL, | ||
timeout_interval: 60) { dispatch } | ||
Concurrent::TimerTask.new(execution_interval: 1) do | ||
Shoryuken.logger.info "Threads: #{Thread.list.size}" | ||
end.execute | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @waynerobinson With concurrency 10: |
||
|
||
@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count) | ||
@dispatcher_pool = Concurrent::SingleThreadExecutor.new | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@waynerobinson now as I'm using this
I will try to reply on that issue as well. I think they should allow to configure a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Except that Don't really need a But the lack of a correct implementation for the timeout causes thread leaks in the timeout monitor if the task takes longer than the execution interval (it should be the timeout interval at the very least, but it never refers to this… hence the bug) to complete. The current design intention of |
||
|
||
@heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: HEARTBEAT_INTERVAL) { dispatch } | ||
end | ||
|
||
def start | ||
|
@@ -44,6 +46,7 @@ def stop(options = {}) | |
logger.info { 'Shutting down workers' } | ||
|
||
@heartbeat.kill | ||
@dispatcher_pool.kill | ||
|
||
if options[:shutdown] | ||
hard_shutdown_in(options[:timeout]) | ||
|
@@ -54,22 +57,25 @@ def stop(options = {}) | |
|
||
def processor_done(queue) | ||
logger.debug { "Process done for '#{queue}'" } | ||
|
||
dispatch | ||
end | ||
|
||
private | ||
|
||
def dispatch | ||
return if @done.true? | ||
return unless @dispatching.make_true | ||
|
||
@dispatcher_pool.post(&method(:dispatch_now)) | ||
end | ||
|
||
def dispatch_now | ||
return if ready.zero? | ||
return unless (queue = @polling_strategy.next_queue) | ||
|
||
logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" } | ||
|
||
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue) | ||
ensure | ||
@dispatching.make_false | ||
end | ||
|
||
def busy | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waynerobinson I'm testing
HEARTBEAT_INTERVAL = 1
, as I'm also callingdispatch
in theprocessor_done
callback.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's probably going to cause issues with empty queues as it will take a least
HEARTBEAT_INTERVAL
before it will request new messages.Instead of this you could just have
dispatch
run in a loop and rely on theTimerTask
to restart it if it ever dies.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although instead of
ready.zero?
, is there just some type of latch on the pool to await a free thread so you don't have to do the sleep?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waynerobinson not I'm aware of 🐼