Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor fetcher, polling strategy and manager #284

Merged
merged 11 commits into from
Dec 13, 2016
4 changes: 3 additions & 1 deletion lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
require 'shoryuken/middleware/server/timing'
require 'shoryuken/sns_arn'
require 'shoryuken/topic'
require 'shoryuken/polling'

module Shoryuken
DEFAULTS = {
Expand All @@ -33,7 +34,8 @@ module Shoryuken
startup: [],
quiet: [],
shutdown: [],
}
},
polling_strategy: Polling::WeightedRoundRobin,
}

@@queues = []
Expand Down
61 changes: 16 additions & 45 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,73 +1,44 @@
module Shoryuken
class Fetcher
include Celluloid
include Util

FETCH_LIMIT = 10

def initialize(manager)
@manager = manager
end

def receive_messages(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit

options = (Shoryuken::AwsConfig.options[:receive_message] || {}).dup
options[:max_number_of_messages] = limit
options[:message_attribute_names] = %w(All)
options[:attribute_names] = %w(All)

Shoryuken::Client.queues(queue).receive_messages options
end

def fetch(queue, available_processors)
watchdog('Fetcher#fetch died') do
started_at = Time.now

logger.debug { "Looking for new messages in '#{queue}'" }

begin
batch = Shoryuken.worker_registry.batch_receive_messages?(queue)
limit = batch ? FETCH_LIMIT : available_processors

if (sqs_msgs = Array(receive_messages(queue, limit))).any?
logger.debug { "Found #{sqs_msgs.size} messages for '#{queue}'" }

if batch
@manager.async.assign(queue, patch_sqs_msgs!(sqs_msgs))
else
sqs_msgs.each { |sqs_msg| @manager.async.assign(queue, sqs_msg) }
end

@manager.async.rebalance_queue_weight!(queue)
else
logger.debug { "No message found for '#{queue}'" }

@manager.async.pause_queue!(queue)
end
limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors

sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" }
logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
sqs_msgs
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
[]
end

@manager.async.dispatch
end

end

private

def patch_sqs_msgs!(sqs_msgs)
sqs_msgs.instance_eval do
def message_id
"batch-with-#{size}-messages"
end
end
def receive_messages(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit

options = (Shoryuken.options[:aws][:receive_message] || {}).dup
options[:max_number_of_messages] = limit
options[:message_attribute_names] = %w(All)
options[:attribute_names] = %w(All)

options.merge!(queue.options)

sqs_msgs
Shoryuken::Client.queues(queue.name).receive_messages(options)
end
end
end
5 changes: 2 additions & 3 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ class Launcher
def initialize
@condvar = Celluloid::Condition.new
@manager = Shoryuken::Manager.new_link(@condvar)
@fetcher = Shoryuken::Fetcher.new_link(manager)

@done = false

manager.fetcher = @fetcher
manager.fetcher = Shoryuken::Fetcher.new
manager.polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues)
end

def stop(options = {})
watchdog('Launcher#stop') do
@done = true
@fetcher.terminate if @fetcher.alive?

manager.async.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout])
@condvar.wait
Expand Down
133 changes: 49 additions & 84 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ class Manager
include Util

attr_accessor :fetcher
attr_accessor :polling_strategy

exclusive :dispatch

trap_exit :processor_died

BATCH_LIMIT = 10

def initialize(condvar)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0
Expand Down Expand Up @@ -40,8 +45,6 @@ def stop(options = {})

fire_event(:shutdown, true)

@fetcher.terminate if @fetcher.alive?

logger.info { "Shutting down #{@ready.size} quiet workers" }

@ready.each do |processor|
Expand Down Expand Up @@ -71,6 +74,7 @@ def processor_done(queue, processor)
return after(0) { @finished.signal } if @busy.empty?
else
@ready << processor
async.dispatch
end
end
end
Expand All @@ -86,6 +90,7 @@ def processor_died(processor, reason)
return after(0) { @finished.signal } if @busy.empty?
else
@ready << build_processor
async.dispatch
end
end
end
Expand All @@ -94,57 +99,27 @@ def stopped?
@done
end

