CQ: Make CQ shared store compaction fast #10696

Merged: 6 commits, Mar 11, 2024
3 changes: 0 additions & 3 deletions deps/rabbit/app.bzl
@@ -175,7 +175,6 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_mirror_queue_slave.erl",
"src/rabbit_mirror_queue_sync.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_file.erl",
"src/rabbit_msg_record.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
@@ -444,7 +443,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_mirror_queue_slave.erl",
"src/rabbit_mirror_queue_sync.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_file.erl",
"src/rabbit_msg_record.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
@@ -732,7 +730,6 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_mirror_queue_slave.erl",
"src/rabbit_mirror_queue_sync.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_file.erl",
"src/rabbit_msg_record.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
81 changes: 0 additions & 81 deletions deps/rabbit/src/rabbit_msg_file.erl

This file was deleted.

141 changes: 100 additions & 41 deletions deps/rabbit/src/rabbit_msg_store.erl
@@ -942,7 +942,13 @@ handle_info(sync, State) ->
handle_info(timeout, State) ->
noreply(internal_sync(State));

handle_info({timeout, TimerRef, {maybe_gc, Candidates}}, State = #msstate{ gc_check_timer = TimerRef }) ->
handle_info({timeout, TimerRef, {maybe_gc, Candidates0}},
State = #msstate{ gc_candidates = NewCandidates,
gc_check_timer = TimerRef }) ->
%% We do not want to consider candidates for GC that had
%% a message removed since we sent that maybe_gc message.
%% In that case we simply defer the GC to the next maybe_gc.
Candidates = maps:without(maps:keys(NewCandidates), Candidates0),
noreply(maybe_gc(Candidates, State));

%% @todo When a CQ crashes the message store does not remove
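For context on the new maybe_gc clause above: maps:without/2 drops every key of NewCandidates from Candidates0, so any file that had a message removed after the timer was armed is skipped this time and simply waits for the next maybe_gc. A minimal sketch with made-up file numbers (the values in the candidate maps do not matter here):

    Candidates0   = #{3 => true, 7 => true},   %% collected when the timer was armed
    NewCandidates = #{7 => true},              %% file 7 had a message removed since then
    Candidates    = maps:without(maps:keys(NewCandidates), Candidates0),
    %% Candidates is now #{3 => true}; file 7 is reconsidered on the next maybe_gc.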
@@ -1357,12 +1363,6 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
writer_close(#writer{fd = Fd}) ->
file:close(Fd).

open_file(File, Mode) ->
file_handle_cache:open_with_absolute_path(
File, ?BINARY_MODE ++ Mode,
[{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
{read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).

mark_handle_open(FileHandlesEts, File, Ref) ->
%% This is fine to fail (already exists). Note it could fail with
%% the value being close, and not have it updated to open.
@@ -1382,6 +1382,80 @@ list_sorted_filenames(Dir, Ext) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
filelib:wildcard("*" ++ Ext, Dir)).

%%----------------------------------------------------------------------------
%% file scanning
%%----------------------------------------------------------------------------

-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB

scan_file_for_valid_messages(Dir, FileName) ->
scan_file_for_valid_messages(form_filename(Dir, FileName)).

scan_file_for_valid_messages(Path) ->
case file:open(Path, [read, binary, raw]) of
{ok, Fd} ->
{ok, FileSize} = file:position(Fd, eof),
{ok, _} = file:position(Fd, bof),
Messages = scan(<<>>, Fd, 0, FileSize, #{}, []),
ok = file:close(Fd),
case Messages of
[] ->
{ok, [], 0};
[{_, TotalSize, Offset}|_] ->
{ok, Messages, Offset + TotalSize}
end;
{error, enoent} ->
{ok, [], 0};
{error, Reason} ->
{error, {unable_to_scan_file,
filename:basename(Path),
Reason}}
end.

scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
case file:read(Fd, ?SCAN_BLOCK_SIZE) of
eof ->
Acc;
{ok, Data0} ->
Data = case Buffer of
<<>> -> Data0;
_ -> <<Buffer/binary, Data0/binary>>
end,
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
end.

%% Message might have been found.
scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
Fd, Offset, FileSize, MsgIdsFound, Acc)
when Size >= 16 ->
<<MsgIdInt:128, _/bits>> = MsgIdAndMsg,
case MsgIdsFound of
%% This MsgId was found already. This data is probably
%% a remnant from a previous compaction, but it might
%% simply be a coincidence. Try the next byte.
#{MsgIdInt := true} ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Offset + 1, FileSize, MsgIdsFound, Acc);
%% Data looks to be a message.
_ ->
%% Avoid sub-binary construction.
MsgId = <<MsgIdInt:128>>,
TotalSize = Size + 9,
scan_data(Rest, Fd, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true},
[{MsgId, TotalSize, Offset}|Acc])
end;
%% This might be the start of a message.
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Data) < 8 ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
%% This is definitely not a message. Try the next byte.
scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).

%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------
@@ -1522,25 +1596,6 @@ count_msg_refs(Gen, Seed, State) ->
count_msg_refs(Gen, Next, State)
end.

scan_file_for_valid_messages(File) ->
case open_file(File, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(File),
fun scan_fun/2, []),
ok = file_handle_cache:close(Hdl),
Valid;
{error, enoent} -> {ok, [], 0};
{error, Reason} -> {error, {unable_to_scan_file,
filename:basename(File),
Reason}}
end.

scan_file_for_valid_messages(Dir, FileName) ->
scan_file_for_valid_messages(form_filename(Dir, FileName)).

scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
[{MsgId, TotalSize, Offset} | Acc].

build_index(true, _StartupFunState,
State = #msstate { file_summary_ets = FileSummaryEts }) ->
File = ets:last(FileSummaryEts),
@@ -1907,21 +1962,25 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
ok
end.

load_and_vacuum_message_file(File, State) ->
Messages0 = index_select_all_from_file(File, State),
%% Cleanup messages that have 0 ref_count.
Messages = lists:foldl(fun
(Entry = #msg_location{ ref_count = 0 }, Acc) ->
ok = index_delete_object(Entry, State),
Acc;
(Entry, Acc) ->
[Entry|Acc]
end, [], Messages0),
lists:keysort(#msg_location.offset, Messages).

index_select_all_from_file(File, #gc_state { index_module = Index,
index_state = State }) ->
Index:select_all_from_file(File, State).
load_and_vacuum_message_file(File, State = #gc_state{ dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
fun ({MsgId, TotalSize, Offset}, Acc) ->
case index_lookup(MsgId, State) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
ok = index_delete_object(Entry, State),
Acc;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
[ Entry | Acc ];
_ ->
Acc
end
end, [], Messages).

scan_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
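As a reading aid for the new scanner: the clauses of scan_data/6 above imply the on-disk entry layout, an 8-byte size prefix, a payload whose first 16 bytes are the message id, and a single 16#FF trailer byte, which is where TotalSize = Size + 9 comes from. A minimal sketch of a writer for that layout (encode_entry/2 is a hypothetical helper, not part of this PR):

    %% Hypothetical helper: builds one entry in the layout scan_data/6 matches,
    %% i.e. <<Size:64, MsgId:16/binary, MsgBody/binary, 255>>.
    encode_entry(MsgId, MsgBody) when byte_size(MsgId) =:= 16 ->
        Size = 16 + byte_size(MsgBody),  %% covers id + body, not the prefix/trailer
        <<Size:64, MsgId/binary, MsgBody/binary, 255>>.
    %% Such an entry occupies Size + 9 bytes on disk: 8 for the size prefix,
    %% Size for id + body, and 1 for the 255 marker.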
7 changes: 1 addition & 6 deletions deps/rabbit/src/rabbit_msg_store_ets_index.erl
@@ -12,7 +12,7 @@
-behaviour(rabbit_msg_store_index).

-export([new/1, recover/1,
lookup/2, select_from_file/3, select_all_from_file/2, insert/2, update/2, update_fields/3, delete/2,
lookup/2, select_from_file/3, insert/2, update/2, update_fields/3, delete/2,
delete_object/2, clean_up_temporary_reference_count_entries_without_file/1, terminate/1]).

-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
@@ -49,11 +49,6 @@ select_from_file(MsgIds, File, State) ->
All = [lookup(Id, State) || Id <- MsgIds],
[MsgLoc || MsgLoc=#msg_location{file=MsgFile} <- All, MsgFile =:= File].

%% Note that this function is not terribly efficient and should only be
%% used for compaction or similar.
select_all_from_file(File, State) ->
ets:match_object(State #state.table, #msg_location { file = File, _ = '_' }).

insert(Obj, State) ->
true = ets:insert_new(State #state.table, Obj),
ok.
20 changes: 2 additions & 18 deletions deps/rabbit/src/rabbit_msg_store_gc.erl
@@ -11,10 +11,8 @@

-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).

-export([set_maximum_since_use/2]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/3]).
terminate/2, code_change/3]).

-record(state,
{ pending,
@@ -53,23 +51,13 @@
stop(Server) ->
gen_server2:call(Server, stop, infinity).

-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.

set_maximum_since_use(Pid, Age) ->
gen_server2:cast(Pid, {set_maximum_since_use, Age}).

%%----------------------------------------------------------------------------

init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
{ok, #state { pending = #{},
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.

prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast(_Msg, _Len, _State) -> 0.

handle_call(stop, _From, State) ->
{stop, normal, ok, State}.

@@ -94,11 +82,7 @@ handle_cast({truncate, File, TruncateSize, ThresholdTimestamp}, State = #state{p
handle_cast({delete, File}, State = #state{pending = Pending}) ->
%% We drop any pending action because deletion takes precedence over truncation.
State1 = State#state{pending = maps:remove(File, Pending)},
{noreply, attempt_action(delete, [File], State1), hibernate};

handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
{noreply, State, hibernate}.
{noreply, attempt_action(delete, [File], State1), hibernate}.

%% Run all pending actions.
handle_info({timeout, TimerRef, do_pending},