WIP: native threads #299

Closed
lib/shoryuken/manager.rb (17 additions, 83 deletions)

@@ -2,35 +2,22 @@ module Shoryuken
class Manager
include Util

BATCH_LIMIT = 10

def initialize(fetcher, polling_strategy)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0

@queues = Shoryuken.queues.dup.uniq

@done = Concurrent::AtomicBoolean.new(false)

@fetcher = fetcher
@polling_strategy = polling_strategy

@heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 1, timeout_interval: 60) { dispatch }

@ready = Concurrent::AtomicFixnum.new(@count)

@pool = Concurrent::FixedThreadPool.new(@count)
create_processors
end

def start
logger.info { 'Starting' }

@heartbeat.execute
@processors.each(&:start)
end

def stop(options = {})
@done.make_true

if (callback = Shoryuken.stop_callback)
logger.info { 'Calling Shoryuken.on_stop block' }
callback.call
@@ -40,94 +27,41 @@ def stop(options = {})

logger.info { "Shutting down workers" }

@heartbeat.kill

if options[:shutdown]
hard_shutdown_in(options[:timeout])
else
soft_shutdown
end
end

def processor_done(queue)
logger.debug { "Process done for '#{queue}'" }

@ready.increment
end

private

def dispatch
return if @done.true?

logger.debug { "Ready: #{@ready.value}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

if @ready.value == 0
return logger.debug { 'Pausing fetcher, because all processors are busy' }
end

unless queue = @polling_strategy.next_queue
return logger.debug { 'Pausing fetcher, because all queues are paused' }
def create_processors
@processors = (1..@count).map do
# TODO: fix this initialisation. We should be creating new copies instead of calling .dup.
Processor.new(@fetcher.dup, @polling_strategy.dup)
end

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
end

def busy
@count - @ready.value
end

def assign(queue, sqs_msg)
logger.debug { "Assigning #{sqs_msg.message_id}" }

@ready.decrement

@pool.post { Processor.new(self).process(queue, sqs_msg) }
end

def dispatch_batch(queue)
batch = @fetcher.fetch(queue, BATCH_LIMIT)
@polling_strategy.messages_found(queue.name, batch.size)
assign(queue.name, patch_batch!(batch))
end

def dispatch_single_messages(queue)
messages = @fetcher.fetch(queue, @ready.value)
@polling_strategy.messages_found(queue.name, messages.size)
messages.each { |message| assign(queue.name, message) }
end

def batched_queue?(queue)
Shoryuken.worker_registry.batch_receive_messages?(queue.name)
end
private

def soft_shutdown
@pool.shutdown
@pool.wait_for_termination
@processors.each(&:terminate)
end

def hard_shutdown_in(delay)
if busy > 0
logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }
end
logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }

@pool.shutdown
soft_shutdown

unless @pool.wait_for_termination(delay)
logger.info { "Hard shutting down #{busy} busy workers" }

@pool.kill
wait_until = Time.now + delay
while Time.now < wait_until && has_alive_processors?
sleep(0.1)
end
end

def patch_batch!(sqs_msgs)
sqs_msgs.instance_eval do
def message_id
"batch-with-#{size}-messages"
end
end
@processors.each(&:kill)
end

sqs_msgs
def has_alive_processors?
@processors.any?(&:alive?)
end
end
end
lib/shoryuken/processor.rb (77 additions, 6 deletions)

@@ -2,8 +2,77 @@ module Shoryuken
class Processor
include Util

def initialize(manager)
@manager = manager
BATCH_LIMIT = 10

def initialize(fetcher, polling_strategy)
@fetcher = fetcher
@polling_strategy = polling_strategy
# what is the worst thing that could happen because it's not atomic?
# is it possible that it evaluates to true even if we never set it to true?
@done = false
end

def start
# do updating and evaluating @thread have to be wrapped in a mutex?
# can it throw an exception?
# what should we do if it dies?
@thread ||= spawn_thread(&method(:loop))
end

def terminate(wait=false)
@done = true
# do updating and evaluating @thread have to be wrapped in a mutex?
return if !@thread
@thread.value if wait
end

def kill
@done = true
# do updating and evaluating @thread have to be wrapped in a mutex?
return if !@thread
@thread.raise ::Shoryuken::Shutdown
end
Collaborator:

@mariokostelac that's where ^ I see some advantage of using concurrent-ruby's thread pool: it abstracts some of this logic (shutdown, kill, wait_for_termination, etc.).

Also, it replaces dead processors automatically. Here, we would need to re-spawn in case a thread dies.
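For context, a minimal sketch (not code from this PR) of the lifecycle calls concurrent-ruby's pool provides out of the box:

require 'concurrent'

pool = Concurrent::FixedThreadPool.new(4)

# post work; each block runs on one of the pool's 4 threads
4.times { |i| pool.post { puts "processing #{i}" } }

pool.shutdown                        # stop accepting new work
unless pool.wait_for_termination(10) # wait up to 10 seconds for in-flight work
  pool.kill                          # hard-stop whatever is still running
end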

Contributor (author):

That is correct. It's not the easiest thing to get right. I have to give it another thought, but a ThreadPool alone does not seem like the best solution. Futures on a ThreadPool could be better, though; I have to investigate that a little bit more.
Still, I think it's difficult to get the same throughput if we do not use the Actor model or if we do not make Processor self-sufficient.

phstc (Collaborator), Dec 23, 2016:

> Still, I think it's difficult to get the same throughput if we do not use the Actor model or if we do not make Processor self-sufficient.

@mariokostelac I don't see why not? We have a thread pool with the size of the configured concurrency and we only enqueue that many messages, so there won't be any idle message waiting for a thread.

Contributor (author):

I am not too opinionated on whether we should use some library or not; this was just an attempt to see how it looks. I like it, but it introduces handling low-level stuff.
I think meeting half-way would look like this (see the sketch after this list):

  • processors look the same as here: self-sufficient
  • processors handled by a thread pool, pool.post { processor.loop }
  • some watchdog that periodically checks that all processors are alive

That way the manager becomes simple, we do not share state (no concurrency problems), and we do not have to handle threads ourselves.
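A rough sketch of that half-way design. It assumes the Processor from this PR with loop made public; fetcher, polling_strategy, and the 5-second watchdog interval are placeholders, and alive? is assumed to report whether the processor's loop is still running (the PR's @thread-based check would need adapting):

require 'concurrent'

concurrency = 8
processors  = Array.new(concurrency) { Processor.new(fetcher.dup, polling_strategy.dup) }

pool = Concurrent::FixedThreadPool.new(concurrency)
processors.each { |processor| pool.post { processor.loop } }

# Watchdog: periodically replace any processor whose loop has died.
watchdog = Concurrent::TimerTask.new(execution_interval: 5) do
  processors.map! do |processor|
    next processor if processor.alive?

    Processor.new(fetcher.dup, polling_strategy.dup).tap { |fresh| pool.post { fresh.loop } }
  end
end
watchdog.execute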

mariokostelac (Contributor, author), Dec 23, 2016:

> @mariokostelac I don't see why not? We have a thread pool with the size of the configured concurrency and we only enqueue that many messages, so there won't be any idle message waiting for a thread.

Actually, we have a problem where a processor fails and we never find out (the @ready variable does not increment). It could be that we can see how many "available" processors we have by asking the pool how many available threads it has.

Contributor (author):

> @mariokostelac I don't see why not? We have a thread pool with the size of the configured concurrency and we only enqueue that many messages, so there won't be any idle message waiting for a thread.

As far as I understand the code, we poll once per second. Regardless of the concurrency level, a lot of time can be spent waiting for the next dispatch.
Let's say every job takes 20 ms and concurrency is set to 8:

  • t = 0 ms: we fetch 8 jobs and dispatch them
  • t = 20 ms: all processors have finished
  • t = 1000 ms: we fetch 8 more jobs and dispatch them
  • t = 1020 ms: all processors have finished

We actually spend 980 ms of every second waiting for the next dispatch.

phstc (Collaborator), Dec 23, 2016:

> Actually, we have a problem where a processor fails and we never find out (the @ready variable does not increment). It could be that we can see how many "available" processors we have by asking the pool how many available threads it has.

@mariokostelac great call! I totally missed that. Maybe we could use remaining_capacity in place of @ready.
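One caveat worth flagging (my reading of concurrent-ruby, not something settled in this thread): remaining_capacity counts free slots in the pool's task queue, not idle threads, and FixedThreadPool defaults to an unbounded queue, for which it returns -1. A bounded executor would be needed for it to carry a signal at all:

require 'concurrent'

# Sketch: fixed-size executor with a bounded queue so remaining_capacity is finite.
pool = Concurrent::ThreadPoolExecutor.new(
  min_threads:     8,
  max_threads:     8,
  max_queue:       8,      # bounded queue; FixedThreadPool's default is unbounded
  fallback_policy: :abort  # raise instead of dropping work when the queue is full
)

pool.remaining_capacity # => 8 free queue slots (still not the same as idle threads)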

Contributor (author):

That implies that you ARE improving system throughput by increasing the concurrency setting, but never to the level it should be at.

Collaborator:

@mariokostelac

> We actually spend 980 ms of every second waiting for the next dispatch.

How long would it take to perform 8 requests to SQS to fetch a message per processor?

But anyway, as I mentioned in another comment, we can remove the heartbeat and call dispatch once a processor is done, so we dispatch whenever a processor is ready.

TimerTask says that the delay is in seconds, but maybe it supports something like 0.5.
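concurrent-ruby's TimerTask does accept fractional execution_interval values; a quick sketch (dispatch is a stand-in for the manager's method, not code from this PR):

require 'concurrent'

# A half-second heartbeat; execution_interval takes any positive number of seconds.
heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.5) do
  dispatch # stand-in for Manager#dispatch
end
heartbeat.execute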

phstc (Collaborator), Dec 23, 2016:

@mariokostelac I think for most people 1 second does not matter much; also, it isn't exactly a second, since the second starts counting before the fetch, and it all happens in the background anyway. But I see your point. It's a trade-off we need to make: ~1 second of delay versus an increased number of requests.


def alive?
# is this ok?
[email protected]?
end

private

def loop
while !@done
do_work
end
Collaborator:

@mariokostelac wouldn't this cause non-stop fetching?

Contributor (author):

This is the point where the polling strategy kicks in. But you are right, it will melt :o

Contributor (author):

But it's not difficult to fix. We can always tell it to sleep if there are no active queues, for example:
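A minimal sketch of that fix; the 1-second pause is an illustrative value, not anything agreed in this thread:

def do_work
  queue = @polling_strategy.next_queue

  if queue.nil?
    logger.debug { 'All queues are paused, backing off' }
    return sleep(1) # hypothetical back-off; would likely need to be configurable
  end

  batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_message(queue)
end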

Collaborator:

@mariokostelac actually it isn't just about active queues, it's about having an available thread to process; we can't fetch and wait.

rescue ::Shoryuken::Shutdown
# killed, bail!
rescue => e
# are we safe to retry here? What kinds of errors do we not want to recover from?
# OOM?
logger.debug { "Processor caught exception: #{e}" }
retry
end

def do_work
queue = @polling_strategy.next_queue
return logger.info { 'Doing nothing, all queues are paused' } if queue.nil?
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_message(queue)
end

def dispatch_batch(queue)
batch = @fetcher.fetch(queue, BATCH_LIMIT)
@polling_strategy.messages_found(queue.name, batch.size)
process(queue.name, patch_batch!(batch))
end

def dispatch_single_message(queue)
messages = @fetcher.fetch(queue, 1)
@polling_strategy.messages_found(queue.name, messages.size)
process(queue.name, messages.first) if messages.length > 0
end
Collaborator:

@mariokostelac thinking more on processors fetching messages... Maybe we can start using long polling (wait_time_seconds: ...), so a processor does not need to hit SQS non-stop when the queue is empty; it can wait a few seconds for messages to arrive. Better to wait for messages than to wait for nothing.
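For reference, long polling is a per-request parameter in the AWS SDK; a sketch (the queue URL is a placeholder):

require 'aws-sdk-sqs' # with aws-sdk v2, require 'aws-sdk' instead

sqs = Aws::SQS::Client.new

# wait_time_seconds > 0 turns the receive into a long poll: the call blocks for
# up to 10 seconds waiting for messages instead of returning empty immediately.
resp = sqs.receive_message(
  queue_url:              'https://sqs.us-east-1.amazonaws.com/000000000000/my-queue', # placeholder
  max_number_of_messages: 10,
  wait_time_seconds:      10
)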

Collaborator:

@mariokostelac ^ but if we go with that approach, we will need to make sure that the polling_strategy is thread-safe (it isn't currently); otherwise we won't be able to guarantee the ordering/load balancing, as each processor will work individually.

Contributor (author):

It is not possible with the current set of interfaces (all queues are paused, so we get nil), but our interfaces are not written in stone.

It's good that you mentioned long polling: since processors are independent, it should be totally OK to use long polling all the time. Ish. If we have just one queue, we should use long polling for sure.
If we have multiple queues, we may not want to block for too long on q1 if q2 is full. I am wondering whether the long-polling wait time is the actual parameter we want to tweak, instead of the heartbeat interval we discussed in #291. I guess being able to configure queue_empty_pause for an empty queue and long_polling_wait_time would be sufficient for all kinds of workloads.

Just to recap what that approach would bring us (a hypothetical configuration sketch follows the list):

  • optimal performance for "one queue" configurations, regardless of concurrency level
  • a configurable cost-vs-throughput trade-off via the long_polling_wait_time and queue_empty_pause parameters; people could optimise for fewer calls to SQS or for system throughput, depending on their needs
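A sketch of what those two knobs might look like. Neither option exists in Shoryuken today; the names come straight from the comment above:

# Hypothetical configuration; these options are proposals, not real settings.
Shoryuken.options[:long_polling_wait_time] = 10 # seconds each receive call blocks waiting for messages
Shoryuken.options[:queue_empty_pause]      = 2  # seconds to skip a queue after it comes up empty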

Collaborator:

@mariokostelac I think we can deprecate delay in favor of wait_time_seconds: if you don't want to hold up checking all queues, you can set it to zero; if you want to wait and save some $ (too little, as you mentioned), you can bump it up to whatever you like.

> instead of the heartbeat interval we discussed in #291

That implementation is straightforward and working. But as you raised, it isn't as optimal as it could be; there will always be a possible 25 ms delay, which isn't a huge thing in some cases, but I bet it is in others. If we need to wait, it feels better to wait for messages, not just for the heartbeat. But if we go with processors fetching messages plus long polling (which feels cool indeed), I think we will need to make the polling strategies thread-safe, otherwise we won't be able to load-balance the queue consumption. So we will move the complexity around, but we may end up with a very optimal setup.

I think Shoryuken doesn't need to address all needs. We can go with a solution that's as good and straightforward as possible for most users and for us to maintain. Apps that can't wait something under a second to process are a very edge case.


def batched_queue?(queue)
Shoryuken.worker_registry.batch_receive_messages?(queue.name)
end

def process(queue, sqs_msg)
@@ -13,11 +82,13 @@ def process(queue, sqs_msg)
worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do
worker.perform(sqs_msg, body)
end

@manager.processor_done(queue)
end

private

def spawn_thread(&block)
Thread.new do
block.call
end
end

def get_body(worker_class, sqs_msg)
if sqs_msg.is_a? Array