def assign(queue, sqs_msg)
watchdog('Manager#assign died') do
logger.debug { "Assigning #{sqs_msg.message_id}" }

processor = @ready.pop
@busy << processor

processor.async.process(queue, sqs_msg)
end
end

def rebalance_queue_weight!(queue)
watchdog('Manager#rebalance_queue_weight! died') do
if (original = original_queue_weight(queue)) > (current = current_queue_weight(queue))
logger.info { "Increasing '#{queue}' weight to #{current + 1}, max: #{original}" }

@queues << queue
end
end
end

def pause_queue!(queue)
return if [email protected]?(queue) || Shoryuken.options[:delay].to_f <= 0

logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because it's empty" }

@queues.delete(queue)

after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }
end


def dispatch
return if stopped?

logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{unparse_queues(@queues)}" }
logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{polling_strategy.active_queues}" }

if @ready.empty?
logger.debug { 'Pausing fetcher, because all processors are busy' }

dispatch_later
return
end

if (queue = next_queue)
@fetcher.async.fetch(queue, @ready.size)
else
queue = polling_strategy.next_queue
if queue.nil?
logger.debug { 'Pausing fetcher, because all queues are paused' }

@fetcher_paused = true
dispatch_later
return
end

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)

async.dispatch
end

def real_thread(proxy_id, thr)
Expand All @@ -160,61 +135,41 @@ def dispatch_later
end
end

def build_processor
processor = Processor.new_link(current_actor)
processor.proxy_id = processor.object_id
processor
end

def restart_queue!(queue)
return if stopped?

unless @queues.include? queue
logger.debug { "Restarting '#{queue}'" }

@queues << queue

if @fetcher_paused
logger.debug { 'Restarting fetcher' }
def assign(queue, sqs_msg)
watchdog('Manager#assign died') do
logger.debug { "Assigning #{sqs_msg.message_id}" }

@fetcher_paused = false
processor = @ready.pop
@busy << processor

dispatch
end
processor.async.process(queue, sqs_msg)
end
end

def current_queue_weight(queue)
queue_weight(@queues, queue)
def dispatch_batch(queue)
batch = fetcher.fetch(queue, BATCH_LIMIT)
polling_strategy.messages_found(queue.name, batch.size)
assign(queue.name, patch_batch!(batch))
end

def original_queue_weight(queue)
queue_weight(Shoryuken.queues, queue)
def dispatch_single_messages(queue)
messages = fetcher.fetch(queue, @ready.size)
polling_strategy.messages_found(queue.name, messages.size)
messages.each { |message| assign(queue.name, message) }
end

def queue_weight(queues, queue)
queues.count { |q| q == queue }
def batched_queue?(queue)
Shoryuken.worker_registry.batch_receive_messages?(queue.name)
end

def next_queue
return nil if @queues.empty?

# get/remove the first queue in the list
queue = @queues.shift

unless defined?(::ActiveJob) || !Shoryuken.worker_registry.workers(queue).empty?
# when no worker registered pause the queue to avoid endless recursion
logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because no workers registered" }

after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }

return next_queue
end

# add queue back to the end of the list
@queues << queue
def delay
Shoryuken.options[:delay].to_f
end

queue
def build_processor
processor = Processor.new_link(current_actor)
processor.proxy_id = processor.object_id
processor
end

def soft_shutdown(delay)
Expand Down Expand Up @@ -247,5 +202,15 @@ def hard_shutdown_in(delay)
end
end
end

def patch_batch!(sqs_msgs)
sqs_msgs.instance_eval do
def message_id
"batch-with-#{size}-messages"
end
end

sqs_msgs
end
end
end
Loading