-
-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unpause FIFO queues on worker completion #644
Unpause FIFO queues on worker completion #644
Conversation
* Add in UnpauseQueue middleware * Added specs * Added unpause method on strategies * TODO: Add specs around #unpause * Updates Processor, Manager & Options to allow new variables * TODO: Add specs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall approach seems solid! I took another pass after reviewing some of the surrounding context.
@@ -38,6 +38,11 @@ def active_queues | |||
.reverse | |||
end | |||
|
|||
def unpause_queue(queue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For symmetry with the method messages_found
, let's consider renaming this method message_processed
.
The reason is that the interaction between the Manager
class and the polling strategy is that the manager is just informing the polling strategy of certain events, allowing the polling strategy to respond as it sees fit. In this case, it's not the responsibility of the Manager
to tell the polling strategy to pause a queue, it's just the responsibility for the Manager
to let the polling strategy know that it's done processing the message: the polling strategy is welcome to do whatever it wants with that knowledge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also add message_processed
to Shoryuken::Polling::BaseStrategy
. Its implementation can be empty (for backward compatibility--users might have subclassed the base class themselves).
lib/shoryuken/manager.rb
Outdated
client_queue = Shoryuken::Client.queues(queue) | ||
return unless client_queue.fifo? | ||
|
||
@polling_strategy.unpause_queue(queue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since users can specify their own polling strategy class, we want to make it optional to implement the new method. Let's invoke this method only if @polling_strategy
responds to it. Otherwise, this change would be backwards-incompatible.
Thanks for being so responsive to feedback! I'll do some testing on some real queues, but overall, this is looking great! |
I tested this locally on some FIFO queues. On On this branch, I observed that the fetcher resumed whenever a job is finished, improving throughput, especially when the number of distinct Excellent work, @davidrichey! |
closes #643