Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QQ: resend pending commands when new leader detected on applied notif… #13095

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -586,34 +586,58 @@ update_machine_state(Server, Conf) ->
ra_server_proc:ra_event_body(), state()) ->
{internal, Correlators :: [term()], rabbit_queue_type:actions(), state()} |
{rabbit_fifo:client_msg(), state()} | {eol, rabbit_queue_type:actions()}.
handle_ra_event(QName, From, {applied, Seqs},
#state{cfg = #cfg{soft_limit = SftLmt}} = State0) ->
handle_ra_event(QName, Leader, {applied, Seqs},
#state{leader = OldLeader,
cfg = #cfg{soft_limit = SftLmt}} = State0) ->

{Corrs, ActionsRev, State1} = lists:foldl(fun seq_applied/2,
{[], [], State0#state{leader = From}},
{[], [], State0#state{leader = Leader}},
Seqs),

%% if the leader has changed we need to resend any pending commands remaining
%% after the applied processing
State2 = if OldLeader =/= Leader ->
%% double check before resending as applied notifications
%% can arrive from old leaders in any order
case ra:members(Leader) of
{ok, _, ActualLeader}
when ActualLeader =/= OldLeader ->
%% there is a new leader
rabbit_log:debug("~ts: Detected QQ leader change (applied) "
"from ~w to ~w, "
"resending ~b pending commands",
[?MODULE, OldLeader, ActualLeader,
maps:size(State1#state.pending)]),
resend_all_pending(State1#state{leader = ActualLeader});
_ ->
State1
end;
true ->
State1
end,

Actions0 = lists:reverse(ActionsRev),
Actions = case Corrs of
[] ->
Actions0;
_ ->
%%TODO consider using lists:foldr/3 above because
%%TODO: consider using lists:foldr/3 above because
%% Corrs is returned in the wrong order here.
%% The wrong order does not matter much because the channel sorts the
%% sequence numbers before confirming to the client. But rabbit_fifo_client
%% is sequence numer agnostic: it handles any correlation terms.
[{settled, QName, Corrs} | Actions0]
end,
case map_size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
case map_size(State2#state.pending) < SftLmt of
true when State2#state.slow == true ->
% we have exited soft limit state
% send any unsent commands and cancel the time as
% TODO: really the timer should only be cancelled when the channel
% exits flow state (which depends on the state of all queues the
% channel is interacting with)
% but the fact the queue has just applied suggests
% it's ok to cancel here anyway
State2 = cancel_timer(State1#state{slow = false,
State3 = cancel_timer(State2#state{slow = false,
unsent_commands = #{}}),
% build up a list of commands to issue
Commands = maps:fold(
Expand All @@ -622,16 +646,16 @@ handle_ra_event(QName, From, {applied, Seqs},
add_command(Cid, return, Returns,
add_command(Cid, discard,
Discards, Acc)))
end, [], State1#state.unsent_commands),
ServerId = pick_server(State2),
end, [], State2#state.unsent_commands),
ServerId = pick_server(State3),
%% send all the settlements and returns
State = lists:foldl(fun (C, S0) ->
send_command(ServerId, undefined, C,
normal, S0)
end, State2, Commands),
end, State3, Commands),
{ok, State, [{unblock, cluster_name(State)} | Actions]};
_ ->
{ok, State1, Actions}
{ok, State2, Actions}
end;
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(QName, From, Del, State0);
Expand Down
61 changes: 61 additions & 0 deletions deps/rabbit/test/rabbit_fifo_int_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ all_tests() ->
[
basics,
return,
lost_return_is_resent_on_applied_after_leader_change,
rabbit_fifo_returns_correlation,
resends_lost_command,
returns,
Expand Down Expand Up @@ -56,9 +57,11 @@ init_per_group(_, Config) ->
PrivDir = ?config(priv_dir, Config),
_ = application:load(ra),
ok = application:set_env(ra, data_dir, PrivDir),
application:ensure_all_started(logger),
application:ensure_all_started(ra),
application:ensure_all_started(lg),
SysCfg = ra_system:default_config(),
ra_env:configure_logger(logger),
ra_system:start(SysCfg#{name => ?RA_SYSTEM}),
Config.

Expand All @@ -67,6 +70,7 @@ end_per_group(_, Config) ->
Config.

init_per_testcase(TestCase, Config) ->
ok = logger:set_primary_config(level, all),
meck:new(rabbit_quorum_queue, [passthrough]),
meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
Expand Down Expand Up @@ -162,6 +166,63 @@ return(Config) ->
rabbit_quorum_queue:stop_server(ServerId),
ok.

lost_return_is_resent_on_applied_after_leader_change(Config) ->
%% this test handles a case where a combination of a lost/overwritten
%% command and a leader change could result in a client never detecting
%% a new leader and thus never resends whatever command was overwritten
%% in the prior term. The fix is to handle leader changes when processing
%% the {appliekd, _} ra event.
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
ServerId2 = ?config(node_id2, Config),
ServerId3 = ?config(node_id3, Config),
Members = [ServerId, ServerId2, ServerId3],

ok = meck:new(ra, [passthrough]),
ok = start_cluster(ClusterName, Members),

{ok, _, Leader} = ra:members(ServerId),
Followers = lists:delete(Leader, Members),

F00 = rabbit_fifo_client:init(Members),
{ok, F0, []} = rabbit_fifo_client:enqueue(ClusterName, 1, msg1, F00),
F1 = F0,
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F1),
{ok, _, {_, _, MsgId, _, _}, F3} =
rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, unsettled, F2),
{F4, _} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F3),
RaEvt = receive
{ra_event, Leader, {applied, _} = Evt} ->
Evt
after 5000 ->
ct:fail("no ra event")
end,
NextLeader = hd(Followers),
timer:sleep(100),
ok = ra:transfer_leadership(Leader, NextLeader),
%% get rid of leader change event
receive
{ra_event, _, {machine, leader_change}} ->
ok
after 5000 ->
ct:fail("no machine leader_change event")
end,
%% client will "send" to the old leader
meck:expect(ra, pipeline_command, fun (_, _, _, _) -> ok end),
{ok, F5, []} = rabbit_fifo_client:enqueue(ClusterName, 2, msg2, F4),
?assertEqual(2, rabbit_fifo_client:pending_size(F5)),
meck:unload(ra),
%% pass the ra event with the new leader as if the entry was applied
%% by the new leader, not the old
{ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, NextLeader,
RaEvt, F5),
%% this should resend the never applied enqueue
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),

flush(),
ok.

rabbit_fifo_returns_correlation(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
Expand Down
Loading