Skip to content

Commit

Permalink
See #7389. Only one tick process per QQ
Browse files Browse the repository at this point in the history
(cherry picked from commit 9363648)
  • Loading branch information
SimonUnge authored and mergify[bot] committed May 23, 2023
1 parent 625553e commit 9517d30
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 34 deletions.
21 changes: 18 additions & 3 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,7 @@ tick(Ts, #?MODULE{cfg = #cfg{name = _Name,
true ->
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
false ->
[{mod_call, rabbit_quorum_queue,
handle_tick, [QName, overview(State), all_nodes(State)]}]
[{aux, {handle_tick, [QName, overview(State), all_nodes(State)]}}]
end.

-spec overview(state()) -> map().
Expand Down Expand Up @@ -979,7 +978,7 @@ which_module(3) -> ?MODULE.
last_decorators_state :: term(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{},
unused,
tick_pid,
unused2}).

init_aux(Name) when is_atom(Name) ->
Expand Down Expand Up @@ -1029,6 +1028,18 @@ handle_aux(leader, cast, {#return{msg_ids = MsgIds,
_ ->
{no_reply, Aux0, Log0}
end;
handle_aux(leader, _, {handle_tick, Args},
#?AUX{tick_pid = Pid} = Aux, Log, _) ->
NewPid =
case process_is_alive(Pid) of
false ->
%% No active TICK pid
spawn(rabbit_quorum_queue, handle_tick, Args);
true ->
%% Active TICK pid, do nothing
Pid
end,
{no_reply, Aux#?AUX{tick_pid = NewPid}, Log};
handle_aux(_, _, {get_checked_out, ConsumerId, MsgIds},
Aux0, Log0, #?MODULE{cfg = #cfg{},
consumers = Consumers}) ->
Expand Down Expand Up @@ -1151,6 +1162,10 @@ force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}},
AuxState
end.

process_is_alive(Pid) when is_pid(Pid) ->
is_process_alive(Pid);
process_is_alive(_) ->
false.
%%% Queries

query_messages_ready(State) ->
Expand Down
59 changes: 28 additions & 31 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -608,18 +608,18 @@ tick_test(C) ->
{S3, {_, _}} = deq(C, 4, Cid2, unsettled, Msg2, S2),
{S4, _, _} = apply(meta(C, 5), rabbit_fifo:make_return(Cid, [MsgId]), S3),

[{mod_call, rabbit_quorum_queue, handle_tick,
[#resource{},
#{config := #{name := ?FUNCTION_NAME},
num_consumers := 1,
num_checked_out := 1,
num_ready_messages := 1,
num_messages := 2,
enqueue_message_bytes := 3,
checkout_message_bytes := 3,
num_discarded := _Discards},
[_Node]
]}] = rabbit_fifo:tick(1, S4),
[{aux, {handle_tick,
[#resource{},
#{config := #{name := ?FUNCTION_NAME},
num_consumers := 1,
num_checked_out := 1,
num_ready_messages := 1,
num_messages := 2,
enqueue_message_bytes := 3,
checkout_message_bytes := 3,
num_discarded := _Discards},
[_Node]
]}}] = rabbit_fifo:tick(1, S4),
ok.


Expand Down Expand Up @@ -1562,10 +1562,10 @@ purge_nodes_test(C) ->
{down, EnqPid, noconnection},
State3),
?assertMatch(
[{mod_call, rabbit_quorum_queue, handle_tick,
[{aux, {handle_tick,
[#resource{}, _Metrics,
[ThisNode, Node]
]}] , rabbit_fifo:tick(1, State4)),
]}}] , rabbit_fifo:tick(1, State4)),
%% assert there are both enqueuers and consumers
{State, _, _} = apply(meta(C, 5),
rabbit_fifo:make_purge_nodes([Node]),
Expand All @@ -1578,10 +1578,10 @@ purge_nodes_test(C) ->
?assertMatch(#rabbit_fifo{consumers = Cons} when map_size(Cons) == 0,
State),
?assertMatch(
[{mod_call, rabbit_quorum_queue, handle_tick,
[{aux, {handle_tick,
[#resource{}, _Metrics,
[ThisNode]
]}] , rabbit_fifo:tick(1, State)),
]}}] , rabbit_fifo:tick(1, State)),
ok.

meta(Config, Idx) ->
Expand Down Expand Up @@ -1765,21 +1765,21 @@ queue_ttl_test(C) ->
expires => 1000},
S0 = rabbit_fifo:init(Conf),
Now = 1500,
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now, S0),
%% this should delete the queue
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
= rabbit_fifo:tick(Now + 1000, S0),
%% adding a consumer should not ever trigger deletion
Cid = {<<"cid1">>, self()},
{S1, _} = check_auto(C, Cid, 1, S0),
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now, S1),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S1),
%% cancelling the consumer should then
{S2, _, _} = apply(meta(C, 2, Now),
rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
%% last_active should have been reset when consumer was cancelled
%% last_active = 2500
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S2),
%% but now it should be deleted
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
= rabbit_fifo:tick(Now + 2500, S2),
Expand All @@ -1789,7 +1789,7 @@ queue_ttl_test(C) ->
{down, self(), noconnection}, S1),
%% last_active should have been reset when consumer was cancelled
%% last_active = 2500
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S2D),
%% but now it should be deleted
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
= rabbit_fifo:tick(Now + 2500, S2D),
Expand All @@ -1800,7 +1800,7 @@ queue_ttl_test(C) ->
rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
S0),

[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1Deq),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S1Deq),
%% but now it should be deleted
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
= rabbit_fifo:tick(Now + 2500, S1Deq),
Expand All @@ -1818,11 +1818,10 @@ queue_ttl_test(C) ->
{wrap_reply, {dequeue, {MsgId, _}, _}}}] = Fun2([Msg]),
{E3, _, _} = apply(meta(C, 3, Now + 1000),
rabbit_fifo:make_settle(Deq, [MsgId]), E2),
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1500, E3),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1500, E3),
%% but now it should be deleted
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
= rabbit_fifo:tick(Now + 3000, E3),

ok.

queue_ttl_with_single_active_consumer_test(C) ->
Expand All @@ -1834,35 +1833,33 @@ queue_ttl_with_single_active_consumer_test(C) ->
single_active_consumer_on => true},
S0 = rabbit_fifo:init(Conf),
Now = 1500,
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now, S0),
%% this should delete the queue
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
= rabbit_fifo:tick(Now + 1000, S0),
%% adding a consumer should not ever trigger deletion
Cid = {<<"cid1">>, self()},
{S1, _} = check_auto(C, Cid, 1, S0),
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now, S1),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S1),
%% cancelling the consumer should then
{S2, _, _} = apply(meta(C, 2, Now),
rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
%% last_active should have been reset when consumer was cancelled
%% last_active = 2500
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S2),
%% but now it should be deleted
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
= rabbit_fifo:tick(Now + 2500, S2),

%% Same for downs
{S2D, _, _} = apply(meta(C, 2, Now),
{down, self(), noconnection}, S1),
%% last_active should have been reset when consumer was cancelled
%% last_active = 2500
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S2D),
%% but now it should be deleted
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
= rabbit_fifo:tick(Now + 2500, S2D),

ok.

query_peek_test(C) ->
Expand Down

0 comments on commit 9517d30

Please sign in to comment.