Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor fetcher, polling strategy and manager #236

Closed
wants to merge 9 commits into from
4 changes: 3 additions & 1 deletion lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
require 'shoryuken/middleware/server/timing'
require 'shoryuken/sns_arn'
require 'shoryuken/topic'
require 'shoryuken/polling'

module Shoryuken
DEFAULTS = {
Expand All @@ -32,7 +33,8 @@ module Shoryuken
startup: [],
quiet: [],
shutdown: [],
}
},
polling_strategy: Polling::WeightedRoundRobin,
}

@@queues = []
Expand Down
61 changes: 16 additions & 45 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,73 +1,44 @@
module Shoryuken
class Fetcher
include Celluloid
include Util

FETCH_LIMIT = 10

# Keep a reference to the manager so fetched messages can be handed back
# to it (see the async @manager calls below in this class).
def initialize(manager)
@manager = manager
end

# Pull up to +limit+ messages from +queue+ via the SQS client.
# The requested limit is clamped to FETCH_LIMIT, since a single SQS
# receive call cannot return more than 10 messages.
def receive_messages(queue, limit)
  request = (Shoryuken.options[:aws][:receive_message] || {}).dup
  request[:max_number_of_messages] = [limit, FETCH_LIMIT].min
  request[:message_attribute_names] = %w(All)
  request[:attribute_names] = %w(All)

  Shoryuken::Client.queues(queue).receive_messages(request)
end

def fetch(queue, available_processors)
watchdog('Fetcher#fetch died') do
started_at = Time.now

logger.debug { "Looking for new messages in '#{queue}'" }

begin
batch = Shoryuken.worker_registry.batch_receive_messages?(queue)
limit = batch ? FETCH_LIMIT : available_processors

if (sqs_msgs = Array(receive_messages(queue, limit))).any?
logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" }

if batch
@manager.async.assign(queue, patch_sqs_msgs!(sqs_msgs))
else
sqs_msgs.each { |sqs_msg| @manager.async.assign(queue, sqs_msg) }
end

@manager.async.rebalance_queue_weight!(queue)
else
logger.debug { "No message found for '#{queue}'" }

@manager.async.pause_queue!(queue)
end
limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors

sqs_msgs = Array(receive_messages(queue, limit))
logger.info { "Found #{sqs_msgs.size} messages for '#{queue}'" }
logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
sqs_msgs
rescue => ex
logger.error { "Error fetching message: #{ex}" }
logger.error { ex.backtrace.first }
[]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariokostelac not sure if we need rescue => ex here, as we are already using watchdog to wrap the method body.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use one or the other, not both, that's for sure. I do like having error handling here because SQS does return 500 now and then, and it'd be nice to return an empty array / retry the fetch in that case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariokostelac hm nvm just saw that the current implementation does the same.

end

@manager.async.dispatch
end

end

private

def patch_sqs_msgs!(sqs_msgs)
sqs_msgs.instance_eval do
def message_id
"batch-with-#{size}-messages"
end
end
def receive_messages(queue, limit)
# AWS limits the batch size by 10
limit = limit > FETCH_LIMIT ? FETCH_LIMIT : limit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariokostelac we are doing this same FETCH_LIMIT checking in the fetch method too, I believe we should do it only on the receive_messages method, as it's the one responsible for receiving messages.


options = (Shoryuken.options[:aws][:receive_message] || {}).dup
options[:max_number_of_messages] = limit
options[:message_attribute_names] = %w(All)
options[:attribute_names] = %w(All)

options.merge!(queue.options)

sqs_msgs
Shoryuken::Client.queues(queue.name).receive_messages(options)
end
end
end
5 changes: 2 additions & 3 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ class Launcher
def initialize
@condvar = Celluloid::Condition.new
@manager = Shoryuken::Manager.new_link(@condvar)
@fetcher = Shoryuken::Fetcher.new_link(manager)

@done = false

manager.fetcher = @fetcher
manager.fetcher = Shoryuken::Fetcher.new
manager.polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues)
end

def stop(options = {})
watchdog('Launcher#stop') do
@done = true
@fetcher.terminate if @fetcher.alive?

manager.async.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout])
@condvar.wait
Expand Down
134 changes: 49 additions & 85 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ class Manager
include Util

attr_accessor :fetcher
attr_accessor :polling_strategy

exclusive :dispatch

trap_exit :processor_died

BATCH_LIMIT = 10

def initialize(condvar)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0
Expand Down Expand Up @@ -40,8 +45,6 @@ def stop(options = {})

fire_event(:shutdown, true)

@fetcher.terminate if @fetcher.alive?

logger.info { "Shutting down #{@ready.size} quiet workers" }

@ready.each do |processor|
Expand Down Expand Up @@ -71,6 +74,7 @@ def processor_done(queue, processor)
return after(0) { @finished.signal } if @busy.empty?
else
@ready << processor
async.dispatch
end
end
end
Expand All @@ -86,6 +90,7 @@ def processor_died(processor, reason)
return after(0) { @finished.signal } if @busy.empty?
else
@ready << build_processor
async.dispatch
end
end
end
Expand All @@ -94,58 +99,27 @@ def stopped?
@done
end

# Hand a fetched message off to an idle processor.
# Moves one processor from the ready pool to the busy pool and kicks off
# asynchronous processing of +sqs_msg+ from +queue+.
def assign(queue, sqs_msg)
  watchdog('Manager#assign died') do
    logger.debug { "Assigning #{sqs_msg.message_id}" }

    worker = @ready.pop
    @busy << worker
    worker.async.process(queue, sqs_msg)
  end
