Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent ruby #280

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ end
desc 'Open Shoryuken pry console'
task :console do
require 'pry'
require 'celluloid/current'
require 'shoryuken'

config_file = File.join File.expand_path('..', __FILE__), 'shoryuken.yml'
Expand Down
21 changes: 6 additions & 15 deletions lib/shoryuken/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'erb'

require 'shoryuken'
require 'concurrent-edge'

module Shoryuken
# See: https://github.com/mperham/sidekiq/blob/33f5d6b2b6c0dfaab11e5d39688cab7ebadc83ae/lib/sidekiq/cli.rb#L20
Expand Down Expand Up @@ -41,7 +42,7 @@ def run(args)

loader.load

load_celluloid
load_concurrent_ruby

require 'shoryuken/launcher'
@launcher = Shoryuken::Launcher.new
Expand All @@ -68,29 +69,19 @@ def run(args)

private

def load_celluloid
require 'celluloid/current'
Celluloid.logger = (Shoryuken.options[:verbose] ? Shoryuken.logger : nil)
def load_concurrent_ruby
Concurrent.global_logger = lambda do |level, progname, msg = nil, &block|
Shoryuken.logger.log(level, msg, progname, &block)
end if Shoryuken.logger

require 'shoryuken/manager'
end

def celluloid_loaded?
defined?(::Celluloid)
end

def daemonize(options)
return unless options[:daemon]

fail ArgumentError, "You really should set a logfile if you're going to daemonize" unless options[:logfile]

if celluloid_loaded?
# Celluloid can't be loaded until after we've daemonized
# because it spins up threads and creates locks which get
# into a very bad state if forked.
raise "Celluloid cannot be required until here, or it will break Shoryuken's daemonization"
end

files_to_reopen = []
ObjectSpace.each_object(File) do |file|
files_to_reopen << file unless file.closed?
Expand Down
14 changes: 6 additions & 8 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module Shoryuken
class Fetcher
include Celluloid
class Fetcher < Concurrent::Actor::RestartingContext
include Util

FETCH_LIMIT = 10
Expand Down Expand Up @@ -35,16 +34,16 @@ def fetch(queue, available_processors)
logger.debug { "Found #{sqs_msgs.size} messages for '#{queue}'" }

if batch
@manager.async.assign(queue, patch_sqs_msgs!(sqs_msgs))
@manager.tell([:assign, queue, patch_sqs_msgs!(sqs_msgs)])
else
sqs_msgs.each { |sqs_msg| @manager.async.assign(queue, sqs_msg) }
sqs_msgs.each { |sqs_msg| @manager.tell([:assign, queue, sqs_msg]) }
end

@manager.async.rebalance_queue_weight!(queue)
@manager.tell([:rebalance_queue_weight!, queue])
else
logger.debug { "No message found for '#{queue}'" }

@manager.async.pause_queue!(queue)
@manager.tell([:pause_queue!, queue])
end

logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" }
Expand All @@ -53,9 +52,8 @@ def fetch(queue, available_processors)
logger.error { ex.backtrace.first }
end

@manager.async.dispatch
@manager.tell([:dispatch])
end

end

private
Expand Down
32 changes: 8 additions & 24 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
@@ -1,43 +1,27 @@
module Shoryuken
class Launcher
include Celluloid
class Launcher < Concurrent::Actor::RestartingContext
include Util

trap_exit :actor_died

attr_accessor :manager

def initialize
@condvar = Celluloid::Condition.new
@manager = Shoryuken::Manager.new_link(@condvar)
@fetcher = Shoryuken::Fetcher.new_link(manager)

@done = false
@manager = Shoryuken::Manager.spawn! name: :manager, link: true
@fetcher = Shoryuken::Fetcher.spawn! name: :fetcher, link: true, args: [@manager]

manager.fetcher = @fetcher
@manager.ask!([:fetcher, @fetcher])
end

def stop(options = {})
watchdog('Launcher#stop') do
@done = true
@fetcher.terminate if @fetcher.alive?
@fetcher.ask!(:terminate!)

manager.async.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout])
@condvar.wait
manager.terminate
@manager.ask!([:stop, shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout]])
@manager.ask!(:terminate!)
end
end

def run
watchdog('Launcher#run') do
manager.async.start
@manager.tell(:start)
end
end

def actor_died(actor, reason)
return if @done
logger.warn { "Shoryuken died due to the following error, cannot recover, process exiting: #{reason}" }
exit 1
end
end
end
90 changes: 34 additions & 56 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@
require 'shoryuken/fetcher'

module Shoryuken
class Manager
include Celluloid
class Manager < Concurrent::Actor::RestartingContext
include Util

attr_accessor :fetcher
def initialize
@count = Shoryuken.options[:concurrency] || 25

trap_exit :processor_died
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") if @count < 1

def initialize(condvar)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0
@queues = Shoryuken.queues.dup.uniq
@finished = condvar

@done = false

Expand All @@ -23,6 +19,10 @@ def initialize(condvar)
@threads = {}
end

def fetcher(fetcher)
@fetcher = fetcher
end

def start
logger.info { 'Starting' }

Expand All @@ -40,17 +40,15 @@ def stop(options = {})

