Skip to content

Commit

Permalink
Fix dead letter cycle detection
Browse files Browse the repository at this point in the history
The desired correct detection behaviour is:
> If there is a death with a source queue that is the same as the target
> queue name and there are no newer deaths with the 'rejected' reason then
> consider this a cycle.

This commit implements this desired behaviour.
Previously, we relied on the arbitrary order of keys returned from maps:keys/1
which was wrong.
  • Loading branch information
ansd committed Apr 19, 2024
1 parent d32a2a8 commit 46e1058
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 5 deletions.
12 changes: 10 additions & 2 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,16 @@ record_death(Reason, SourceQueue, BasicMsg) ->


-spec is_death_cycle(rabbit_misc:resource_name(), state()) -> boolean().
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := Deaths}}) ->
is_cycle(TargetQueue, maps:keys(Deaths#deaths.records));
is_death_cycle(TargetQueue, #?MODULE{annotations = #{deaths := #deaths{records = Recs}}}) ->
OldToNewDeaths = lists:sort(fun({_K1, #death{anns = #{last_time := T1}}},
{_K2, #death{anns = #{last_time := T2}}}) ->
T1 =< T2
end, maps:to_list(Recs)),
OldToNewDeathKeys = lists:map(fun({Key, _Death}) ->
Key
end, OldToNewDeaths),
NewToOldDeathKeys = lists:reverse(OldToNewDeathKeys),
is_cycle(TargetQueue, NewToOldDeathKeys);
is_death_cycle(_TargetQueue, #?MODULE{}) ->
false;
is_death_cycle(TargetQueue, BasicMsg) ->
Expand Down
61 changes: 60 additions & 1 deletion deps/rabbit/test/dead_lettering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ groups() ->
dead_letter_nack_requeue,
dead_letter_nack_requeue_multiple,
dead_letter_reject,
dead_letter_reject_expire_expire,
dead_letter_reject_many,
dead_letter_reject_requeue,
dead_letter_max_length_drop_head,
Expand Down Expand Up @@ -185,7 +186,7 @@ init_per_testcase(Testcase, Config) ->
{skip, "dead_letter_headers_should_not_be_appended_for_republish isn't mixed versions compatible"};
_ ->
Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~tp", [Group, Testcase])),
Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
Q3 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_3", [Group, Testcase])),
Policy = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_policy", [Group, Testcase])),
Expand Down Expand Up @@ -377,6 +378,64 @@ dead_letter_reject(Config) ->
consume_empty(Ch, QName),
?assertEqual(1, counted(messages_dead_lettered_rejected_total, Config)).

dead_letter_reject_expire_expire(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
%% In 3.13.0, and 3.13.1 there is a bug in mc:is_death_cycle/2 where the queue names matter.
%% The following queue names triggered the bug because they affect the order returned by maps:keys/1.
Q1 = <<"b">>,
Q2 = <<"a2">>,
Q3 = <<"a3">>,
Args = ?config(queue_args, Config),
Durable = ?config(queue_durable, Config),

%% Test the followig topology message flow:
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
%% Q1

#'queue.declare_ok'{} = amqp_channel:call(
Ch,
#'queue.declare'{
queue = Q1,
arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, Q2}],
durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(
Ch,
#'queue.declare'{
queue = Q2,
arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, Q3},
{<<"x-message-ttl">>, long, 5}],
durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(
Ch,
#'queue.declare'{
queue = Q3,
arguments = Args ++ [{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, Q1},
{<<"x-message-ttl">>, long, 5}],
durable = Durable}),

%% Send a single message.
P = <<"msg">>,
publish(Ch, Q1, [P]),
wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]),

%% Reject the 1st time.
[DTag1] = consume(Ch, Q1, [P]),
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag1,
requeue = false}),
%% Message should now flow from Q1 -> Q2 -> Q3 -> Q1
wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]),

%% Reject the 2nd time.
[DTag2] = consume(Ch, Q1, [P]),
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag2,
requeue = false}),
%% Message should again flow from Q1 -> Q2 -> Q3 -> Q1
wait_for_messages(Config, [[Q1, <<"1">>, <<"1">>, <<"0">>]]).

