-
-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make fetcher pause interval configurable #214
Make fetcher pause interval configurable #214
Conversation
expect { Shoryuken::Manager.new(nil) } | ||
.to raise_error(ArgumentError, 'Dispatch interval value -1 is invalid, it needs to be a positive number') | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra empty line detected at block body end.
86ce4b0
to
b1c3d3d
Compare
end | ||
|
||
unless @dispatch_interval > 0 | ||
raise(ArgumentError, "Dispatch interval value #{@dispatch_interval} is invalid, it needs to be a positive number") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long. [122/120]
b1c3d3d
to
a6df078
Compare
end | ||
|
||
unless @fetcher_pause_interval > 0 | ||
raise(ArgumentError, "Fetcher pause interval value #{@fetcher_pause_interval} is invalid, it needs to be a positive number") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long. [132/120]
a6df078
to
5ad43ca
Compare
end | ||
|
||
unless @fetcher_pause_interval > 0 | ||
error_message = "Fetcher pause interval value #{@fetcher_pause_interval} is invalid, it needs to be a positive number" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long. [126/120]
5ad43ca
to
73bfced
Compare
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") | ||
end | ||
|
||
unless @fetcher_pause_interval > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I experimented with this value being set to 0 without any problems.
https://github.com/celluloid/timers/blob/fcb443006ed1770d3ba2ebf3db0e0d2e570c6672/lib/timers/group.rb#L37
So is perhaps instead if @fetcher_pause_interval < 0 is better?
73bfced
to
f66b73d
Compare
@@ -131,7 +140,7 @@ def dispatch | |||
if @ready.empty? | |||
logger.debug { 'Pausing fetcher, because all processors are busy' } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add info in the message for how long is the fetcher being paused?
252b673
to
a837a71
Compare
|
||
if @fetcher_pause_interval < 0 | ||
message = "Fetcher pause interval value #{@fetcher_pause_interval} is invalid, it cannot be negative" | ||
raise(ArgumentError, message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated to the PR: I'm wondering if we should move these ArgumentError
errors to cli.rb
, consequently failing in there, instead of later in the startup process. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, moving the relevant tests would also be the first tests against cli.rb
, which seems to be untested at the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move that at least to the initialize
method? That's where it's set.
Looks good @darraghotoole 🍻 @mariokostelac do you mind double checking it? |
@@ -12,7 +12,17 @@ class Manager | |||
|
|||
def initialize(condvar) | |||
@count = Shoryuken.options[:concurrency] || 25 | |||
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0 | |||
@fetcher_pause_interval = Shoryuken.options[:fetcher_pause_interval] || 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are using two different symbols:
:dispatch_interval
and :fetcher_pause_interval
.
Is that on purpose? I do not see anybody reading dispatch_interval
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was missed during refactor, good catch.
It seems we can't set that value since we are using two different symbols. |
a837a71
to
f1661e8
Compare
@@ -119,6 +119,10 @@ def parse_cli_args(argv) | |||
opts[:concurrency] = Integer(arg) | |||
end | |||
|
|||
o.on '-i', '--fetcher-pause-interval DECIMAL', 'Interval to pause fetching when all workers are busy' do |arg| | |||
opts[:fetcher_pause_interval] = Float(arg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mariokostelac Fixed ^^
I've been thinking more about this one. You're solving a problem with optionally decreasing that time, but you're also introducing a problem of manager hitting the Q very often when there are no jobs. Making that value very close to 0 would mean constantly polling the Q. I feel we can do very similar thing like I did in #215.
We already call Using that approach you get total utilisation of your manager. You spend no time waiting for next dispatch cycle. I am fine with adding this extra option on boot, but I'd like to make sure we are improving the code with every change. Could we add those dispatch calls in the same PR @darraghotoole? @phstc are you good with my approach? |
@mariokostelac I'm good with it.
Loved :) |
Fix I've been talking about is merged in f4640d9. |
def dispatch | ||
return if stopped? | ||
|
||
logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{unparse_queues(@queues)}" } | ||
|
||
if @ready.empty? | ||
logger.debug { 'Pausing fetcher, because all processors are busy' } | ||
logger.debug { 'Pausing fetcher for #{@fetcher_pause_interval} seconds, because all processors are busy' } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single-quoted strings are not interpolated, so the log will contain the literal text:
Pausing fetcher for #{@fetcher_pause_interval} seconds, because all processors are busy
You probably want to use double quotes here.
This seems conceptually similar to the It's a bit inconsistent that |
@eugeneius Yes, I do not see any useful difference right now. Since we've fixed the original issue on better way, I am inclined to avoid introducing extra complexity and close this PR. |
The fetcher currently pauses for one second when all processes are busy. When running in single-threaded mode this effectively means we can only process one job per second. This PR allows us to configure the pause interval (eg: 0.1 for 100ms) instead of using the hardcoded value.