Skip to content

Commit

Permalink
Make dispatch interval configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
darraghotoole committed May 26, 2016
1 parent 9f38980 commit 5ad43ca
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 5 deletions.
4 changes: 4 additions & 0 deletions lib/shoryuken/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def parse_cli_args(argv)
opts[:concurrency] = Integer(arg)
end

o.on '-i', '--fetcher-pause-interval DECIMAL', 'Interval to pause fetching when all workers are busy' do |arg|
opts[:dispatch_interval] = Float(arg)
end

o.on '-d', '--daemon', 'Daemonize process' do |arg|
opts[:daemon] = arg
end
Expand Down
15 changes: 12 additions & 3 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,17 @@ class Manager

def initialize(condvar)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0
@fetcher_pause_interval = Shoryuken.options[:fetcher_pause_interval] || 1

unless @count > 0
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number")
end

unless @fetcher_pause_interval > 0
error_message = "Fetcher pause interval value #{@fetcher_pause_interval} is invalid, it needs to be a positive number"
raise(ArgumentError, error_message)
end

@queues = Shoryuken.queues.dup.uniq
@finished = condvar

Expand Down Expand Up @@ -122,7 +132,6 @@ def pause_queue!(queue)
after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }
end


def dispatch
return if stopped?

Expand All @@ -131,7 +140,7 @@ def dispatch
if @ready.empty?
logger.debug { 'Pausing fetcher, because all processors are busy' }

after(1) { dispatch }
after(@fetcher_pause_interval) { dispatch }

return
end
Expand Down
12 changes: 10 additions & 2 deletions spec/shoryuken/manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@
describe 'Invalid concurrency setting' do
it 'raises ArgumentError if concurrency is not positive number' do
Shoryuken.options[:concurrency] = -1
expect { Shoryuken::Manager.new(nil) }
.to raise_error(ArgumentError, 'Concurrency value -1 is invalid, it needs to be a positive number')
expect { Shoryuken::Manager.new(nil) }.
to raise_error(ArgumentError, 'Concurrency value -1 is invalid, it needs to be a positive number')
end

end

describe 'Invalid fetch pause interval setting' do
it 'raises ArgumentError if fetcher pause interval is not positive number' do
Shoryuken.options[:fetcher_pause_interval] = -1
expect { Shoryuken::Manager.new(nil) }.
to raise_error(ArgumentError, 'Fetcher pause interval value -1 is invalid, it needs to be a positive number')
end
end

describe 'Auto Scaling' do
it 'decreases weight' do
queue1 = 'shoryuken'
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def perform(sqs_msg, body); end

Shoryuken.queues.clear

Shoryuken.options[:fetcher_pause_interval] = 1
Shoryuken.options[:concurrency] = 1
Shoryuken.options[:delay] = 1
Shoryuken.options[:timeout] = 1
Expand Down

0 comments on commit 5ad43ca

Please sign in to comment.