Skip to content

Commit

Permalink
Recovery terms in per-queue files instead of DETS
Browse files Browse the repository at this point in the history
A per-vhost DETS file with recovery terms for all queues is a bottleneck
when stopping RabbitMQ - all queues try to save their state, leading
to a very long file server mailbox and a very unpredictable time
to stop RabbitMQ (on my machine it can vary from 20 seconds to 5 minutes
with 100k classic queues).

In this PR we can still read the recovery terms from DETS but we only
save them in per-queue files. This way each queue can quickly store its
state. Under the same conditions, my machine can consistently stop
RabbitMQ in 15 seconds or so.

The tradeoff is a slower startup time: on my machine, it goes up from
29 seconds to 38 seconds, but that's still better than what we had until
#7676 was merged a few
days ago. More importantly, the total of stop+start is lower and more
predictable.

This PR also improves shutdown with many classic queues v1.
Startup time with 100k CQv1s is so long and unpredictable that it's hard
to even tell if this PR affects it (it varies from 4 to 8 minutes for me).

Unfortunately this PR makes startup on macOS slower (~55s instead of 30s
for me), but we don't have to optimise for that. In most cases (with
far fewer queues), it won't be noticeable anyway.
  • Loading branch information
mkuratczyk committed Mar 23, 2023
1 parent 2b1a80f commit 9d93d05
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 40 deletions.
20 changes: 5 additions & 15 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,10 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) ->

