Skip to content

Commit

Permalink
Chunk quorum queue deliveries
Browse files Browse the repository at this point in the history
This limits the amount of message data that is added
to the process heap at the same time to around 128KB.

Large prefetch values combined with large messages could cause
excessive garbage collection work.

Also simplify the intermediate delivery message format to avoid
allocations that aren't necessary.
  • Loading branch information
kjnilsson committed Feb 6, 2023
1 parent 55e3569 commit b9bfe0a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
42 changes: 33 additions & 9 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1876,9 +1876,9 @@ checkout(#{index := Index} = Meta,
end.

checkout0(Meta, {success, ConsumerId, MsgId,
?MSG(RaftIdx, Header), ExpiredMsg, State, Effects},
?MSG(_RaftIdx, _Header) = Msg, ExpiredMsg, State, Effects},
SendAcc0) ->
DelMsg = {RaftIdx, {MsgId, Header}},
DelMsg = {MsgId, Msg},
SendAcc = case maps:get(ConsumerId, SendAcc0, undefined) of
undefined ->
SendAcc0#{ConsumerId => [DelMsg]};
Expand Down Expand Up @@ -1942,12 +1942,34 @@ evaluate_limit(Index, Result, BeforeState,
{State0, Result, Effects0}
end.

append_delivery_effects(Effects0, AccMap, _State) when map_size(AccMap) == 0 ->

%% Split a reversed list of {MsgId, Msg} disk messages into chunks whose
%% accumulated payload size (per the size field of each message header)
%% stays around ?DELIVERY_CHUNK_LIMIT_B. Because the input is in reverse
%% order and messages are prepended to the chunk under construction, the
%% returned chunks and their contents come out in delivery order:
%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]]
chunk_disk_msgs([], _Bytes, [[] | Chunks]) ->
    %% no messages were added to the initial empty chunk: drop it
    Chunks;
chunk_disk_msgs([], _Bytes, Chunks) ->
    Chunks;
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem],
                Bytes,
                [CurChunk | Chunks])
  when Bytes >= ?DELIVERY_CHUNK_LIMIT_B ->
    %% the current chunk is full: close it and seed a new chunk with this
    %% message, resetting the byte counter to this message's size so the
    %% count is attributed to the chunk that actually contains it
    Size = get_header(size, Header),
    chunk_disk_msgs(Rem, Size, [[Msg], CurChunk | Chunks]);
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], Bytes,
                [CurChunk | Chunks]) ->
    Size = get_header(size, Header),
    chunk_disk_msgs(Rem, Bytes + Size, [[Msg | CurChunk] | Chunks]).

%% Turn the per-consumer accumulator map of disk messages into delivery
%% effects, chunking each consumer's message list so a single log read
%% does not bring too much message data onto the process heap at once.
append_delivery_effects(Effects0, AccMap, _State)
  when map_size(AccMap) == 0 ->
    %% does this ever happen?
    Effects0;
append_delivery_effects(Effects0, AccMap, State) ->
    maps:fold(fun (C, DiskMsgs, Efs)
                    when is_list(DiskMsgs) ->
                      %% DiskMsgs was accumulated in reverse order;
                      %% chunk_disk_msgs/3 restores delivery order
                      lists:foldl(
                        fun (Msgs, E) ->
                                [delivery_effect(C, Msgs, State) | E]
                        end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]]))
              end, Effects0, AccMap).

take_next_msg(#?MODULE{returns = Returns0,
Expand Down Expand Up @@ -1978,18 +2000,20 @@ get_next_msg(#?MODULE{returns = Returns0,
Msg
end.

%% Build the effect that delivers Msgs (a chunk of {MsgId, Msg} pairs in
%% delivery order) to a consumer. When there is exactly one message and
%% its raw body is present in the msg_cache, send it directly without
%% issuing a log read.
delivery_effect({CTag, CPid}, [{MsgId, [Idx | Header]}],
                #?MODULE{msg_cache = {Idx, RawMsg}}) ->
    {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
     [local, ra_event]};
delivery_effect({CTag, CPid}, Msgs, _State) ->
    %% collect the raft indexes to read from the log, preserving order
    RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) ->
                                   [I | Acc]
                           end, [], Msgs),
    {log, RaftIdxs,
     fun(Log) ->
             DelMsgs = lists:zipwith(
                         fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
                                 {MsgId, {Header, get_msg(Cmd)}}
                         end, Log, Msgs),
             [{send_msg, CPid, {delivery, CTag, DelMsgs}, [local, ra_event]}]
     end,
     {local, node(CPid)}}.
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@

-define(MB, 1_048_576).
-define(LOW_LIMIT, 0.8).
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).

-record(consumer_cfg,
{meta = #{} :: consumer_meta(),
Expand Down

0 comments on commit b9bfe0a

Please sign in to comment.