fire_event(:shutdown, true)

@fetcher.terminate if @fetcher.alive?
@fetcher.ask!(:terminate!)

logger.info { "Shutting down #{@ready.size} quiet workers" }

@ready.each do |processor|
processor.terminate if processor.alive?
processor.ask!(:terminate!)
end
@ready.clear

return after(0) { @finished.signal } if @busy.empty?

if options[:shutdown]
hard_shutdown_in(options[:timeout])
else
Expand All @@ -67,31 +65,21 @@ def processor_done(queue, processor)
@busy.delete processor

if stopped?
processor.terminate if processor.alive?
return after(0) { @finished.signal } if @busy.empty?
processor.ask!(:terminate!)
else
@ready << processor
end
end
end

def processor_died(processor, reason)
watchdog("Manager#processor_died died") do
logger.error { "Process died, reason: #{reason}" }

@threads.delete(processor.object_id)
@busy.delete processor
def stopped?
@done
end

if stopped?
return after(0) { @finished.signal } if @busy.empty?
else
@ready << build_processor
end
end
def reset
end

def stopped?
@done
def terminated(_)
end

def assign(queue, sqs_msg)
Expand All @@ -101,7 +89,7 @@ def assign(queue, sqs_msg)
processor = @ready.pop
@busy << processor

processor.async.process(queue, sqs_msg)
processor.tell([:process, queue, sqs_msg])
end
end

Expand All @@ -122,7 +110,7 @@ def pause_queue!(queue)

@queues.delete(queue)

after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }
after(Shoryuken.options[:delay].to_f) { tell([:restart_queue!, queue]) }
end


Expand All @@ -139,18 +127,14 @@ def dispatch
end

if (queue = next_queue)
@fetcher.async.fetch(queue, @ready.size)
@fetcher.tell [:fetch, queue, @ready.size]
else
logger.debug { 'Pausing fetcher, because all queues are paused' }

@fetcher_paused = true
end
end

def real_thread(proxy_id, thr)
@threads[proxy_id] = thr
end

private

def dispatch_later
Expand All @@ -161,9 +145,7 @@ def dispatch_later
end

def build_processor
processor = Processor.new_link(current_actor)
processor.proxy_id = processor.object_id
processor
Processor.spawn! name: :processor, link: true, args: [self]
end

def restart_queue!(queue)
Expand Down Expand Up @@ -206,7 +188,7 @@ def next_queue
# when no worker registered pause the queue to avoid endless recursion
logger.debug { "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because no workers registered" }

after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }
after(Shoryuken.options[:delay].to_f) { tell([:restart_queue!, queue]) }

return next_queue
end
Expand All @@ -222,30 +204,26 @@ def soft_shutdown(delay)

if @busy.size > 0
after(delay) { soft_shutdown(delay) }
else
@finished.signal
end
end

def hard_shutdown_in(delay)
logger.info { "Waiting for #{@busy.size} busy workers" }
logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }

after(delay) do
watchdog('Manager#hard_shutdown_in died') do
if @busy.size > 0
logger.info { "Hard shutting down #{@busy.size} busy workers" }

@busy.each do |processor|
if processor.alive? && t = @threads.delete(processor.object_id)
t.raise Shutdown
end
end
end

@finished.signal
end
end
# after(delay) do
# watchdog('Manager#hard_shutdown_in died') do
# if @busy.size > 0
# logger.info { "Hard shutting down #{@busy.size} busy workers" }

# # @busy.each do |processor|
# # if processor.alive? && t = @threads.delete(processor.object_id)
# # t.raise Shutdown
# # end
# # end
# end
# end
# end
end
end
end
15 changes: 7 additions & 8 deletions lib/shoryuken/middleware/server/auto_extend_visibility.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
require 'celluloid/current' unless defined?(Celluloid)

module Shoryuken
module Middleware
module Server
Expand All @@ -12,22 +10,23 @@ def call(worker, queue, sqs_msg, body)
yield
ensure
if timer
timer.cancel
@visibility_extender.terminate
timer.shutdown
@visibility_extender.ask!(:terminate)
end
end
end

private

class MessageVisibilityExtender
include Celluloid
class MessageVisibilityExtender < Concurrent::Actor::RestartingContext
include Util

def auto_extend(worker, queue, sqs_msg, body)
queue_visibility_timeout = Shoryuken::Client.queues(queue).visibility_timeout

every(queue_visibility_timeout - EXTEND_UPFRONT_SECONDS) do
interval = queue_visibility_timeout - EXTEND_UPFRONT_SECONDS

Concurrent::TimerTask.execute(execution_interval: interval, timeout_interval: interval) do
begin
logger.debug do
"Extending message #{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id} " \
Expand All @@ -48,7 +47,7 @@ def auto_extend(worker, queue, sqs_msg, body)

def auto_visibility_timer(worker, queue, sqs_msg, body)
return unless worker.class.auto_visibility_timeout?
@visibility_extender = MessageVisibilityExtender.new_link
@visibility_extender = MessageVisibilityExtender.spawn! name: :message_visibility_extender, link: true
@visibility_extender.auto_extend(worker, queue, sqs_msg, body)
end
end
Expand Down
Loading