WIP: native threads #299
Changes from all commits
```diff
@@ -2,8 +2,77 @@ module Shoryuken
   class Processor
     include Util

-    def initialize(manager)
-      @manager = manager
+    BATCH_LIMIT = 10
+
+    def initialize(fetcher, polling_strategy)
+      @fetcher = fetcher
+      @polling_strategy = polling_strategy
+      # what is the worst thing that could happen because it's not atomic?
+      # is it possible that it evaluates to true even if we never set it to true?
+      @done = false
     end
+
+    def start
+      # does updating and evaluating @thread have to be wrapped in a mutex?
+      # can it throw an exception?
+      # what should we do if it dies?
+      @thread ||= spawn_thread(&method(:loop))
+    end
+
+    def terminate(wait = false)
+      @done = true
+      # does updating and evaluating @thread have to be wrapped in a mutex?
+      return if !@thread
+      @thread.value if wait
+    end
+
+    def kill
+      @done = true
+      # does updating and evaluating @thread have to be wrapped in a mutex?
+      return if !@thread
+      @thread.raise ::Shoryuken::Shutdown
+    end
+
+    def alive?
+      # is this ok?
+      !!@thread.alive?
+    end
+
+    private
+
+    def loop
+      while !@done
+        do_work
+      end
```
@mariokostelac wouldn't this cause non-stop fetching?

This is the point where the polling strategy kicks in. But you are right, it will melt :o

But it's not difficult to fix. We can always tell it to sleep if there are no active queues (see the sketch below).

@mariokostelac actually it isn't just about active queues, it's about having an available thread to process; we can't fetch and wait.
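A minimal sketch of the "sleep if there are no active queues" fix suggested above; `EMPTY_QUEUE_DELAY` is a hypothetical constant, not part of this PR:

```ruby
EMPTY_QUEUE_DELAY = 1 # seconds; hypothetical, not in the PR

def do_work
  queue = @polling_strategy.next_queue

  if queue.nil?
    logger.info { 'Doing nothing, all queues are paused' }
    sleep EMPTY_QUEUE_DELAY # back off instead of spinning through the loop
    return
  end

  batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_message(queue)
end
```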
```diff
+    rescue ::Shoryuken::Shutdown
+      # killed, bail!
+    rescue => e
+      # are we safe to retry here? What kinds of errors do we not want to recover from?
+      # OOM?
+      logger.debug { "Processor caught exception: #{e}" }
+      retry
+    end
+
+    def do_work
+      queue = @polling_strategy.next_queue
+      return logger.info { 'Doing nothing, all queues are paused' } if queue.nil?
+      batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_message(queue)
+    end
+
+    def dispatch_batch(queue)
+      batch = @fetcher.fetch(queue, BATCH_LIMIT)
+      @polling_strategy.messages_found(queue.name, batch.size)
+      process(queue.name, patch_batch!(batch))
+    end
+
+    def dispatch_single_message(queue)
+      messages = @fetcher.fetch(queue, 1)
+      @polling_strategy.messages_found(queue.name, messages.size)
+      process(queue.name, messages.first) if messages.length > 0
+    end
```
@mariokostelac thinking more about processors fetching messages... Maybe we can start using long polling.

@mariokostelac ^ but if we go with that approach, we will need to make sure that the

It is not possible with the current set of interfaces (all queues are paused, so we get nil), but our interfaces are not written in stone. It's good that you mentioned long polling - since processors are independent, it should be totally OK to use long polling all the time. Ish. If we have just one queue, we should use long polling for sure. Just to recap what that approach would bring us:
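For reference, a hedged sketch of what long polling looks like with the aws-sdk-sqs client (illustrative only, not code from this PR; the queue URL is a hypothetical placeholder):

```ruby
require 'aws-sdk-sqs'

# region/credentials assumed to be configured via the environment
client = Aws::SQS::Client.new

queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue' # hypothetical

# wait_time_seconds > 0 makes this a long poll: SQS holds the connection
# open for up to 20 seconds until a message arrives, instead of returning
# an empty response immediately.
resp = client.receive_message(
  queue_url: queue_url,
  max_number_of_messages: 10,
  wait_time_seconds: 20
)

resp.messages.each { |msg| puts msg.body }
```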
@mariokostelac I think we can deprecate
That implementation is straightforward and working. But as you raised, it isn't as optimal as it could be: there will always be a possible 25ms delay, which isn't a huge thing for some cases, but I bet it is for others. If we need to wait, it feels better to spend that time waiting for messages, not just for the heartbeat.

But if we go with processors fetching messages, and long polling (which feels cool indeed), I think we will need to make the polling strategies thread-safe, otherwise we won't be able to load-balance the queue consumption. So we will move the complexity around, but we may end up with a very optimal setup (see the thread-safety sketch below).

I think Shoryuken doesn't need to address all needs. We can go with a solution that's as good and straightforward as possible for most users and for us to maintain. Apps that can't wait for something less than a second to process are a very edge case.
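A minimal sketch of the thread-safety point above: wrapping a polling strategy in a mutex so concurrent processors see a consistent queue rotation. The wrapper class is hypothetical; the `next_queue`/`messages_found` interface is taken from this diff:

```ruby
class ThreadSafePollingStrategy
  def initialize(strategy)
    @strategy = strategy
    @mutex = Mutex.new
  end

  # Serialize access so two processors can't grab the same rotation slot
  def next_queue
    @mutex.synchronize { @strategy.next_queue }
  end

  def messages_found(queue_name, count)
    @mutex.synchronize { @strategy.messages_found(queue_name, count) }
  end
end
```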
```diff
+
+    def batched_queue?(queue)
+      Shoryuken.worker_registry.batch_receive_messages?(queue.name)
+    end
+
     def process(queue, sqs_msg)
@@ -13,11 +82,13 @@ def process(queue, sqs_msg)
       worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do
         worker.perform(sqs_msg, body)
       end
-
-      @manager.processor_done(queue)
     end
 
-    private
-
+    def spawn_thread(&block)
+      Thread.new do
+        block.call
+      end
+    end
 
     def get_body(worker_class, sqs_msg)
       if sqs_msg.is_a? Array
```
@mariokostelac that's where ^ I see some advantage of using a concurrent-ruby thread pool: it abstracts some of this logic (`shutdown`, `kill`, `wait_for_termination`, etc.). Also, it replaces dead processors automatically. Here, we would need to re-spawn in case the thread dies.
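A hedged sketch of the concurrent-ruby lifecycle API referred to above (illustrative, not code from this PR):

```ruby
require 'concurrent'

pool = Concurrent::FixedThreadPool.new(8)

25.times { pool.post { sleep 0.01 } } # stand-in for a processor's unit of work

pool.shutdown                        # stop accepting work, let queued work finish
unless pool.wait_for_termination(10) # block up to 10s for in-flight work
  pool.kill                          # give up and kill the remaining threads
end
```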
That is correct. It's not the easiest thing to get right. I have to give it another thought, but a ThreadPool alone does not seem like the best solution. Futures on a ThreadPool could be better, though; I have to investigate that a little bit more.

Still, I think it's difficult to get the same throughput if we do not use the Actor model or if we do not make the Processor self-sufficient.
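For comparison, a hedged sketch of "Futures on a ThreadPool" with concurrent-ruby (illustrative only):

```ruby
require 'concurrent'

pool = Concurrent::FixedThreadPool.new(8)

# Each future runs on the shared pool; #value blocks until the result is
# ready, and a failed computation surfaces via #reason instead of crashing
# the pool thread.
futures = 8.times.map do |i|
  Concurrent::Future.execute(executor: pool) { i * 2 }
end

p futures.map(&:value) # => [0, 2, 4, 6, 8, 10, 12, 14]

pool.shutdown
pool.wait_for_termination
```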
@mariokostelac I don't see why not? We have a thread pool with the size of the configured `concurrency`, and we only enqueue that many messages, so there won't be any idle message waiting for a thread.
I am not too opinionated about whether we should use some library or not; it was just an attempt to see what it looks like. I like it, but it introduces handling low-level stuff.

I think meeting half-way would look like:

That way the manager becomes simple, we do not share state - so no concurrency problems - and we do not have to handle threads ourselves.
Actually, we have a problem when a processor fails and we do not know it (the `@ready` variable does not increment). It could be that we can tell how many "available" processors we have by asking the pool "how many available threads do you have?".
As far as I understand the code, we poll once per second. Regardless of the concurrency level, we can spend a lot of time waiting for the next dispatch.

Let's say every job takes 20ms, with concurrency set to 8:

- t=0ms, we fetch 8 jobs and dispatch them
- t=20ms, all processors have finished
- t=1000ms, we fetch 8 jobs and dispatch them
- t=1020ms, all processors have finished

We actually spend 980ms waiting for the next dispatch.
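To make that concrete, here is the arithmetic behind the timeline (the 20ms/1s/8 numbers come from the comment above; the rest is derived):

```ruby
job_time    = 0.020 # seconds per job
heartbeat   = 1.0   # seconds between dispatches
concurrency = 8

actual_throughput = concurrency / heartbeat # => 8.0 jobs/s
ideal_throughput  = concurrency / job_time  # => 400.0 jobs/s
utilization       = job_time / heartbeat    # => 0.02, i.e. 2% of each cycle spent working
```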
@mariokostelac great call! I totally missed that. Maybe we could use `remaining_capacity` in place of `@ready`.
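A hedged sketch of that idea with a `Concurrent::ThreadPoolExecutor`. Note that `remaining_capacity` counts free *queue* slots (and returns -1 when `max_queue` is 0, i.e. unbounded), so this assumes a queue bounded to the concurrency:

```ruby
require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 8,
  max_threads: 8,
  max_queue:   8,
  fallback_policy: :discard
)

# Only fetch as many messages as we can actually enqueue right now
batch_size = [pool.remaining_capacity, 10].min
puts batch_size # => 8 while the queue is empty
```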
That implies that you ARE improving the system throughput by increasing the concurrency setting, but never to the level where it should be.
@mariokostelac How long would it take to perform 8 requests to SQS to fetch a message per processor?

But anyway, as I mentioned in another comment, we can remove the `heartbeat` and call `dispatch` once a processor is done, so we dispatch whenever a processor is ready. `TimerTask` says that the delay is in seconds, but maybe it supports something like `0.5`.
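A quick check suggests `Concurrent::TimerTask` does accept fractional seconds for `execution_interval`, so sub-second heartbeats look possible (a minimal sketch, not code from this PR):

```ruby
require 'concurrent'

task = Concurrent::TimerTask.new(execution_interval: 0.5) do
  puts "heartbeat at #{Time.now.strftime('%H:%M:%S.%L')}"
end

task.execute # fires roughly every 500ms
sleep 2
task.shutdown
```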
@mariokostelac I think for most people 1 second does not matter much; also, it isn't exactly a second, since the second starts counting before the fetch, and it's in the background anyway. But I see your point - just saying it's a trade-off we need to make: ~1 second of delay versus an increased number of requests.