Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maxretry more flexible #194

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions lib/sneakers/handlers/maxretry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ def initialize(channel, queue, opts)
error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error"
requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue"

retry_routing_key = @opts[:retry_routing_key] || '#'
requeue_routing_key = @opts[:requeue_routing_key] || '#'
@error_routing_key = @opts[:error_routing_key] || '#'

retry_queue_name = @opts[:retry_queue_name] || retry_name
error_queue_name = @opts[:error_queue_name] || error_name

# Create the exchanges
@retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name|
Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" }
Expand All @@ -59,23 +66,25 @@ def initialize(channel, queue, opts)
Sneakers.logger.debug do
"#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}"
end
@retry_queue = @channel.queue(retry_name,
:durable => queue_durable?,
:arguments => {

@retry_queue = @channel.queue(retry_queue_name,
:durable => queue_durable?,
:arguments => {
:'x-dead-letter-exchange' => requeue_name,
:'x-message-ttl' => @opts[:retry_timeout] || 60000
})
@retry_queue.bind(@retry_exchange, :routing_key => '#')
:'x-message-ttl' => @opts[:retry_timeout] || 60000,
:'x-dead-letter-routing-key' => requeue_routing_key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will change the current behavior I believe. My understanding of this setting is that if it is unset, the routing-key from the originally published message will be used in the retry queue. When the TTL is hit and it goes to the dead-letter exchange (which will then forward it onto the original queue sneakers is subscribing to), it will again pass along the original message's routing key. By setting the value, you're over-writing the original message's routing key, potentially causing the retried message to be different than when it first went through the queue.

I'm not an expert in this area of Rabbit, as we never don't use routing keys, but I suspect this will cause some configurations to not work. I think a test case of this would be to setup your Sneakers worker to use a queue that binds to a topic exchange with a rule to only get messages that have a routing key of "hello". Then publish a message with that routing key, have your worker fail it so it goes through this retry logic, and set the value for this requeue routing key to be "world" and see if you get it again in your worker. If you do, I think a further validation would be to verify that that message you're getting has the routing key value of "hello" and not "world". My suspicion is that you'll never see the retry and if you do, it will have a routing key value of "world".

})
@retry_queue.bind(@retry_exchange, :routing_key => retry_routing_key)

Sneakers.logger.debug do
"#{log_prefix} creating queue=#{error_name}"
end
@error_queue = @channel.queue(error_name,
@error_queue = @channel.queue(error_queue_name,
:durable => queue_durable?)
@error_queue.bind(@error_exchange, :routing_key => '#')
@error_queue.bind(@error_exchange, :routing_key => @error_routing_key)

# Finally, bind the worker queue to our requeue exchange
queue.bind(@requeue_exchange, :routing_key => '#')
queue.bind(@requeue_exchange, :routing_key => requeue_routing_key)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you get the messages with "world" routing key.


@max_retries = @opts[:retry_max_times] || 5

Expand Down Expand Up @@ -147,7 +156,7 @@ def handle_retry(hdr, props, msg, reason)
end
end
end.to_json
@error_exchange.publish(data, :routing_key => hdr.routing_key)
@error_exchange.publish(data, :routing_key => @error_routing_key)
@channel.acknowledge(hdr.delivery_tag, false)
# TODO: metrics
end
Expand Down
45 changes: 45 additions & 0 deletions spec/fixtures/maxretry_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
require 'sneakers'
require 'thread'
require 'redis'

require 'sneakers/handlers/maxretry'

# This worker ... works
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would call these workers something more along the lines of how they behave: AlwaysAckWorker, AlwaysRejectWorker, RejectOnceWorker (which is what you have). It may make them more portable across other tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done.

class AlwaysAckWorker
include Sneakers::Worker

def work(_)
ack!
end
end

# This worker fails
class AlwaysRejectWorker
include Sneakers::Worker

def work(_)
reject!
end
end

# This worker fails once
class RejectOnceWorker
include Sneakers::Worker

def work_with_params(_, delivery_info, message_properties)
if message_properties[:headers].nil? ||
message_properties[:headers]['x-death'].nil?
reject!
else
dump = JSON.dump(
'delivery_info' => delivery_info.to_hash,
'message_properties' => message_properties.to_hash
)
Redis.new.set(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use a Redis list here? Could be useful to test multiple payloads.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't need a list for that.;)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, if someone needs a list, it can be changed easily. If I change it now, one could wonder why I use a list when a string is enough.

self.class.queue_name,
dump
)
ack!
end
end
end
Loading