%% Writes the queue name stub file into Dir. The file is a short text
%% record with three lines: the vhost ("VHOST: ..."), the queue name
%% ("QUEUE: ...") and the index version ("INDEX: v2"). The write is
%% asserted with `ok = ...', so any error return crashes the caller.
%%
%% NOTE(review): this span is a rendered diff hunk, so two calls are shown.
%% The first looks like the pre-change call to a module-local helper and
%% the second like its replacement calling rabbit_file - confirm against
%% the commit before treating this as compilable code.
ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir) ->
QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
%% Call through the module-local write_file_and_ensure_dir/2.
ok = write_file_and_ensure_dir(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
"QUEUE: ", QName/binary, "\n",
"INDEX: v2\n">>).
%% Same payload, written through rabbit_file:write_file_and_ensure_dir/2.
ok = rabbit_file:write_file_and_ensure_dir(QueueNameFile,
<<"VHOST: ", VHost/binary, "\n",
"QUEUE: ", QName/binary, "\n",
"INDEX: v2\n">>).

-spec reset_state(State) -> State when State::state().

Expand Down Expand Up @@ -542,7 +543,7 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
file_handle_cache:release_reservation(),
%% Write recovery terms for faster recovery.
_ = rabbit_recovery_terms:store(VHost,
filename:basename(rabbit_file:binary_to_filename(Dir)),
rabbit_file:binary_to_filename(Dir),
[{v2_index_state, {?VERSION, Segments}} | Terms]),
State#qi{ segments = #{},
fds = #{} }.
Expand Down Expand Up @@ -1291,14 +1292,3 @@ highest_continuous_seq_id([SeqId1, SeqId2|Tail], EndSeqId)
highest_continuous_seq_id([SeqId2|Tail], EndSeqId);
highest_continuous_seq_id([SeqId|Tail], _) ->
{SeqId, Tail}.

%% Writes IOData to the file Name in raw mode. If the first attempt fails
%% with enoent, the parent directories of Name are created and the write
%% is attempted once more. Any other result - including a failure to
%% create the directories - is returned to the caller unchanged.
write_file_and_ensure_dir(Name, IOData) ->
    WriteFile = fun() -> file:write_file(Name, IOData, [raw]) end,
    case WriteFile() of
        {error, enoent} ->
            case filelib:ensure_dir(Name) of
                ok -> WriteFile();
                EnsureError -> EnsureError
            end;
        Result ->
            Result
    end.
16 changes: 16 additions & 0 deletions deps/rabbit/src/rabbit_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-export([read_file_info/1]).
-export([filename_as_a_directory/1]).
-export([filename_to_binary/1, binary_to_filename/1]).
-export([write_file_and_ensure_dir/2]).

-import(file_handle_cache, [with_handle/1, with_handle/2]).

Expand Down Expand Up @@ -349,3 +350,18 @@ binary_to_filename(Bin) when is_binary(Bin) ->
Other ->
erlang:error(Other)
end.

%% Optimistic file write: attempt the write straight away and only pay
%% for filelib:ensure_dir/1 when the write reports enoent. Ensuring the
%% directory takes time, and in the common case it already exists.
%%
%% Returns ok on success; otherwise the error from the failing step
%% (the first write, ensure_dir, or the retried write) is returned as is.
-spec write_file_and_ensure_dir(file:filename(), iodata()) -> ok_or_error().
write_file_and_ensure_dir(Name, IOData) ->
    retry_write_after_enoent(file:write_file(Name, IOData, [raw]), Name, IOData).

%% Retries the write once after creating the parent directories, but only
%% when the first attempt failed with enoent; any other result passes through.
retry_write_after_enoent({error, enoent}, Name, IOData) ->
    case filelib:ensure_dir(Name) of
        ok -> file:write_file(Name, IOData, [raw]);
        EnsureError -> EnsureError
    end;
retry_write_after_enoent(FirstResult, _Name, _IOData) ->
    FirstResult.
42 changes: 22 additions & 20 deletions deps/rabbit/src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,7 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,

%% Terminates the index via terminate/1, which yields the per-segment
%% counts and the updated state, then hands the recovery terms (with a
%% `segments' entry prepended) to rabbit_recovery_terms:store/3. The
%% result of store/3 is deliberately discarded (`_ = ...'), and the state
%% returned by terminate/1 is the return value.
%%
%% NOTE(review): this span is a rendered diff hunk, so two store/3 calls
%% are shown. The first (keyed by filename:basename(Dir)) looks like the
%% pre-change line and the second (passing Dir itself) like its
%% replacement - confirm against the commit.
terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
%% Variant keyed by the directory's base name.
_ = rabbit_recovery_terms:store(VHost, filename:basename(Dir),
[{segments, SegmentCounts} | Terms]),
%% Variant passing the full queue directory.
_ = rabbit_recovery_terms:store(VHost, Dir, [{segments, SegmentCounts} | Terms]),
State1.

-spec delete_and_terminate(qistate()) -> qistate().
Expand Down Expand Up @@ -537,25 +536,28 @@ bounds(State = #qistate { segments = Segments }) ->

start(VHost, DurableQueueNames) ->
ok = rabbit_recovery_terms:start(VHost),
QueuesFolder = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues"]),
{DurableTerms, DurableDirectories} =
lists:foldl(
fun(QName, {RecoveryTerms, ValidDirectories}) ->
DirName = queue_name_to_dir_name(QName),
RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
{error, _} -> non_clean_shutdown;
{ok, Terms} -> Terms
end,
{[RecoveryInfo | RecoveryTerms],
sets:add_element(DirName, ValidDirectories)}
end, {[], sets:new()}, DurableQueueNames),
lists:foldl(
fun(QName, {RecoveryTerms, ValidDirectories}) ->
DirName = queue_name_to_dir_name(QName),
QueueDir = filename:join([QueuesFolder, DirName]),
RecoveryInfo = case rabbit_recovery_terms:read(VHost, QueueDir) of
{ok, Terms} -> Terms;
{error, _} -> non_clean_shutdown
end,
{[RecoveryInfo | RecoveryTerms],
sets:add_element(DirName, ValidDirectories)}
end, {[], sets:new()}, DurableQueueNames),
%% Any queue directory we've not been asked to recover is considered garbage
ToDelete = [filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", Dir])
|| Dir <- lists:subtract(all_queue_directory_names(VHost),
sets:to_list(DurableDirectories))],
rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]),
_ = rabbit_file:recursive_delete(ToDelete),

rabbit_recovery_terms:clear(VHost),
_ = case [filename:join([QueuesFolder, Dir])
|| Dir <- lists:subtract(all_queue_directory_names(VHost),
sets:to_list(DurableDirectories))] of
[] -> ok;
ToDelete ->
rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]),
_ = rabbit_file:recursive_delete(ToDelete)
end,

%% The backing queue interface requires that the queue recovery terms
%% which come back from start/1 are in the same order as DurableQueueNames
Expand Down Expand Up @@ -796,7 +798,7 @@ recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal
SegmentAndDirtyCount;
recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
%% force to flush the segment
{add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
{add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
{add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
Expand Down
17 changes: 14 additions & 3 deletions deps/rabbit/src/rabbit_recovery_terms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,23 @@ stop(VHost) ->

-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()).

store(VHost, DirBaseName, Terms) ->
dets:insert(VHost, {DirBaseName, Terms}).
%% Serialises Terms with term_to_iovec/1 and writes them to the
%% ".recovery" file inside QueueDir. The vhost argument is not used.
%% Returns whatever rabbit_file:write_file_and_ensure_dir/2 returns.
store(_VHost, QueueDir, Terms) ->
    rabbit_file:write_file_and_ensure_dir(
      filename:join([QueueDir, ".recovery"]),
      term_to_iovec(Terms)).

-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).

read(VHost, DirBaseName) ->
%% Reads the recovery terms for the queue stored in QueueDir.
%%
%% The per-queue ".recovery" file is tried first. When it can be read it
%% is deleted straight away, so the same terms cannot be picked up again
%% by a later start, and its contents are decoded and returned as
%% {ok, Terms}. A file that cannot be decoded (for example one that was
%% only partially written) is reported as {error, not_found} instead of
%% raising badarg.
%%
%% When the file cannot be read, the legacy per-vhost DETS table is
%% consulted. Legacy entries were stored under the base name of the queue
%% directory, so the lookup must use filename:basename(QueueDir); looking
%% up the full path would never match an entry written before the upgrade.
read(VHost, QueueDir) ->
    %% filename:join/1 drops the components that precede an absolute one,
    %% so this resolves to QueueDir/.recovery when QueueDir is absolute.
    RecoveryFile = filename:join([rabbit_vhost:msg_store_dir_path(VHost), QueueDir, ".recovery"]),
    case file:read_file(RecoveryFile) of
        {ok, TermsBin} ->
            _ = prim_file:delete(RecoveryFile),
            %% Only the decoding is protected; the file is already gone.
            try binary_to_term(TermsBin) of
                Terms -> {ok, Terms}
            catch
                error:badarg -> {error, not_found}
            end;
        {error, _} ->
            read_legacy(VHost, filename:basename(QueueDir))
    end.

read_legacy(VHost, DirBaseName) ->
case dets:lookup(VHost, DirBaseName) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ recover(VHost) ->
rabbit_log:info("Making sure data directory '~ts' for vhost '~ts' exists",
[VHostDir, VHost]),
VHostStubFile = filename:join(VHostDir, ".vhost"),
ok = rabbit_file:ensure_dir(VHostStubFile),
ok = filelib:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
ok = ensure_config_file(VHost),
{Recovered, Failed} = rabbit_amqqueue:recover(VHost),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,7 @@ variable_queue_read_terms(QName) ->
virtual_host = VHost,
name = Name } = QName,
<<Num:128>> = erlang:md5(<<"queue", VHost/binary, Name/binary>>),
DirName = rabbit_misc:format("~.36B", [Num]),
DirName = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", rabbit_misc:format("~.36B", [Num])]),
{ok, Terms} = rabbit_recovery_terms:read(VHost, DirName),
Terms.

Expand Down

0 comments on commit 9d93d05

Please sign in to comment.