Skip to content

Commit

Permalink
Handle timeouts in transient queue deletion
Browse files Browse the repository at this point in the history
Transient queue deletion previously caused a crash if Khepri was enabled
and a node with a transient queue went down while its cluster was in a
minority. We need to handle the `{error,timeout}` return possible from
`rabbit_db_queue:delete_transient/1`. In the
`rabbit_amqqueue:on_node_down/1` callback we log a warning when we see
this return.

We then try this deletion again during that node's
`rabbit_khepri:init/0` which is called from a boot step after
`rabbit_khepri:setup/0`. At that point we can return an error and halt
the node's boot if the command times out. The cluster is very likely to
be in a majority at that point since `rabbit_khepri:setup/0` waits for
a leader to be elected (requiring a majority).

This fixes a crash report found in the `cluster_minority_SUITE`'s
`end_per_group`.

(cherry picked from commit 3f734ef)
  • Loading branch information
the-mikedavis authored and mergify[bot] committed Aug 13, 2024
1 parent 0f90906 commit 006f517
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
33 changes: 30 additions & 3 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
-export([queue/1, queue_names/1]).

-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
-export([delete_transient_queues_on_node/1]).

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
Expand Down Expand Up @@ -1839,13 +1840,39 @@ on_node_up(_Node) ->
-spec on_node_down(node()) -> 'ok'.

on_node_down(Node) ->
case delete_transient_queues_on_node(Node) of
ok ->
ok;
{error, timeout} ->
%% This case is possible when running Khepri. The node going down
%% could leave the cluster in a minority so the command to delete
%% the transient queue records would fail. Also see
%% `rabbit_khepri:init/0': we also try this deletion when the node
%% restarts - a time that the cluster is very likely to have a
%% majority - to ensure these records are deleted.
rabbit_log:warning("transient queues for node '~ts' could not be "
"deleted because of a timeout. These queues "
"will be removed when node '~ts' restarts or "
"is removed from the cluster.", [Node, Node]),
ok
end.

-spec delete_transient_queues_on_node(Node) -> Ret when
Node :: node(),
Ret :: ok | rabbit_khepri:timeout_error().

delete_transient_queues_on_node(Node) ->
{Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
case Ret of
ok -> ok;
{QueueNames, Deletions} ->
ok ->
ok;
{error, timeout} = Err ->
Err;
{QueueNames, Deletions} when is_list(QueueNames) ->
case length(QueueNames) of
0 -> ok;
N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs",
N -> rabbit_log:info("~b transient queues from node '~ts' "
"deleted in ~fs",
[N, Node, Time / 1_000_000])
end,
notify_queue_binding_deletions(Deletions),
Expand Down
11 changes: 9 additions & 2 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ wait_for_register_projections(Timeout, Retries) ->
%% @private

-spec init() -> Ret when
Ret :: ok.
Ret :: ok | timeout_error().

init() ->
case members() of
Expand All @@ -336,7 +336,14 @@ init() ->
?LOG_NOTICE(
"Found the following metadata store members: ~p", [Members],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok
%% Delete transient queues on init.
%% Note that we also do this in the
%% `rabbit_amqqueue:on_node_down/1' callback. We must try this
%% deletion during init because the cluster may have been in a
%% minority when this node went down. We wait for a majority while
%% booting (via `rabbit_khepri:setup/0') though so this deletion is
%% likely to succeed.
rabbit_amqqueue:delete_transient_queues_on_node(node())
end.

%% @private
Expand Down

0 comments on commit 006f517

Please sign in to comment.