-
Notifications
You must be signed in to change notification settings - Fork 330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Maxretry more flexible #194
Changes from 10 commits
e8e8700
b178bdc
df77234
b903f23
ad8e441
61f9d7e
e5442f9
d5b405c
1d5ed8e
bde75aa
40a8caf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,13 @@ def initialize(channel, queue, opts) | |
error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error" | ||
requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue" | ||
|
||
retry_routing_key = @opts[:retry_routing_key] || '#' | ||
requeue_routing_key = @opts[:requeue_routing_key] || '#' | ||
@error_routing_key = @opts[:error_routing_key] || '#' | ||
|
||
retry_queue_name = @opts[:retry_queue_name] || retry_name | ||
error_queue_name = @opts[:error_queue_name] || error_name | ||
|
||
# Create the exchanges | ||
@retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name| | ||
Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" } | ||
|
@@ -59,23 +66,25 @@ def initialize(channel, queue, opts) | |
Sneakers.logger.debug do | ||
"#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}" | ||
end | ||
@retry_queue = @channel.queue(retry_name, | ||
:durable => queue_durable?, | ||
:arguments => { | ||
|
||
@retry_queue = @channel.queue(retry_queue_name, | ||
:durable => queue_durable?, | ||
:arguments => { | ||
:'x-dead-letter-exchange' => requeue_name, | ||
:'x-message-ttl' => @opts[:retry_timeout] || 60000 | ||
}) | ||
@retry_queue.bind(@retry_exchange, :routing_key => '#') | ||
:'x-message-ttl' => @opts[:retry_timeout] || 60000, | ||
:'x-dead-letter-routing-key' => requeue_routing_key | ||
}) | ||
@retry_queue.bind(@retry_exchange, :routing_key => retry_routing_key) | ||
|
||
Sneakers.logger.debug do | ||
"#{log_prefix} creating queue=#{error_name}" | ||
end | ||
@error_queue = @channel.queue(error_name, | ||
@error_queue = @channel.queue(error_queue_name, | ||
:durable => queue_durable?) | ||
@error_queue.bind(@error_exchange, :routing_key => '#') | ||
@error_queue.bind(@error_exchange, :routing_key => @error_routing_key) | ||
|
||
# Finally, bind the worker queue to our requeue exchange | ||
queue.bind(@requeue_exchange, :routing_key => '#') | ||
queue.bind(@requeue_exchange, :routing_key => requeue_routing_key) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here you get the messages with "world" routing key. |
||
|
||
@max_retries = @opts[:retry_max_times] || 5 | ||
|
||
|
@@ -147,7 +156,7 @@ def handle_retry(hdr, props, msg, reason) | |
end | ||
end | ||
end.to_json | ||
@error_exchange.publish(data, :routing_key => hdr.routing_key) | ||
@error_exchange.publish(data, :routing_key => @error_routing_key) | ||
@channel.acknowledge(hdr.delivery_tag, false) | ||
# TODO: metrics | ||
end | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
require 'sneakers' | ||
require 'thread' | ||
require 'redis' | ||
|
||
require 'sneakers/handlers/maxretry' | ||
|
||
# This worker ... works | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I would call these workers something more along the lines of how they behave: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. Done. |
||
class MaxretryWorker | ||
include Sneakers::Worker | ||
|
||
def work(_) | ||
ack! | ||
end | ||
end | ||
|
||
# This worker fails | ||
class FailingMaxretryWorker | ||
include Sneakers::Worker | ||
|
||
def work(_) | ||
reject! | ||
end | ||
end | ||
|
||
# This worker fails once | ||
class RetryOnceWorker | ||
include Sneakers::Worker | ||
|
||
def work_with_params(_, delivery_info, message_properties) | ||
if message_properties[:headers].nil? || | ||
message_properties[:headers]['x-death'].nil? | ||
reject! | ||
else | ||
dump = JSON.dump( | ||
'delivery_info' => delivery_info.to_hash, | ||
'message_properties' => message_properties.to_hash | ||
) | ||
Redis.new.set( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason not to use a Redis list here? Could be useful to test multiple payloads. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't need a list for that.;) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean, if someone needs a list, it can be changed easily. If I change it now, one could wonder why I use a list when a string is enough. |
||
self.class.queue_name, | ||
dump | ||
) | ||
ack! | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will change the current behavior I believe. My understanding of this setting is that if it is unset, the routing-key from the originally published message will be used in the retry queue. When the TTL is hit and it goes to the dead-letter exchange (which will then forward it onto the original queue sneakers is subscribing to), it will again pass along the original message's routing key. By setting the value, you're over-writing the original message's routing key, potentially causing the retried message to be different than when it first went through the queue.
I'm not an expert in this area of Rabbit, as we never don't use routing keys, but I suspect this will cause some configurations to not work. I think a test case of this would be to setup your Sneakers worker to use a queue that binds to a topic exchange with a rule to only get messages that have a routing key of "hello". Then publish a message with that routing key, have your worker fail it so it goes through this retry logic, and set the value for this requeue routing key to be "world" and see if you get it again in your worker. If you do, I think a further validation would be to verify that that message you're getting has the routing key value of "hello" and not "world". My suspicion is that you'll never see the retry and if you do, it will have a routing key value of "world".