Skip to content

Commit

Permalink
Optimise msg_store recovery in case of large message file
Browse files Browse the repository at this point in the history
Since 4.0.0 (commit d45fbc3) the shared message store writes each large
message into its own rdq file. This information can be utilised
when scanning rdq files during recovery to avoid unnecessarily reading
the whole message body into memory.

This commit addresses the same issue that was addressed in 3.13.x by
commit baeefbe (i.e. building a large binary by appending 4MB chunks
leaves a lot of garbage and memory fragmentation behind) but does so
even more efficiently.

Large messages which were written before 4.0.0, which don't fully fill
the rdq file, are still handled as before.
  • Loading branch information
gomoripeti committed Feb 14, 2025
1 parent c470661 commit fb21a19
Showing 1 changed file with 45 additions and 17 deletions.
62 changes: 45 additions & 17 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1515,28 +1515,38 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
%% a remnant from a previous compaction, but it might
%% simply be a coincidence. Try the next byte.
#{MsgIdInt := true} ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc);
scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
%% Data looks to be a message.
_ ->
%% Avoid sub-binary construction.
MsgId = <<MsgIdInt:128>>,
TotalSize = Size + 9,
case Fun({MsgId, TotalSize, Offset}) of
%% Confirmed to be a message by the provided fun.
{valid, Entry} ->
case check_msg(Fun, MsgIdInt, TotalSize, Offset, Acc) of
{continue, NewAcc} ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]);
%% Confirmed to be a message but we don't need it anymore.
previously_valid ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, Acc);
%% Not a message, try the next byte.
invalid ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc)
MsgIdsFound#{MsgIdInt => true}, NewAcc);
try_next_byte ->
scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
end
end;
%% Large message alone in its own file: the scan is at offset 0 and the
%% size prefix accounts for the entire file (8-byte size + Size bytes +
%% 1 end-marker byte = FileSize). Only the last byte of the file is read
%% to check the end marker, instead of reading in the whole body.
%% Returns the accumulator directly when the message is confirmed;
%% otherwise falls back to byte-by-byte scanning.
scan_data(<<Size:64, MsgIdInt:128, _Rest/bits>> = Data, Fd, Fun, Offset, FileSize, _MsgIdsFound, _Acc)
when Offset == 0,
FileSize == Size + 9 ->
%% Remember the current read position so it can be restored before
%% falling back to the regular scan.
%% NOTE(review): presumably needed because file:pread/3 may move the
%% position depending on how Fd was opened -- confirm against the caller.
{ok, CurrentPos} = file:position(Fd, cur),
%% Read just the final byte of the file, where the end marker should be.
case file:pread(Fd, FileSize - 1, 1) of
{ok, <<255>>} ->
TotalSize = FileSize,
%% An empty accumulator is passed rather than the incoming one;
%% presumably nothing has been collected yet at offset 0.
case check_msg(Fun, MsgIdInt, TotalSize, Offset, []) of
{continue, NewAcc} ->
%% The message spans the whole file, so there is nothing left to scan.
NewAcc;
try_next_byte ->
{ok, _} = file:position(Fd, CurrentPos),
scan_next_byte(Data, Fd, Fun, Offset, FileSize, #{}, [])
end;
_ ->
%% Wrong end marker
{ok, _} = file:position(Fd, CurrentPos),
scan_next_byte(Data, Fd, Fun, Offset, FileSize, #{}, [])
end;
%% This might be the start of a message.
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
Expand All @@ -1545,9 +1555,27 @@ scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Data) < 8 ->
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
%% This is definitely not a message. Try the next byte.
scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc).

%% Skips exactly one byte of the scanned data and resumes scanning at the
%% following offset. Used whenever the bytes at the current offset turn
%% out not to be the start of a message.
scan_next_byte(<<_Skipped, Tail/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
    NextOffset = Offset + 1,
    scan_data(Tail, Fd, Fun, NextOffset, FileSize, MsgIdsFound, Acc).

%% Asks the caller-supplied Fun whether the candidate at Offset really is
%% a message, and translates its verdict into a scan instruction:
%%   {continue, NewAcc} - it is a message; NewAcc gains the entry when
%%                        Fun returned {valid, Entry} and equals Acc when
%%                        Fun returned previously_valid.
%%   try_next_byte      - Fun returned invalid; the caller should retry
%%                        one byte further.
check_msg(Fun, MsgIdInt, TotalSize, Offset, Acc) ->
    %% The message id is rebuilt from the integer to avoid constructing a
    %% sub-binary of the scanned data.
    Candidate = {<<MsgIdInt:128>>, TotalSize, Offset},
    case Fun(Candidate) of
        invalid ->
            try_next_byte;
        previously_valid ->
            {continue, Acc};
        {valid, Entry} ->
            {continue, [Entry | Acc]}
    end.

%%----------------------------------------------------------------------------
%% Ets index
%%----------------------------------------------------------------------------
Expand Down

0 comments on commit fb21a19

Please sign in to comment.