%% 1) Many messages are rejected. They get dead-lettered in correct order.
dead_letter_reject_many(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Expand Down
77 changes: 75 additions & 2 deletions deps/rabbit/test/mc_unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ all_tests() ->
amqpl_table_x_header,
amqpl_table_x_header_array_of_tbls,
amqpl_death_records,
is_death_cycle,
amqpl_amqp_bin_amqpl,
amqpl_cc_amqp_bin_amqpl,
amqp_amqpl_amqp_uuid_correlation_id,
Expand Down Expand Up @@ -222,19 +223,91 @@ amqpl_death_records(_Config) ->
?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)),


%% second dead letter, e.g. a ttl reason returning to source queue
%% second dead letter, e.g. an expired reason returning to source queue

%% record_death uses a timestamp for death record ordering, ensure
%% it is definitely higher than the last timestamp taken
timer:sleep(2),
Msg2 = mc:record_death(ttl, <<"dl">>, Msg1),
Msg2 = mc:record_death(expired, <<"dl">>, Msg1),

#content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2),
{_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2),
?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)),
ok.

is_death_cycle(_Config) ->
Content = #content{class_id = 60,
properties = #'P_basic'{headers = []},
payload_fragments_rev = [<<"data">>]},
Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())),

%% Test the followig topology:
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
%% Q1 --rejected--> Q2 --expired--> Q3

Msg1 = mc:record_death(rejected, <<"q1">>, Msg0),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg1),
"A queue that dead letters to itself due to rejected is not considered a cycle."),
?assertNot(mc:is_death_cycle(<<"q2">>, Msg1)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg1)),

timer:sleep(3),
Msg2 = mc:record_death(expired, <<"q2">>, Msg1),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg2)),
?assert(mc:is_death_cycle(<<"q2">>, Msg2),
"A queue that dead letters to itself due to expired is considered a cycle."),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg2)),

timer:sleep(3),
Msg3 = mc:record_death(expired, <<"q3">>, Msg2),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg3)),
?assert(mc:is_death_cycle(<<"q2">>, Msg3)),
?assert(mc:is_death_cycle(<<"q3">>, Msg3)),

timer:sleep(3),
Msg4 = mc:record_death(rejected, <<"q1">>, Msg3),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg4)),
?assertNot(mc:is_death_cycle(<<"q2">>, Msg4)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg4)),

timer:sleep(3),
Msg5 = mc:record_death(expired, <<"q2">>, Msg4),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg5)),
?assert(mc:is_death_cycle(<<"q2">>, Msg5)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg5)),

?assertEqual([<<"q1">>, <<"q2">>, <<"q3">>],
lists:sort(mc:death_queue_names(Msg5))),
?assertMatch({{<<"q2">>, expired},
#death{exchange = <<"exch">>,
routing_keys = [<<"apple">>],
count = 2,
anns = #{first_time := FirstTime,
last_time := LastTime}}}
when FirstTime < LastTime, mc:last_death(Msg5)),

#content{properties = #'P_basic'{headers = H}} = mc:protocol_state(Msg5),
?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H)),
?assertMatch({_, longstr, <<"q2">>}, header(<<"x-last-death-queue">>, H)),
?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H)),
?assertMatch({_, longstr, <<"expired">>}, header(<<"x-last-death-reason">>, H)),

%% We expect the array to be ordered by recency.
{_, array, [{table, T1}, {table, T2}, {table, T3}]} = header(<<"x-death">>, H),

?assertMatch({_, longstr, <<"q2">>}, header(<<"queue">>, T1)),
?assertMatch({_, longstr, <<"expired">>}, header(<<"reason">>, T1)),
?assertMatch({_, long, 2}, header(<<"count">>, T1)),

?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2)),
?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T2)),
?assertMatch({_, long, 2}, header(<<"count">>, T2)),

?assertMatch({_, longstr, <<"q3">>}, header(<<"queue">>, T3)),
?assertMatch({_, longstr, <<"expired">>}, header(<<"reason">>, T3)),
?assertMatch({_, long, 1}, header(<<"count">>, T3)).

header(K, H) ->
rabbit_basic:header(K, H).

Expand Down

0 comments on commit 46e1058

Please sign in to comment.