Skip to content

Commit

Permalink
Merge pull request #7688 from rabbitmq/mergify/bp/v3.12.x/pr-7676
Browse files Browse the repository at this point in the history
Faster node start with many classic queues v2 (backport #7676)
  • Loading branch information
michaelklishin authored Mar 21, 2023
2 parents 4162110 + 0b9da18 commit 24169c2
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
14 changes: 12 additions & 2 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) ->

ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir) ->
QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
ok = filelib:ensure_dir(QueueNameFile),
ok = file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
ok = write_file_and_ensure_dir(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
"QUEUE: ", QName/binary, "\n",
"INDEX: v2\n">>).

Expand Down Expand Up @@ -1292,3 +1291,14 @@ highest_continuous_seq_id([SeqId1, SeqId2|Tail], EndSeqId)
highest_continuous_seq_id([SeqId2|Tail], EndSeqId);
highest_continuous_seq_id([SeqId|Tail], _) ->
{SeqId, Tail}.

write_file_and_ensure_dir(Name, IOData) ->
case file:write_file(Name, IOData, [raw]) of
ok -> ok;
{error, enoent} ->
case filelib:ensure_dir(Name) of
ok -> file:write_file(Name, IOData, [raw]);
Err -> Err
end;
Err -> Err
end.
19 changes: 13 additions & 6 deletions deps/rabbit/src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,12 @@ start(VHost, DurableQueueNames) ->
sets:add_element(DirName, ValidDirectories)}
end, {[], sets:new()}, DurableQueueNames),
%% Any queue directory we've not been asked to recover is considered garbage
_ = rabbit_file:recursive_delete(
[DirName ||
DirName <- all_queue_directory_names(VHost),
not sets:is_element(filename:basename(DirName), DurableDirectories)]),
ToDelete = [filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", Dir])
|| Dir <- lists:subtract(all_queue_directory_names(VHost),
sets:to_list(DurableDirectories))],
rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]),
_ = rabbit_file:recursive_delete(ToDelete),

rabbit_recovery_terms:clear(VHost),

%% The backing queue interface requires that the queue recovery terms
Expand All @@ -564,8 +566,13 @@ start(VHost, DurableQueueNames) ->
stop(VHost) -> rabbit_recovery_terms:stop(VHost).

all_queue_directory_names(VHost) ->
filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost),
"queues", "*"])).
VHostQueuesPath = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues"]),
case filelib:is_dir(VHostQueuesPath) of
true ->
{ok, Dirs} = file:list_dir(VHostQueuesPath),
Dirs;
false -> []
end.

%%----------------------------------------------------------------------------
%% startup and shutdown
Expand Down

0 comments on commit 24169c2

Please sign in to comment.