end

# Raise a queue's effective weight back towards its configured weight by
# re-adding it to the active list when it is currently below its original
# (configured) number of entries.
def rebalance_queue_weight!(queue)
  watchdog('Manager#rebalance_queue_weight! died') do
    original = original_queue_weight(queue)
    current  = current_queue_weight(queue)

    if original > current
      logger.info { "Increasing '#{queue}' weight to #{current + 1}, max: #{original}" }

      @queues << queue
    end
  end
end

# Temporarily drop an (empty) queue from the active polling rotation and
# schedule restart_queue! to put it back after the configured delay.
# No-op when the queue is not currently active or no delay is configured.
#
# The scraped diff shows "[email protected]?(queue)" here — GitHub's
# email obfuscation mangled "!@queues.include?(queue)"; restored below.
def pause_queue!(queue)
  return if !@queues.include?(queue) || Shoryuken.options[:delay].to_f <= 0

  logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because it's empty" }

  @queues.delete(queue)

  after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }
end


def dispatch
return if stopped?

logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{unparse_queues(@queues)}" }
logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{polling_strategy.active_queues}" }

if @ready.empty?
logger.debug { 'Pausing fetcher, because all processors are busy' }

after(1) { dispatch }

return
end

if (queue = next_queue)
@fetcher.async.fetch(queue, @ready.size)
else
queue = polling_strategy.next_queue
if queue.nil?
logger.debug { 'Pausing fetcher, because all queues are paused' }

@fetcher_paused = true
after(1) { dispatch }
return
end

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)

async.dispatch
end

def real_thread(proxy_id, thr)
Expand All @@ -154,61 +128,41 @@ def real_thread(proxy_id, thr)

private

# Create a Processor actor linked to this manager, stamping it with its
# own object_id as proxy_id so the actor can be identified later.
def build_processor
  Processor.new_link(current_actor).tap do |processor|
    processor.proxy_id = processor.object_id
  end
end

def restart_queue!(queue)
return if stopped?

unless @queues.include? queue
logger.debug { "Restarting '#{queue}'" }

@queues << queue

if @fetcher_paused
logger.debug { 'Restarting fetcher' }
def assign(queue, sqs_msg)
watchdog('Manager#assign died') do
logger.debug { "Assigning #{sqs_msg.message_id}" }

@fetcher_paused = false
processor = @ready.pop
@busy << processor

dispatch
end
processor.async.process(queue, sqs_msg)
end
end

def current_queue_weight(queue)
queue_weight(@queues, queue)
def dispatch_batch(queue)
batch = fetcher.fetch(queue, BATCH_LIMIT)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariokostelac I believe we could use something like Fetcher::FETCH_LIMIT instead of BATCH_LIMIT as it should be the same, in case of batch we want the maximum possible to fetch anyway == fetch limit, and the fetch is the one that knows the what's the maximum.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reduce the dependency with constants, we could also have specific methods fetch_max and fetch_limited or something like that, but I don't see problems with Fetcher::FETCH_LIMIT as Manager already knows Fetcher.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a logical separation, but I am good with referencing the fetch limit. Another approach would be having BATCH_LIMIT = Fetcher::FETCH_LIMIT.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariokostelac I wouldn't introduce BATCH_LIMIT, because we don't have such a thing. In this case we want: give the maximum you can. Just to make it simpler for future maintenance.

polling_strategy.messages_found(queue.name, batch.size)
assign(queue.name, patch_batch!(batch))
end

def original_queue_weight(queue)
queue_weight(Shoryuken.queues, queue)
# Fetch up to one message per idle processor from +queue+, report the hit
# count to the polling strategy, and assign each message individually.
def dispatch_single_messages(queue)
  fetched = fetcher.fetch(queue, @ready.size)
  polling_strategy.messages_found(queue.name, fetched.size)
  fetched.each { |sqs_msg| assign(queue.name, sqs_msg) }
end

def queue_weight(queues, queue)
queues.count { |q| q == queue }
# True when the worker registered for this queue opted into batch
# message receiving (drives dispatch_batch vs. dispatch_single_messages).
def batched_queue?(queue)
Shoryuken.worker_registry.batch_receive_messages?(queue.name)
end

def next_queue
return nil if @queues.empty?

# get/remove the first queue in the list
queue = @queues.shift

unless defined?(::ActiveJob) || !Shoryuken.worker_registry.workers(queue).empty?
# when no worker registered pause the queue to avoid endless recursion
logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because no workers registered" }

after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }

return next_queue
end

# add queue back to the end of the list
@queues << queue
# Pause duration in seconds (as a Float) read from Shoryuken.options[:delay].
def delay
Shoryuken.options[:delay].to_f
end

queue
# Build a Processor actor linked to this manager; proxy_id records the
# proxy's object_id so the actor can be identified later.
def build_processor
  new_processor = Processor.new_link(current_actor)
  new_processor.proxy_id = new_processor.object_id
  new_processor
end

def soft_shutdown(delay)
Expand Down Expand Up @@ -241,5 +195,15 @@ def hard_shutdown_in(delay)
end
end
end

# Decorate an array of SQS messages with a singleton message_id method so
# a whole batch can be logged/assigned just like a single message (e.g.
# "Assigning batch-with-3-messages"). Returns the same array.
#
# The scraped diff had review-comment UI text spliced into this body,
# which made it invalid Ruby; the clean method is restored here.
def patch_batch!(sqs_msgs)
  sqs_msgs.instance_eval do
    def message_id
      "batch-with-#{size}-messages"
    end
  end

  sqs_msgs
end
end
end
Loading